• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ报错 Already closed: The AMQP operation was interrupted

武飞扬头像
_Hey_Jude
帮助1

C#使用rabbitmq在接收消息事件处理中报错:

Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40

解决办法是将接收事件代码里面末尾加个线程休眠“System.Threading.Thread.Sleep(1);”

  1.  
    /// <summary>
  2.  
    /// 监听消息队里的消息
  3.  
    /// </summary>
  4.  
    /// <param name="func">外围业务方法</param>
  5.  
    public static void Receive(Func<string, bool> func)
  6.  
    {
  7.  
    try
  8.  
    {
  9.  
    //如果未连接则重连连接队列
  10.  
    AgainInitMessageQueue();
  11.  
     
  12.  
    //创建消费者对象
  13.  
    var consumer = new EventingBasicConsumer(channel);
  14.  
    //监听消费事件,如果执行shutdown了,此事件会执行失败
  15.  
    consumer.Received = (model, ea) =>
  16.  
    {
  17.  
    //接收到的消息
  18.  
    var message = Encoding.UTF8.GetString(ea.Body.Span);
  19.  
    //委托外围方法处理业务
  20.  
    var result = func(message);
  21.  
    if (result)
  22.  
    {
  23.  
    //业务处理成功后单条通知生产者
  24.  
    channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
  25.  
    LogHelper.Info(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-消费成功,message={message}");
  26.  
    }
  27.  
    else
  28.  
    {
  29.  
    //业务处理失败需要重回队列等待消费
  30.  
    //channel.BasicReject(ea.DeliveryTag, true);//拒绝接收单条消息,是否重回队列
  31.  
    LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-业务处理失败,进入缓存等待重新入列,message={message}", true);
  32.  
     
  33.  
    //###为了防止队里一直处于等到出列,导致后面数据无法消费,此处自动消费成功,进入缓存等待重新入列消费####
  34.  
    //添加处理失败的数据进缓存中
  35.  
    //添加消息重试次数缓存 超过上限不再进行重试
  36.  
    var mqBaseModel = SerializerHelper.DeserializerJson<MQBaseModel>(message);
  37.  
    if (mqBaseModel != null)
  38.  
    {
  39.  
    int retryTimes = ConvertBasic.ToInt32(RedisClient.GetValue<int>(CacheKey.SendMsgFailRetryKey mqBaseModel.MQCode));
  40.  
    RedisClient.SetValue(CacheKey.SendMsgFailRetryKey mqBaseModel.MQCode, retryTimes 1);
  41.  
    }
  42.  
    var ret = RedisClient.SAdd(CacheKey.SendMsgFailListKey, message);
  43.  
    if (!ret)
  44.  
    {
  45.  
    LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-添加处理失败的数据进缓存中失败,message={message}", true);
  46.  
    }
  47.  
    channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
  48.  
    }
  49.  
    Thread.Sleep(1);
  50.  
    //此行代码必须,不然写入队列消息会报错
  51.  
    //错误消息:Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
  52.  
    };
  53.  
    //监听shutdown事件
  54.  
    consumer.Shutdown = (model, e) =>
  55.  
    {
  56.  
    //记录日志
  57.  
    LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-shutdown被执行,队列监听失败e={SerializerHelper.SerializerJson(e)}", true);
  58.  
    };
  59.  
    //消费者开启监听,手动应答
  60.  
    channel.BasicQos(0, 1, false);//逐条消费
  61.  
    channel.BasicConsume(queue: MQNameConfig.CommentQueue, autoAck: false, consumer: consumer);
  62.  
    }
  63.  
    catch (Exception ex)
  64.  
    {
  65.  
    LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-监听消息队里的消息出错,{ex.Message}", ex, true);
  66.  
    }
  67.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgffgci
系列文章
更多 icon
同类精品
更多 icon
继续加载