十年河东,十年河西,莫欺少年穷
学无止境,精益求精
上一节介绍了RabbitMQ定向模式,本篇介绍Rabbitmq 的消息确认机制
我的系列博客:
NetCore RabbitMQ Topics 通配符模式
NetCore RabbitMQ ,Routing定向模式
NetCore RabbitMQ 发布订阅模式,消息广播
RabbitMQ的六种工作模式
NetCore RabbitMQ 简介及兔子生产者、消费者 【简单模式,work工作模式,竞争消费】
windows环境下,RabbitMQ 安装教程
kafka、Rabbitmq、EasyNetQ NetCore 源码下载
在一些场合,如支付时每一条消息都必须保证成功的被处理。
AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。
消息确认可以分为两种:
一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;
一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已成功被消费者接收。
生产者端消息确认机制分为两种,一种是基于事务机制,另一种是基于Confrim确认机制。事务机制占用资源较多,会拉低生产效率,因此,事务模式在市场上用的比较少。
事务机制类似于数据库事务,要先开启事务,发完消息后,提交事务,发生异常时回滚事务。具体展现在C#代码如下:
channel.TxSelect(); //开启事务模式
channel.TxCommit();//提交事务
channel.TxRollback();//异常时,回滚事务
使用事务机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器了,如果txCommit提交成功了,则说明消息成功被broker接收了;
如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。看一个事务机制的简单实现:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);
string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);
var messages = "Hello,RabbitMQ的事务方式"; //
try
{
channel.TxSelect(); //开启事务模式
//发送消息
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //
}
channel.TxCommit();//提交事务
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
channel.TxRollback();//异常时,回滚事务
}
}
}
Console.Read();
}
}
}
上述代码执行后
如果我们将上述代码:channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); // 中的routingKey修改为空字符串,如下:
channel.BasicPublish(Ename, "", null, Encoding.UTF8.GetBytes(messages + "_" + i)); //
再次运行发现,消息并不能发送到队列中,程序也不会报异常。也就是说,事务提交了,但消息并没发出去。
因此:虽说执行了事务提交,程序也没报异常,但消息不一定会发出去。
C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect(),WaitForConfirms(),WaitForConfirmOrDie()
channel.ConfirmSelect() 表示开启Confirm模式;
channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。
channel.WaitForConfirmsOrDie() 和WaitForConfirms作用类似,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常。
看一个Confirm模式的简单实现:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);
string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);
var messages = "Hello,RabbitMQ的事务方式"; //
channel.ConfirmSelect(); // 启用服务器确认机制方式
//
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
}
if (channel.WaitForConfirms())
{
Console.WriteLine("消息发送成功");
}
else
{
//重发 或者 写具体的处理逻辑
Console.WriteLine("消息发送失败");
}
}
}
Console.Read();
}
}
}
这里需要说明的是,WaitForConfirms是指等待所有消息确认,如果你在调试过程中,将发送消息刻意循环三次,在执行WaitForConfirms时,返回值依旧是True,因此三次发送均成功了。
我在网上看到还有一种写法是这样的,如下:
for (int i = 0; i < 10; i++)
{
channel.ConfirmSelect(); // 启用服务器确认机制方式
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
if (channel.WaitForConfirms())
{
Console.WriteLine("消息发送成功");
}
else
{
//重发 或者 写具体的处理逻辑
Console.WriteLine("消息发送失败");
}
}
每发一次消息,确认一次,这种写法无疑会浪费资源。大家有何看法,欢迎评论。~_~
同理
如果我们将上述代码:channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); // 中的routingKey修改为空字符串,如下:
channel.BasicPublish(Ename, "", null, Encoding.UTF8.GetBytes(messages + "_" + i)); //
再次运行发现,消息并不能发送到队列中,程序也不会报异常。但WaitForConfirms依旧返回True
因此,这种机制还事务模式一样,都不能完全保证消息发送到 队列。
从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。
自动确认:当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可,如下:
channel.BasicConsume(Qname, true, consumer);//开启自动确认
注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。
消费者代码如下
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQConsumer_2
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"消费者收到消息: {message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100);
};
//启动消费者
string Qname = "MyQueue";
channel.BasicConsume(Qname, true, consumer);//开启自动确认
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}
View Code
2 显示确认
自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数autoAck设置为false,然后在消费端使用代码channel.BasicAck()/BasicReject()等方法来确认和拒绝消息。看一个栗子:
生产者代码:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);
string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);
var messages = "MyHello,RabbitMQ"; //
//
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
}
}
}
Console.Read();
}
}
}
View Code
消费者代码:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQConsumer_2
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
//确认该消息已被消费
if (message.StartsWith("Hello"))
{
Console.WriteLine($"消费者收到消息: {message}");
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
Console.WriteLine($"消费者拒绝接收的消息: {message}");
//拒绝接收
channel.BasicReject(ea.DeliveryTag, false);
}
};
//启动消费者
string Qname = "MyQueue";
channel.BasicConsume(Qname, false, consumer);//开启自动确认
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}
View Code
注意:显示确认时,自动确认要关闭。
//确认该消息已被消费
if (message.StartsWith("Hello"))
{
Console.WriteLine($"消费者收到消息: {message}");
//deliveryTag 参数分发的标记
//multiple 是否确认多条
//void BasicAck(ulong deliveryTag, bool multiple);
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
Console.WriteLine($"消费者拒绝接收的消息: {message}");
//deliveryTag 参数分发的标记
// requeue false 时,拒绝的消息会被直接删除 true 拒绝的消息会被重新放入队列中
//void BasicReject(ulong deliveryTag, bool requeue);
//拒绝接收
channel.BasicReject(ea.DeliveryTag, false);
}
介绍一下代码中的两个方法:channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple:false);方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue:false);方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
运行这两个应用程序,通过生产者发送两条消息,效果如下:
一些意外的情况:使用显式确认时,如果消费者处理完消息不发送确认回执,那么消息不会被删除,消息的状态一直是Unacked,这条消息也不会再发送给其他消费者。如果一个消费者在处理消息时尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存时,消息会发送给其他消费者。
例如,我们将上述消费者代码中的回执部分注释掉,如下
再次生产消息后,运行消费者端代码,此时,消息的状态为:Unacked
关闭消费者端调试后,消息状态又变成了 Ready
@天才卧龙的波尔卡
微信发消息发不出去一直转圈圈怎么办 微信消息发送失败解决办法