RabbitMQ实例C# - 西伯利亚的狼 - 博客园

来源: RabbitMQ实例C# – 西伯利亚的狼 – 博客园

驱动组件.NET版本

官网推荐驱动:RabbitMQ.Client

https://www.rabbitmq.com/devtools.html#dotnet-dev

Connection和Channel

Connection是一个TCP连接,一般服务器这种资源都是很宝贵的,所以提供了Channel,完成消息的发布消费。这样Connection就可以做成单例模式的。

1、事件

Connection和Channel里面包含了几个事件。分别在不同的情况下触发

 

其他时间执行发生异常,就会执行这个

Connection.CallbackException

恢复连接成功

Connection.RecoverySucceeded

连接恢复异常时会触发这个事件

Connection.ConnectionRecoveryError

RabbitMQ出于自身保护策略,通过阻塞方式限制写入,导致了生产者应用“假死”,不对外服务。比若说CPU  IO RAM下降,队列堆积,导致堵塞。  就会触发这个事件

Connection.ConnectionBlocked

阻塞解除会触发这个事件

Connection.ConnectionUnblocked

connection断开连接时候

Connection.ConnectionShutdown

——————————————————————————————————————————————————-

.NET RabbitMQ.Client中Channel叫做Model

 

channel断开连接时候触发

Channel.ModelShutdown

其他时间执行发生异常,就会执行这个

Channel.CallbackException

broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

Channel.BasicReturn

Channel.BasicRecoverOk

Signalled when a Basic.Nack command arrives from the broker.

Channel.BasicNacks

Signalled when a Basic.Ack command arrives from the broker.

Channel.BasicAcks

Channel.FlowControl

2、属性

最大channel数量

connetion.ChannelMax

服务上这个连接的对象属性

connetion.ClientProperties

服务器上这个连接的名字

connetion.ClientProvidedName

关闭原因

connetion.CloseReason

端口

connetion.Endpoint

和客户端通信时所允许的最大的frame size

connetion.FrameMax

连接的心跳包

connetion.Heartbeat

是否打开

connetion.IsOpen

获取vhost

connetion.KnownHosts

本地端口

connetion.LocalPort

连接串使用的协议

connetion.Protocol

远程端口,服务器

connetion.RemotePort

服务器属性

connetion.ServerProperties

关停信息

connetion.ShutdownReport

—————————————————————————————————————————————-

channel编号

channel.ChannelNumber

关闭原因

channel.CloseReason

连接超时时间

channel.ContinuationTimeout

channel.IsClosed

channel.IsOpen

下一个消息编号

channel.NextPublishSeqNo

3、方法

终止连接以及他们的channel,可以指定时间长度。

connetion.Abort()

关闭连接以及他的channel

connetion.Close()

创建channel

connetion.CreateModel()

connetion.HandleConnectionBlocked()

connetion.HandleConnectionUnblocked()

 

发送消息Confirm模式

目的确认消息是否到达消息队列中 

 

1、mandatory

broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

channel.BasicReturn += Channel_BasicReturn;
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

 

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
{
    Console.WriteLine("Channel_BasicReturn");
}

 

2、普通Confirm模式

复制代码
channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body);

if (channel.WaitForConfirms())
{
    Console.WriteLine("普通发送方确认模式");
}
复制代码

 

3、批量Confirm模式

复制代码
channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

channel.WaitForConfirmsOrDie();
Console.WriteLine("普通发送方确认模式");
复制代码

 

 

4、异步Confirm模式

java版本组件有

5、事物

 

复制代码
try
{
    //声明事物
    channel.TxSelect();
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    //提交事物
    channel.TxCommit();
}
catch (Exception)
{
    //回滚
    channel.TxRollback();

}
复制代码

上面说的是生产者发布消息确认,那么消费者消费如何确认呢,大家都知道消费者有ack机制,但是用到事物的时候,是怎样的呢

1.autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么已事务回滚为主,此条消息会重新放回队列;
2.autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

 

事物比较耗性能

 

简单消息发送
复制代码
static void Main(string[] args)
        {


            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.140.161";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "TestVHost";
            //创建connetion
            using (var connetion = factory.CreateConnection())
            {
                connetion.CallbackException += Connetion_CallbackException;
                connetion.RecoverySucceeded += Connetion_RecoverySucceeded;
                connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
                connetion.ConnectionBlocked += Connetion_ConnectionBlocked;
                connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked;
                //连接关闭的时候
                connetion.ConnectionShutdown += Connetion_ConnectionShutdown;



                //创建channel
                using (var channel = connetion.CreateModel())
                {

                    //消息会在何时被 confirm?
                    //The broker will confirm messages once:
                    //broker 将在下面的情况中对消息进行 confirm :
                    //it decides a message will not be routed to queues
                    //(if the mandatory flag is set then the basic.return is sent first) or
                    //broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    //a transient message has reached all its queues(and mirrors) or
                    //非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
                    //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or
                    //持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
                    //a persistent message has been consumed(and if necessary acknowledged) from all its queues
                    //持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)


                    //broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    channel.BasicReturn += Channel_BasicReturn;

                    //(可以不声明)如果不声明交换机 ,那么就使用默认的交换机  (每一个vhost都会有一个默认交换机)
                    //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true);

                    //创建一个队列  bool durable(持久化), bool exclusive(专有的), bool autoDelete(自动删除)
                    //channel.QueueDeclare("TestQueue", true, false, false, null);
                    //不做绑定的话,使用默认的交换机。
                    //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null);

                    //发布消息
                    var body = Encoding.UTF8.GetBytes("西伯利亚的狼");


                    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
                }

                Console.WriteLine("Hello World!");
                Console.ReadKey();
            }
        }

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            Console.WriteLine("Channel_BasicReturn");
        }

private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionShutdown");
        }

private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionUnblocked");
        }

private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionBlocked");
        }

private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionRecoveryError");
        }

private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
{
    Console.WriteLine("Connetion_RecoverySucceeded");
}

private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
{
    Console.WriteLine("Connetion_CallbackException");
}
复制代码

 

 

场景分析

 

 

消息持久化

Broker持久化、交换机持久化、队列持久化  。目的是维持重启后  这些东西的存在。

消息持久化,才是把消息持久化到硬盘中,因为消息在队列中,所以需要队列持久化。

设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).  默认的就是持久化。

复制代码
//发布消息
var body = Encoding.UTF8.GetBytes("西伯利亚的狼");
BasicProperties pro = new BasicProperties();
pro.DeliveryMode = 2;

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);
复制代码

 

消费者消费消息

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。但如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。

RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会重新分发消息。

复制代码
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "192.168.140.161";
factory.Port = 5672;
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "TestVHost";
//创建connetion
using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
            // 主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}
复制代码

 

 

消费负载均衡

1、当一个队列有多个消费者时,队列会以循环(round-robin)的方式发送给消费者。每条消息只会给一个订阅的消费者。

2、默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做循环(round-robin)。

3、RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。

但我们可以通过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。

分库分表模式

比如说客户积分同步。

一般电商中这种数据量比较大,及时性比较高。

ID 编号 1-10000的用户积分表更放在队列1,10001-20000放在队列2,不同的消费者消费不同队列。以此类推…

RPC

第一步,主要是进行远程调用的客户端需要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。

远程调用客户端:

复制代码
//申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };
复制代码

远程调用服务端:

复制代码
//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
复制代码

 

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏