RabbitMQ消息持久化 - Bigberg - 博客园

mikel阅读(1016)

来源: RabbitMQ消息持久化 – Bigberg – 博客园

 

一、前言

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。当然还是会有一些小概率事件会导致消息丢失。

二、队列持久化

  2.1 查看存在的队列和消息数量

在windows环境下,在rabbitmq的安装目录/sbin下,通过rabbitmqctl.bat list_queues查看 

这边启动了两个producer,分别生成两个队列hello 和 hello1,并且他们都有一个消息存在

重启rabbitmq,模拟故障

可以看到重启后两个队列都消失了

  2.2 持久化队列

我们就hello队列持久化

在声明队列名称时,持久化队列,生产端和消费端都要

1
channel.queue_declare(queue='hello', durable=True)

我们重复上面的操作,但是给hello队列做持久化,而hello1不做,并重启rabbitmq

可以看到重启后,hello队列还在,hello1队列消失了,但是原本hello中的一条消息也没有保存下来。所以在这边我们仅仅做到了消息队列的持久化,还没有做消息持久化。

三、消息持久化

我们刚才实现了在rabbitmq崩溃的情况下,就队列本身保存下来,重启后队列还在。接下来我们要将消息也保存下来,即消息的持久化

1
2
3
4
5
6
7
8
9
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='hello',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
# 增加properties,这个properties 就是消费端 callback函数中的properties
# delivery_mode = 2  持久化消息

生产端生成一个消息,并重启rabbitmq

可以看到,经过队列和消息持久化后的hello, 在重启的情况下,队列和消息都存在,没有消失。

消费端再重启后也能正常接收

 

四、总结

  1. 队列持久化需要在声明队列时添加参数 durable=True,这样在rabbitmq崩溃时也能保存队列
  2. 仅仅使用durable=True ,只能持久化队列,不能持久化消息
  3. 消息持久化需要在消息生成时,添加参数 properties=pika.BasicProperties(delivery_mode=2)

RabbitMQ实例C# - 西伯利亚的狼 - 博客园

mikel阅读(1121)

来源: RabbitMQ实例C# – 西伯利亚的狼 – 博客园

驱动组件.NET版本

官网推荐驱动:RabbitMQ.Client

https://www.rabbitmq.com/devtools.html#dotnet-dev

Connection和Channel

Connection是一个TCP连接,一般服务器这种资源都是很宝贵的,所以提供了Channel,完成消息的发布消费。这样Connection就可以做成单例模式的。

1、事件

Connection和Channel里面包含了几个事件。分别在不同的情况下触发

 

其他时间执行发生异常,就会执行这个

Connection.CallbackException

恢复连接成功

Connection.RecoverySucceeded

连接恢复异常时会触发这个事件

Connection.ConnectionRecoveryError

RabbitMQ出于自身保护策略,通过阻塞方式限制写入,导致了生产者应用“假死”,不对外服务。比若说CPU  IO RAM下降,队列堆积,导致堵塞。  就会触发这个事件

Connection.ConnectionBlocked

阻塞解除会触发这个事件

Connection.ConnectionUnblocked

connection断开连接时候

Connection.ConnectionShutdown

——————————————————————————————————————————————————-

.NET RabbitMQ.Client中Channel叫做Model

 

channel断开连接时候触发

Channel.ModelShutdown

其他时间执行发生异常,就会执行这个

Channel.CallbackException

broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

Channel.BasicReturn

Channel.BasicRecoverOk

Signalled when a Basic.Nack command arrives from the broker.

Channel.BasicNacks

Signalled when a Basic.Ack command arrives from the broker.

Channel.BasicAcks

Channel.FlowControl

2、属性

最大channel数量

connetion.ChannelMax

服务上这个连接的对象属性

connetion.ClientProperties

服务器上这个连接的名字

connetion.ClientProvidedName

关闭原因

connetion.CloseReason

端口

connetion.Endpoint

和客户端通信时所允许的最大的frame size

connetion.FrameMax

连接的心跳包

connetion.Heartbeat

是否打开

connetion.IsOpen

获取vhost

connetion.KnownHosts

本地端口

connetion.LocalPort

连接串使用的协议

connetion.Protocol

远程端口,服务器

connetion.RemotePort

服务器属性

connetion.ServerProperties

关停信息

connetion.ShutdownReport

—————————————————————————————————————————————-

channel编号

channel.ChannelNumber

关闭原因

channel.CloseReason

连接超时时间

channel.ContinuationTimeout

channel.IsClosed

channel.IsOpen

下一个消息编号

channel.NextPublishSeqNo

3、方法

终止连接以及他们的channel,可以指定时间长度。

connetion.Abort()

关闭连接以及他的channel

connetion.Close()

创建channel

connetion.CreateModel()

connetion.HandleConnectionBlocked()

connetion.HandleConnectionUnblocked()

 

发送消息Confirm模式

目的确认消息是否到达消息队列中 

 

1、mandatory

broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

channel.BasicReturn += Channel_BasicReturn;
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

 

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
{
    Console.WriteLine("Channel_BasicReturn");
}

 

2、普通Confirm模式

复制代码
channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body);

if (channel.WaitForConfirms())
{
    Console.WriteLine("普通发送方确认模式");
}
复制代码

 

3、批量Confirm模式

复制代码
channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

channel.WaitForConfirmsOrDie();
Console.WriteLine("普通发送方确认模式");
复制代码

 

 

4、异步Confirm模式

java版本组件有

5、事物

 

复制代码
try
{
    //声明事物
    channel.TxSelect();
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    //提交事物
    channel.TxCommit();
}
catch (Exception)
{
    //回滚
    channel.TxRollback();

}
复制代码

上面说的是生产者发布消息确认,那么消费者消费如何确认呢,大家都知道消费者有ack机制,但是用到事物的时候,是怎样的呢

1.autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么已事务回滚为主,此条消息会重新放回队列;
2.autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

 

事物比较耗性能

 

简单消息发送
复制代码
static void Main(string[] args)
        {


            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.140.161";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "TestVHost";
            //创建connetion
            using (var connetion = factory.CreateConnection())
            {
                connetion.CallbackException += Connetion_CallbackException;
                connetion.RecoverySucceeded += Connetion_RecoverySucceeded;
                connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
                connetion.ConnectionBlocked += Connetion_ConnectionBlocked;
                connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked;
                //连接关闭的时候
                connetion.ConnectionShutdown += Connetion_ConnectionShutdown;



                //创建channel
                using (var channel = connetion.CreateModel())
                {

                    //消息会在何时被 confirm?
                    //The broker will confirm messages once:
                    //broker 将在下面的情况中对消息进行 confirm :
                    //it decides a message will not be routed to queues
                    //(if the mandatory flag is set then the basic.return is sent first) or
                    //broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    //a transient message has reached all its queues(and mirrors) or
                    //非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
                    //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or
                    //持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
                    //a persistent message has been consumed(and if necessary acknowledged) from all its queues
                    //持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)


                    //broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    channel.BasicReturn += Channel_BasicReturn;

                    //(可以不声明)如果不声明交换机 ,那么就使用默认的交换机  (每一个vhost都会有一个默认交换机)
                    //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true);

                    //创建一个队列  bool durable(持久化), bool exclusive(专有的), bool autoDelete(自动删除)
                    //channel.QueueDeclare("TestQueue", true, false, false, null);
                    //不做绑定的话,使用默认的交换机。
                    //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null);

                    //发布消息
                    var body = Encoding.UTF8.GetBytes("西伯利亚的狼");


                    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
                }

                Console.WriteLine("Hello World!");
                Console.ReadKey();
            }
        }

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            Console.WriteLine("Channel_BasicReturn");
        }

private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionShutdown");
        }

private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionUnblocked");
        }

private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionBlocked");
        }

private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionRecoveryError");
        }

private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
{
    Console.WriteLine("Connetion_RecoverySucceeded");
}

private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
{
    Console.WriteLine("Connetion_CallbackException");
}
复制代码

 

 

场景分析

 

 

消息持久化

Broker持久化、交换机持久化、队列持久化  。目的是维持重启后  这些东西的存在。

消息持久化,才是把消息持久化到硬盘中,因为消息在队列中,所以需要队列持久化。

设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).  默认的就是持久化。

复制代码
//发布消息
var body = Encoding.UTF8.GetBytes("西伯利亚的狼");
BasicProperties pro = new BasicProperties();
pro.DeliveryMode = 2;

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);
复制代码

 

消费者消费消息

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。但如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。

RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会重新分发消息。

复制代码
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "192.168.140.161";
factory.Port = 5672;
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "TestVHost";
//创建connetion
using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
            // 主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}
复制代码

 

 

消费负载均衡

1、当一个队列有多个消费者时,队列会以循环(round-robin)的方式发送给消费者。每条消息只会给一个订阅的消费者。

2、默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做循环(round-robin)。

3、RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。

但我们可以通过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。

分库分表模式

比如说客户积分同步。

一般电商中这种数据量比较大,及时性比较高。

ID 编号 1-10000的用户积分表更放在队列1,10001-20000放在队列2,不同的消费者消费不同队列。以此类推…

RPC

第一步,主要是进行远程调用的客户端需要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。

远程调用客户端:

复制代码
//申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };
复制代码

远程调用服务端:

复制代码
//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
复制代码

 

RabbitMQ如何保证消息不丢失? - 简书

mikel阅读(1927)

来源: RabbitMQ如何保证消息不丢失? – 简书

image.png

按照RabbitMQ正常使用流程,生产者会发送一条消息到RabbitMQ服务器,消费者接收到消息进行消费。但是在实际情况下,生产者很有可能在到达RabbitMQ服务器后,由于服务器的某种原因导致消息丢失(因为RabbitMQ默认是将消息存储在内存中),一旦丢失就是再也找不到了。

那么我们如何保证消息不丢失呢?

RabbitMQ有相应的持久化机制,可以将Exchange、Queue、Message全部持久化到磁盘。

那如果在将消息持久化到磁盘的过程中服务器挂了呢?

那么则需要通过数据保护机制来保证我们的消息一定能存储到磁盘,如果不成功,消息生产者则一直发送这条消息。

在RabbitMq中有两种数据保护机制:

1. 事物机制:

当消息达到服务器,开启事物,只有当消息存储完毕才提交事物,向生产者发送成功通知。如果失败,也会向生产者发送失败消息,生产者接收失败消息则继续发送此消息。在这个过程中生产者需要同步等待,所以,事物机制虽然可以保证消息可靠性,但是采用的是同步方式,会造成性能下降。

2. confirm机制:

一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知。生产者在发送完消息后不会等待回应,所以confirm机制性能相对比事物机制高。

如何开启RabbtiMQ的confirm模式:

需要在配置文件中配置如下:

rabbitmq:
  publisher-confirms: true

如何开启队列持久化:

在声明队列时,设置durable属性为true

image

消息默认就是持久化到磁盘的。

具体生产者代码:

@Component

public class MessageSenderimplements RabbitTemplate.ConfirmCallback {

@Autowired

    private RabbitTemplaterabbitTemplate;

    @Autowired

    private RedisTemplateredisTemplate;

    public MessageSender(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

        rabbitTemplate.setConfirmCallback(this);

    }

@Override

    public void confirm(CorrelationData correlationData, boolean ack, String s) {

if (ack) {//ack : 成功或失败的布尔值

            //成功

            //在消息发送时,将消息存入redis中,方便消息发送失败时,从redis中取值

            //correlationData.getId()在发送消息时,需要生成的唯一标识

            redisTemplate.delete(correlationData.getId());

        }else {

//失败

            //从redis中获取参数

            Map map =redisTemplate.opsForHash().entries("message_" + correlationData.getId());

            String exchange = map.get("exchange");

            String message = map.get("message");

            String routingKey = map.get("routingKey");

            //重新发送消息

            rabbitTemplate.convertAndSend(exchange, routingKey, message);

        }

}

//自定义发送消息方法

    public void sendMessage(String exchange, String routingKey, String message) {

//设置消息的唯一标识,CorrelationData会在confirm方法中作为参数传给我们

        CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());

        //将消息存入redis

        redisTemplate.opsForValue().set(correlationData.getId(), message);

        //将本次发送消息的元信息存入redis,方便后面失败重新发送

        Map metaDataMap =new HashMap<>();

        metaDataMap.put("exchange", exchange);

        metaDataMap.put("routingKey", routingKey);

        metaDataMap.put("message", message);

        redisTemplate.opsForHash().putAll("message_" + correlationData.getId(), metaDataMap);

        //发送消息时,需要把correlationData对象带过去

        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

    }

}

作者:养一只tom猫
链接:https://www.jianshu.com/p/ca65323dd22f
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

rabbitmq - C# Convert ReadOnlyMemory to byte[] - Stack Overflow

mikel阅读(2224)

来源: rabbitmq – C# Convert ReadOnlyMemory to byte[] – Stack Overflow

Given ReadOnlyMemory Struct I want to convert the stream into a string

I have the following code:

var body = ea.Body; //ea.Body is of Type ReadOnlyMemory<byte>
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);

And it gives the following error. I am using the latest C# with .NET CORE 3.1

enter image description here

Which is funny because I am literally copy pasting the Hello World example of a major product called RabbitMQ and it doesn’t compile.

 

解决方法:

I updated the RabbitMQ.Client package and had the same problem at my Consumer_Received method:

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        // Code

I checked BasicDeliverEventArgs and saw that Body is now a ReadOnlyMemory type:
public ReadOnlyMemory<byte> Body { get; set; }

As Jeff said, RabbitMQ changed their API, so I think this changed from previous tutorials we had on internet.

To fix, I only had to transform my Body message into a Array (into Consumer_Received method):
var message = Encoding.UTF8.GetString(e.Body.ToArray());

C#调用RabbitMQ实现消息队列 - jack_Meng - 博客园

mikel阅读(725)

来源: C#调用RabbitMQ实现消息队列 – jack_Meng – 博客园

前言

我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的。

所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始。

关于消息队列

其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的。

那么,接收数据的方式是什么呢?自然是端口监听啦。

那消息队列是什么就很好解释了?

它就是端口监听,接到数据后,将数据排列起来。

那这件事,我们不用中间件能做吗?

当然能做啦,写个TCP/UDP/Socket的软件就可以做啦。

举个简单的例子,如下图:

既然自己可以做消息队列,那为什么要用RabbitMQ?

因为,RabbitMQ成熟的开源中间件,可靠性有保证,bug少,性能也非常好。

C#代码默认是使用托管内存的,所以,想写出媲美RabbitMQ性能的消息队列,就必须离开我们常用的托管内存,使用非托管内存,但这个代价就太大了;而且最终能否达到RabbitMQ的性能水平还是个未知数。

还有就是RabbitMQ除了基础的消息队列管理,还有很多很强大的额外功能,而自己开发消息队列,很难如此尽善尽美。

—————————————————————————————————-

我们还会发现,在消息队列里有很多概念,什么消息总线啊,什么工作队列啊等等。

要怎么理解这些概念呢?

很简单,不要去理解。这些概念其实是人家代码架构的模式,不要去理解他们,【记】就完了,人家的中间件就是按照这个模式工作的。

比如,我写了一个接收消息的总控制器,然后我为他命名为总线,那这个控制器就是总线,没有理由,这就是定义。

准备工作

首先,我们访问官网【https://www.rabbitmq.com/】,点击Get Started。

然后,网站会自动跳转到当前首页Get Started的锚点位置,如下图:

Get Started锚点:

然后我们点击DownLoad+Installation,进入到下载界面。

在下载页面中,我们找到安装指南,然后在点击官网推荐的Windows系统的安装包,如下图:

现在,我们进入了Windows安装指南界面了。

首先,我们看一下预览信息,如下图:

在预览里,我们得知,安装RabbitMQ有两种方法,一种是使用Chocolatey安装,一种是使用官方安装包安装。

Chocolatey是什么呢?随手百度一下,原来他是一个软件包管理工具,也就是说,Chocolatey是类似于Nuget的一种工具。

由于Chocolatey的使用,我不是很熟悉,所以,这里选择使用官方安装包安装。

点击【Using the official installer】,我们进入了【Using the official installer】对应的锚点,如下图。

在【Using the official installer】段落里找到有推荐标志的安装包,然后下载。

下载完成后,我们可以得到这样一个安装包,如下图:

除了下载安装包,我们还会发现,在【Using the official installer】段落里,有提醒我们,RabbitMQ是有依赖的,依赖一个Erlang语言的框架(类似于C#语言的NetFramework)。

我们可以发现,在依赖的段落里,官网非常坑的给出了三个链接网址,如下:

supported version of Erlang:https://www.rabbitmq.com/which-erlang.html

Windows installer:https://www.erlang.org/downloads

Erlang Solutions:https://www.erlang-solutions.com/resources/download.html

因为,我们是无法通过文字描述来判断,哪一个是真的依赖框架的下载地址,所以只好每个都点击进去看看。。。

打开网址后发现,在后两个网址中都可以找到框架下载地址,但第二个地址明显更友好一点,所以我们在第二个网址内下载Erlang的框架。

下载完成得到如下图文件:

PS:这里下载的是OTP的22.1的版本,我的理解是Erlang等于C#语言,而OTP等于NetFramework。

安装Erlang\OTP

首先,我们运行otp_win64_22.1.exe,安装依赖框架Erlang\OTP。

安装完成后,设置环境变量如下:

然后运行CMD,输入erl,测试安装是否成功,如下图:

安装成功。

安装rabbitmq-server

安装完依赖后,我们接着安装rabbitmq-server-3.8.0.exe。

【rabbitmq-server-3.8.0.exe】?从这个文件名上,我们发现了一个问题,那就是,我们即将安装的RabbitMQ,是一个服务端啊。

什么?服务端?难道还有客户端???

其实这也很好理解,想一下最开始我举的那个例子,消息队列是需要一个监听端口的服务端的,然后客户端向这个服务端发送请求。

这样是不是就很好的理解RabbitMQ了呢:)

—————————————————————————————————-

安装完RabbitMQ服务端后,我们还是启动CMD,用命令行来查看下安装状态。

首先输入下面的命令,将路径定位到RabbitMQ的路径下:

【CD /D C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.0\sbin】

然后输入rabbitmqctl status查看状态。

启动管理工具的命令行:rabbitmq-plugins enable rabbitmq_management。

启动成功后,在浏览器输入地址http://127.0.0.1:15672/,进入管理页面,账户密码都是guest。

RabbitMQ还有很多常用命令,大家可以自行百度。

—————————————————————————————————-

到此,RabbitMQ服务端的环境配置好了,正常情况,这些配置应该在服务器进行,但我为了测试方便,就把服务端也安装在本机了,因此我下面调用RabbitMQ时,连接的主机IP都是localhost。

RabbitMQ应用

首先创建两个控制台应用程序,KibaRabbitMQSend和KibaRabbitMQReceived。

然后引入RabbitMQ的开源类库。

在C#里使用RabbitMQ开源类库非常简单,可以去官网下载一个.NET版本的RabbitMQ客户端类库,也可以直接在Nuget上搜索RabbitMQ,然后安装,如下图:

KibaRabbitMQSend

安装完RabbitMQ开源类库后,我们编写代码,实现向RabbitMQ服务器发送消息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
    factory.UserName = "guest";//默认用户名,用户可以在服务端自定义创建,有相关命令行
    factory.Password = "guest";//默认密码
    using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
    {
        //创建一个通道,这个就是Rabbit自己定义的规则了,如果自己写消息队列,这个就可以开脑洞设计了
        //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
        using (var channel = connection.CreateModel())
        {
             channel.QueueDeclare("kibaQueue"falsefalsefalsenull);//创建一个名称为kibaqueue的消息队列
             var properties = channel.CreateBasicProperties();
             properties.DeliveryMode = 1;
             string message = "I am Kiba518"//传递的消息内容
             channel.BasicPublish("""kibaQueue", properties, Encoding.UTF8.GetBytes(message)); //生产消息
             Console.WriteLine($"Send:{message}");
        }
    }
}

运行代码。

然后我们使用命令行rabbitmqctl list_queues,去RabbitMQ的服务器查看当前消息队列,如下图:

可以看到,我们的消息已经发送成功了。

KibaRabbitMQReceived

现在我们编写接收消息代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "guest";
    factory.Password = "guest";
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("kibaQueue"falsefalsefalsenull);
            /* 这里定义了一个消费者,用于消费服务器接受的消息
             * C#开发需要注意下这里,在一些非面向对象和面向对象比较差的语言中,是非常重视这种设计模式的。
             * 比如RabbitMQ使用了生产者与消费者模式,然后很多相关的使用文章都在拿这个生产者和消费者来表述。
             * 但是,在C#里,生产者与消费者对我们而言,根本算不上一种设计模式,他就是一种最基础的代码编写规则。
             * 所以,大家不要复杂的名词吓到,其实,并没那么复杂。
             * 这里,其实就是定义一个EventingBasicConsumer类型的对象,然后该对象有个Received事件,
             * 该事件会在服务接收到数据时触发。
             */
            var consumer = new EventingBasicConsumer(channel);//消费者
            channel.BasicConsume("kibaQueue"true, consumer);//消费消息
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
            };
        }
    }
}

运行代码。

然后我们使用命令行rabbitmqctl list_queues,去RabbitMQ的服务器查看当前消息队列,如下图:

可以看到,消息已经被使用了。

—————————————————————————————————-

现在我们在发送代码出做一个for循环,看看消息接收速度是什么样的,代码如下,for循环了100次,每次间隔3秒。

1
2
3
4
5
6
7
8
9
10
for (int i = 0; i < 100; i++)
{
    channel.QueueDeclare("kibaQueue"falsefalsefalsenull);//创建一个名称为kibaQueue的消息队列
    var properties = channel.CreateBasicProperties();
    properties.DeliveryMode = 1;
    string message = "I am Kiba518"//传递的消息内容
    channel.BasicPublish("""kibaQueue", properties, Encoding.UTF8.GetBytes(message)); //生产消息
    Console.WriteLine($"Send:{message}");
    Thread.Sleep(3000);
}

效果图如下:

可以看到,发送消息和接收消息,几乎是同步的,效果非常理想。

服务器端应用

在上文,我们的RabbitMQ服务是安装在我的本机上的;现在我们把服务移植到服务器上,然后再来测试一下。

在服务器端安装RabbitMQ和在本机安装的步骤是一样的,但是安装完成后,我们需要设置下防火墙的入站规则和出站规则,将5672的UDP端口开放一下。

为什么要开放端口是5672?因为RabbitMQ的默认的消息接收和发送端口就是5672,我们可以使用断点查看一下。

如上图,可以看到,在我们没有设置端口的时候,Endpoint的端口的默认值是5672。

配置完端口后,我们修改代码中的HostName为我们的服务器地址,如下。

1
factory.HostName = "1.1.1.1";

重新运行代码,会发现在运行到factory.CreateConnection()的时候,系统提示一个异常【RabbitMQ.Client.Exceptions.BrokerUnreachableException:“None of the specified endpoints were reachable”】,如下图:

这是因为我们使用的账号是guest,guest账号默认是不支持远程连接的。

解决办法很简单,新建一个账户即可。

创建用户

在服务器端打开浏览器,输入http://127.0.0.1:15672/,进入管理页面。

点击菜单栏的Admin选项,进入用户管理界面创建用户kiba,密码123456,如下图:

创建完用户后,得到如下界面。

如上图所示,刚刚创建的用户还没有任何访问权限。

现在我们点击用户名,进入权限管理页面设置权限。

如上图所示,页面默认为我们设置了一个可读,可写,可管理配置的权限;所以,我们只要点击Set premission就可以了。

设置完权限,我们回到用户管理页面。

如上图所示,权限设置成功。

现在我们回到代码,修改用户名密码如下。

1
2
3
factory.HostName = "1.1.1.1";
factory.UserName = "kiba";
factory.Password = "123456";

运行代码,不再抛异常,接受发送消息正常。

—————————————————————————————————-

设置用户权限也可以通过命令的方式设置,如下:

rabbitmqctl set_permissions -p “/” kiba “.” “.” “.*”

—————————————————————————————————-

到此C#调用RabbitMQ实现消息队列就讲完了。

代码已经传到Github上了,欢迎大家下载。

Github地址:https://github.com/kiba518/KibaRabbitMQ

 

 

 

出处:https://www.cnblogs.com/kiba/p/11703073.html

一个C#操作RabbitMQ的完整例子 - yswenli - 博客园

mikel阅读(910)

一个C#操作RabbitMQ的完整例子

来源: 一个C#操作RabbitMQ的完整例子 – yswenli – 博客园

 

一、下载RabbitMQ

http://www.rabbitmq.com/install-windows.html

 

二、下载OTP

http://www.erlang.org/downloads

 

三、安装OTP、RabbitMQ

四、配置RabbitMQ

找到bat的目录

执行相关命令

 

1.添加用户密码 rabbitmqctl add_user wenli wenli

 2.设置wenli为管理员 rabbitmqctl set_user_tags wenli administrator

3.启动RabbitMQ的web管理 rabbitmq-plugins enable rabbitmq_management

 4.创建virtual host 

5.设置用户权限

点击用户名进行设置

将virtual hosts 权限赋给用户wenli

 

6.创建Exchanges

 

五.创建C# console

1.下载RabbitMQ驱动 https://github.com/yswenli/Wenli.Data.RabbitMQ/releases/tag/Release1.0.0

2.添加引用     

3.添加配置

4.测试代码:

 

复制代码
 1 using System;
 2 using System.Text;
 3 using System.Threading;
 4 using System.Threading.Tasks;
 5 
 6 namespace Wenli.Data.RabbitMQ.Console
 7 {
 8     using Console = System.Console;
 9 
10     class Program
11     {
12         static void Main(string[] args)
13         {
14             Console.Title = "Wenli.Data.RabbitMQ.Console";
15             Console.WriteLine("正连接到mq");
16 
17             try
18             {
19                 Test();
20             }
21             catch (Exception ex)
22             {
23                 Console.WriteLine("err:" + ex.Message + ex.Source + ex.StackTrace);
24             }
25 
26             Console.Read();
27         }
28 
29 
30         static void Test()
31         {
32 
33             var topic = "testtopic";
34 
35             var cnn = RabbitMQBuilder.Get(MQConfig.Default).GetConnection();
36 
37             var operation = cnn.GetOperation(topic);
38 
39             Console.WriteLine("正连接到订阅【" + topic + "】");
40 
41             operation.Subscribe();
42 
43             Console.WriteLine("正在入队");
44 
45             Task.Factory.StartNew(() =>
46             {
47                 while (true)
48                 {
49                     operation.Enqueue(Encoding.UTF8.GetBytes(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + "     hello!"));
50                     Thread.Sleep(1);
51                 }
52             });
53 
54 
55 
56 
57             Console.WriteLine("正在出队");
58 
59 
60 
61             Task.Factory.StartNew(() =>
62             {
63                 while (true)
64                 {
65                     var result = operation.Dnqueue();
66 
67                     if (result == null)
68                     {
69                         Thread.Sleep(1);
70                     }
71                     else
72                     {
73                         Console.WriteLine(Encoding.UTF8.GetString(result));
74                     }
75                 }
76             });
77 
78             Console.ReadLine();
79 
80             Console.WriteLine("正在取消订阅");
81 
82             operation.UnSubscribe();
83 
84             Console.WriteLine("测试完成");
85         }
86     }
87 }
复制代码

5.运行结果:

至此C# 成功操作Rabbitmq完成

 

 

转载请标明本文来源:http://www.cnblogs.com/yswenli/p/7446919.html
更多内容欢迎star作者的github:https://github.com/yswenli/Wenli.Data.RabbitMQ
如果发现本文有什么问题和任何建议,也随时欢迎交流~

 

nginx 和 IIS 实现负载均衡 - 章为忠 - 博客园

mikel阅读(714)

来源: nginx 和 IIS 实现负载均衡 – 章为忠 – 博客园

Nginx的作用和优点,这里不必多说,今天主要是nginx负载均衡实验,把做的步骤记录下来,作为一个学习笔记吧,也可以给大家做下参考。

  1.Nginx安装
1.下载地址:http://nginx.org/en/download.html

2.解压到后在window的cmd窗口,输入如下图所示的命令,进入到nginx目录,使用“start nginx.exe ”进行nginx的安装,如下图所示:


安装成功后,在“任务管理器”中会看到“nginx.exe”进程。

3.在浏览器地址栏输入:127.0.0.1,会看到nginx欢迎界面。说明Nginx已经安装成功。

  2.站点搭建及配置

1.搭建两个iis站点
新建一个站点下只有一个简单的index页面,将两个站点都部署到本机了,分别绑定了8097和8098两个端口。

2.修改nginx配置信息,nginx的配置信息,都在nginx.conf ,这个文件中配置。

a.修改nginx监听端口,修改http server下的listen节点值
listen 8096;

b.在http节点下添加upstream(服务器集群),server设置的是集群服务器的信息,我这里搭建了两个站点,配置了两条信息。

#服务器集群名称为test.com
upstream test.com {
server 127.0.0.1: 8097;
server 127.0.0.1: 8098;
}

c.在http节点下找到location节点修改

location / {
root html;
index index.aspx index.html index.htm; #修改主页为index.aspx
#其中test.com 对应着upstream设置的集群名称
proxy_pass http:// test.com;
#设置主机头和客户端真实地址,以便服务器获取客户端真实IP
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

修改完成配置文件之后,重启nginx服务,

  3.运行结果
访问http://localhost:8096/index.aspx

可以看到,我们的请求被分发到了8097站点和8098站点,说明负载均衡搭建成功了。

停掉8098站点,刷新页面,则请求会分发给8097 站点, 说明其中一个站点挂了,只要还有一个站点是好的,系统仍然能够继续提供服务。

  4.session共享

a.使用数据库保存session信息,可以查看本人前一篇文章:数据库实现多站点共享Session

b.使用nginx将同一ip的请求分配到固定服务器,修改如下。ip_hash会计算ip对应hash值,然后分配到固定服务器,(这个还没试验过)

upstream test.com {
server 127.0.0.1: 8097;
server 127.0.0.1: 8098;
ip_hash;
}

c.搭建一台Redis服务器,对session的读取都从该Redis服务器上读取。

 

注意:nginx作为负载均衡服务器时候,无法正常加载css和js这些文件而出现这样的问题,通过一番搜索和查找,修改nginx下的nginx.conf配置文件才得以正常显示,修改的配置如下:

 

RabbitMQ学习系列(六): RabbitMQ 高可用集群 - 章为忠 - 博客园

mikel阅读(670)

来源: RabbitMQ学习系列(六): RabbitMQ 高可用集群 – 章为忠 – 博客园

前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

本来一直想写一个介绍RabbitMQ高可用的集群的文章。不过,后来发现园子里,有个已经RabbitMQ大牛写了,关于高可用集群的文章了。特别巧合的是,还是以前公司的同事。所以,这里就不啰嗦。直接引用过来吧。原文地址:http://www.cnblogs.com/flat_peach/archive/2013/04/07/3004008.html

RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡。

Rabbit模式大概分为以下三种:单一模式、普通模式、镜像模式

单一模式:最简单的情况,非集群模式。

没什么好说的。

普通模式:默认的集群模式。

对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构。

当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。

所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。

该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。

如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,然后就没有然后了……

镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。

该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。

所以在对可靠性要求较高的场合中适用(后面会详细介绍这种模式,目前我们搭建的环境属于该模式)

 

了解集群中的基本概念:

RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制的,一个例外是,那些当前只属于创建它的节点的消息队列,尽管它们可见且可被所有节点读取。rabbitmq节点可以动态的加入到集群中,一个节点它可以加入到集群中,也可以从集群环集群会进行一个基本的负载均衡。
集群中有两种节点:
1 内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
2 磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。

思路:

那么具体如何实现RabbitMQ高可用,我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。

架构图如下:图片来自http://www.nsbeta.info

 
          上述图里是3个RabbitMQ运行在同一主机上,分别用不同的服务端口。当然我们的生产实际里,多个RabbitMQ肯定是运行在不同的物理服务器上,否则就失去了高可用的意义。

 

 

集群模式配置

设计架构可以如下:在一个集群里,有4台机器,其中1台使用磁盘模式,另2台使用内存模式。2台内存模式的节点,无疑速度更快,因此客户端(consumer、producer)连接访问它们。而磁盘模式的节点,由于磁盘IO相对较慢,因此仅作数据备份使用,另外一台作为反向代理。

四台服务器hostname分别为:queue 、panyuntao1、panyuntao2、panyuntao3(ip:172.16.3.110)

配置RabbitMQ集群非常简单,只需要几个命令,配置步骤如下:

step1:queue、panyuntao1、panyuntao2做为RabbitMQ集群节点,分别安装RabbitMq-Server ,安装后分别启动RabbitMq-server

启动命令  # Rabbit-Server start ,安装过程及启动命令参见:http://www.cnblogs.com/flat_peach/archive/2013/03/04/2943574.html

step2:在安装好的三台节点服务器中,分别修改/etc/hosts文件,指定queue、panyuntao1、panyuntao2的hosts,如:

172.16.3.32 queue

172.16.3.107 panyuntao1

172.16.3.108 panyuntao2

还有hostname文件也要正确,分别是queue、panyuntao1、panyuntao2,如果修改hostname建议安装rabbitmq前修改。

请注意RabbitMQ集群节点必须在同一个网段里,如果是跨广域网效果就差。

step3:设置每个节点Cookie

Rabbitmq的集群是依赖于erlang的集群来工作的,所以必须先构建起erlang的集群环境。Erlang的集群中各节点是通过一个magic cookie来实现的,这个cookie存放在 /var/lib/rabbitmq/.erlang.cookie 中,文件是400的权限。所以必须保证各节点cookie保持一致,否则节点之间就无法通信。
-r——–. 1 rabbitmq rabbitmq 20 3月 5 00:00 /var/lib/rabbitmq/.erlang.cookie
将其中一台节点上的.erlang.cookie值复制下来保存到其他节点上。或者使用scp的方法也可,但是要注意文件的权限和属主属组。
我们这里将queue中的cookie 复制到 panyuntao1、panyuntao2中,先修改下panyuntao1、panyuntao2中的.erlang.cookie权限
#chmod 777  /var/lib/rabbitmq/.erlang.cookie 
将queue的/var/lib/rabbitmq/.erlang.cookie这个文件,拷贝到panyuntao1、panyuntao2的同一位置(反过来亦可),该文件是集群节点进行通信的验证密钥,所有节点必须一致。拷完后重启下RabbitMQ。
复制好后别忘记还原.erlang.cookie的权限,否则可能会遇到错误
#chmod 400 /var/lib/rabbitmq/.erlang.cookie 
设置好cookie后先将三个节点的rabbitmq重启
# rabbitmqctl stop
# rabbitmq-server start
 
 step4:停止所有节点RabbitMq服务,然后使用detached参数独立运行,这步很关键,尤其增加节点停止节点后再次启动遇到无法启动都可以参照这个顺序
     queue# rabbitmqctl stop
     panyuntao1# rabbitmqctl stop
     panyuntao2# rabbitmqctl stop
 
     queue# rabbitmq-server -detached
     panyuntao1# rabbitmq-server -detached
     panyuntao2# rabbitmq-server -detached
 
     分别查看下每个节点
     queue# rabbitmqctl cluster_status
          Cluster status of node rabbit@queue …

[{nodes,[{disc,[rabbit@queue]}]},
{running_nodes,[rabbit@queue]},
{partitions,[]}]
…done.

 
     panyuntao1# rabbitmqctl cluster_status
          Cluster status of node rabbit@panyuntao1…

[{nodes,[{disc,[rabbit@panyuntao1]}]},

{running_nodes,[rabbit@panyuntao1]},

{partitions,[]}]
…done.

panyuntao2# rabbitmqctl cluster_status

          Cluster status of node rabbit@panyuntao2

[{nodes,[{disc,[rabbit@panyuntao2]}]},

{running_nodes,[rabbit@panyuntao2]},

{partitions,[]}]
…done.

 
step4:将panyuntao1、panyuntao2作为内存节点与queue连接起来,在panyuntao1上,执行如下命令:
          panyuntao1# rabbitmqctl stop_app

panyuntao1# rabbitmqctl join_cluster –ram rabbit@queue   

panyuntao1# rabbitmqctl start_app

panyuntao2# rabbitmqctl stop_app
panyuntao2# rabbitmqctl join_cluster –ram rabbit@queue   (上方已经将panyuntao1与queue连接,也可以直接将panyuntao2与panyuntao1连接,同样而已加入集群中)
panyuntao2# rabbitmqctl start_app
上述命令先停掉rabbitmq应用,然后调用cluster命令,将panyuntao1连接到,使两者成为一个集群,最后重启rabbitmq应用。在这个cluster命令下,panyuntao1、panyuntao2是内存节点,queue是磁盘节点(RabbitMQ启动后,默认是磁盘节点)。
queue 如果要使panyuntao1或panyuntao2在集群里也是磁盘节点,join_cluster 命令去掉–ram参数即可
      #rabbitmqctl join_cluster rabbit@queue   

只要在节点列表里包含了自己,它就成为一个磁盘节点。在RabbitMQ集群里,必须至少有一个磁盘节点存在。

step5:在queue、panyuntao1、panyuntao2上,运行cluster_status命令查看集群状态:

[root@queue ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@queue …
[{nodes,[{disc,[rabbit@queue]},{ram,[rabbit@panyuntao2,rabbit@panyuntao1]}]},
{running_nodes,[rabbit@panyuntao2,rabbit@panyuntao1,rabbit@queue]},
{partitions,[]}]
…done.

[root@panyuntao1 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node rabbit@panyuntao1 …
[{nodes,[{disc,[rabbit@queue]},{ram,[rabbit@panyuntao2,rabbit@panyuntao1]}]},
{running_nodes,[rabbit@panyuntao2,rabbit@queue,rabbit@panyuntao1]},
{partitions,[]}]
…done.
[root@panyuntao2 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node rabbit@panyuntao2 …
[{nodes,[{disc,[rabbit@queue]},{ram,[rabbit@panyuntao2,rabbit@panyuntao1]}]},
{running_nodes,[rabbit@panyuntao1,rabbit@queue,rabbit@panyuntao2]},
{partitions,[]}]
…done.
这时我们可以看到每个节点的集群信息,分别有两个内存节点一个磁盘节点
step6:往任意一台集群节点里写入消息队列,会复制到另一个节点上,我们看到两个节点的消息队列数一致:(如何发送消息参见:http://www.cnblogs.com/flat_peach/archive/2013/03/04/2943574.html

root@panyuntao2 :~# rabbitmqctl list_queues -p hrsystem

Listing queues …
test_queue 10000
…done.

root@panyuntao1 :~# rabbitmqctl list_queues -p hrsystem
Listing queues …
test_queue 10000
…done.
root@queue:~# rabbitmqctl list_queues -p hrsystem
Listing queues …
test_queue 10000
…done.
-p参数为vhost名称
          这样RabbitMQ集群就正常工作了,
 
       这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
      为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘)。理由如下:

1. storage space: If every cluster node had a full copy of every queue, adding nodes wouldn’t give you more storage capacity. For example, if one node could store 1GB of messages, adding two more nodes would simply give you two more copies of the same 1GB of messages.

2. performance: Publishing messages would require replicating those messages to every cluster node. For durable messages that would require triggering disk activity on all nodes for every message. Your network and disk load would increase every time you added a node, keeping the performance of the cluster the same (or possibly worse).

当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡。

  镜像模式配置
上面配置RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,虽然该模式解决一部分节点压力,但队列节点宕机直接导致该队列无法使用,只能等待重启,所以要想在队列节点宕机或故障也能正常使用,就要复制队列内容到集群里的每个节点,需要创建镜像队列。
我们看看如何镜像模式来解决复制的问题,从而提高可用性
     step1:增加负载均衡器
 

关于负载均衡器,商业的比如F5的BIG-IP,Radware的AppDirector,是硬件架构的产品,可以实现很高的处理能力。但这些产品昂贵的价格会让人止步,所以我们还有软件负载均衡方案。互联网公司常用的软件LB一般有LVS、HAProxy、Nginx等。LVS是一个内核层的产品,主要在第四层负责数据包转发,使用较复杂。HAProxy和Nginx是应用层的产品,但Nginx主要用于处理HTTP,所以这里选择HAProxy作为RabbitMQ前端的LB。

HAProxy的安装使用非常简单,在Centos下直接yum install haproxy,然后更改/etc/haproxy/haproxy.cfg 文件即可,文件内容大概如下:

   #———————————————————————

defaults
mode                    http
log                     global
option                  httplog
option                  dontlognull
option http-server-close
option forwardfor       except 127.0.0.0/8
option                  redispatch
retries                 3
timeout http-request    10s
timeout queue           1m
timeout connect         10s
timeout client          1m
timeout server          1m
timeout http-keep-alive 10s
timeout check           10s
maxconn                 3000

 
    listen rabbitmq_cluster 0.0.0.0:5672
    mode tcp
    balance roundrobin
    server   rqslave1 172.16.3.107:5672 check inter 2000 rise 2 fall 3   
    server   rqslave2 172.16.3.108:5672 check inter 2000 rise 2 fall 3 
#  server   rqmaster 172.16.3.32:5672 check inter 2000 rise 2 fall 3  
#———————————————————————
负载均衡器会监听5672端口,轮询我们的两个内存节点172.16.3.107、172.16.3.108的5672端口,172.16.3.32为磁盘节点,只做备份不提供给生产者、消费者使用,当然如果我们服务器资源充足情况也可以配置多个磁盘节点
,这样磁盘节点除了故障也不会影响,除非同时出故障。
step2:配置策略
 
使用Rabbit镜像功能,需要基于rabbitmq策略来实现,政策是用来控制和修改群集范围的某个vhost队列行为和Exchange行为
在cluster中任意节点启用策略,策略会自动同步到集群节点

# rabbitmqctl set_policy -p hrsystem ha-allqueue"^" '{"ha-mode":"all"}'

这行命令在vhost名称为hrsystem创建了一个策略,策略名称为ha-allqueue,策略模式为 all 即复制到所有节点,包含新增节点,
策略正则表达式为 “^” 表示所有匹配所有队列名称。
例如rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" '{"ha-mode":"all"}'
注意:"^message” 这个规则要根据自己修改,这个是指同步”message”开头的队列名称,我们配置时使用的应用于所有队列,所以表达式为”^”
官方set_policy说明参见
set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
ha-mode:
ha-mode ha-params Result
all (absent) Queue is mirrored across all nodes in the cluster. When a new node is added to the cluster, the queue will be mirrored to that node.
exactly count Queue is mirrored to count nodes in the cluster. If there are less than count nodes in the cluster, the queue is mirrored to all nodes. If there are more than countnodes in the cluster, and a node containing a mirror goes down, then a new mirror will not be created on another node. (This is to prevent queues migrating across a cluster as it is brought down.)
nodes node names Queue is mirrored to the nodes listed in node names. If any of those node names are not a part of the cluster, this does not constitute an error. If none of the nodes in the list are online at the time when the queue is declared then the queue will be created on the node that the declaring client is connected to.
step3:
创建队列时需要指定ha 参数,如果不指定x-ha-prolicy 的话将无法复制
 
下面C#代码片段
 using ( var bus = RabbitHutch.CreateBus(ConfigurationManager .ConnectionStrings[“RabbitMQ”].ToString()))
            {
                bus.Subscribe< TestMessage>(“word_subscriber” , message => RunTable(message),x=>x.WithArgument(“x-ha-policy” , “all”));
                Console.WriteLine(“Subscription Started. Hit any key quit” );
                Console.ReadKey();
            }
 
step4:
客户端使用负载服务器172.16.3.110 (panyuntao3)发送消息,队列会被复制到所有节点,当然策略也可以配置制定某几个节点,这时任何节点故障 、或者重启将不会影响我们正常使用某个队列
 到这里我们完成了高可用配置(所有节点都宕机那没有办法了)。
使用rabbitmq管理端可以看到集群镜像模式中对列状态
  

参考:

http://www.rabbitmq.com/clustering.html

http://www.rabbitmq.com/ha.html

http://www.rabbitmq.com/parameters.html#policies

http://www.nsbeta.info/archives/555

http://blog.csdn.net/linvo/article/details/7793706

RabbitMQ学习系列(五): RPC 远程过程调用 - 章为忠 - 博客园

mikel阅读(626)

来源: RabbitMQ学习系列(五): RPC 远程过程调用 – 章为忠 – 博客园

前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

不过,最近有朋友问我,RabbitMQ RPC 是干嘛的,有什么用。

其实,RabbitMQ RPC 就是通过消息队列(Message Queue)来实现rpc的功能,就是,客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

1.RabbitMQ RPC的特点

  • Message Queue把所有的请求消息存储起来,然后处理,和客户端解耦。
  • Message Queue引入新的结点,系统的可靠性会受Message Queue结点的影响。
  • Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

所以对于有同步返回需求,Message Queue是个不错的方向。

2.普通PRC的特点

  • 同步调用,对于要等待返回结果/处理结果的场景,RPC是可以非常自然直觉的使用方式。当然RPC也可以是异步调用。
  • 由于等待结果,客户端会有线程消耗。

如果以异步RPC的方式使用,客户端线程消耗可以去掉。但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

3.适用场合说明

  • 希望同步得到结果的场合,RPC合适。
  • 希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
  • 不希望客户端受限于服务端的速度等,可以使用Message Queue。

4.RabbitMQ RPC工作流程:

 

基本概念:

Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

流程说明

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

 5.完整代码:

  1. 创建两个控制台程序,作为RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                                     noAck: false,
                                     consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                                             routingKey: props.ReplyTo,
                                             basicProperties: replyProps,
                                             body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers,
        /// and it's probably the slowest recursive implementation possible.
        /// </summary>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            Thread.Sleep(1000 * 10);

            return n;
        }
    }
复制代码

 

3. RPC Client

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Stopwatch watch = new Stopwatch();

                watch.Start();

                var rpcClient = new RPCClient();

                Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

                var response = rpcClient.Call(i.ToString());

                Console.WriteLine(" [.] Got '{0}'", response);

                rpcClient.Close();

                watch.Stop();

                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }

            Console.WriteLine(" complete!!!! ");


            Console.ReadLine();
        }
    }

    class RPCClient
    {
        private IConnection connection;
        private IModel channel;
        private string replyQueueName;
        private QueueingBasicConsumer consumer;

        public RPCClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        public string Call(string message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "",
                                 routingKey: "rpc_queue",
                                 basicProperties: props,
                                 body: messageBytes);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    return Encoding.UTF8.GetString(ea.Body);
                }
            }
        }

        public void Close()
        {
            connection.Close();
        }
    }
复制代码

4.分别运行Server和Client

 

6.最后

1.参照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

2.本文源代码下载,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

RabbitMQ学习系列(四): 几种Exchange 模式 - 章为忠 - 博客园

mikel阅读(798)

来源: RabbitMQ学习系列(四): 几种Exchange 模式 – 章为忠 – 博客园

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ。今天说些理论的东西,Exchange 的几种模式。

 

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

 

RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 header模式在实际使用中较少,本文只对前三种模式进行比较。

 

一. Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            var advancedBus = CreateAdvancedBus();

            if (advancedBus.IsConnected)
            {
                var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

                advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
            }
            else
            {
                Console.WriteLine("Can't connect");
            }

        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage()
        {
            var advancedBus = CreateAdvancedBus();
            var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

            var queue = advancedBus.QueueDeclare("user.notice.wangwu");
            advancedBus.Bind(exchange, queue, "user.notice.wangwu");
            advancedBus.Consume(queue, registration =>
            {
                registration.Add<MyMessage>((message, info) => { Console.WriteLine("Body: {0}", message.Body); });
            });
        }
复制代码

 

复制代码
public static IAdvancedBus CreateAdvancedBus()
{
    // 消息服务器连接字符串
    string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
    if (connString == null || connString == string.Empty)
    {
        throw new Exception("messageserver connection string is missing or empty");
    }

    return RabbitHutch.CreateBus(connString).Advanced;
}
复制代码

 

 

  二. Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            var advancedBus = CreateAdvancedBus();

            if (advancedBus.IsConnected)
            {
                var queue = advancedBus.QueueDeclare("user.notice.zhangsan");

                advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
            }
            else
            {
                Console.WriteLine("Can't connect");
            }

        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage()
        {
            var advancedBus = CreateAdvancedBus();

            var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Direct);

            var queue = advancedBus.QueueDeclare("user.notice.lisi");

            advancedBus.Bind(exchange, queue, "user.notice.lisi");

            advancedBus.Consume(queue, registration =>
            {
                registration.Add<MyMessage>((message, info) =>
                {
                    Console.WriteLine("Body: {0}", message.Body);
                });
            });
        }
复制代码

 

三. Topic Exchange

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,

Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。

所以,Topic Exchange 使用非常灵活。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            //// 创建消息bus
            IBus bus = CreateBus();

            try
            {
                bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }

            bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage(MyMessage msg)
        {
            //// 创建消息bus
            IBus bus = CreateBus();

            try
            {
                bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine(msg.MessageBody), x => x.WithTopic("user.notice.#"));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }
        }
复制代码

 

这个是RabbitMQ 的实际使用的几个场景,熟悉了这个,基本上rabbitmq 也就了解了。http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

 

至此,Rabbitmq 几种Exchange 模式已经介绍完了,实际使用过程中,我们会更具不同的场景,来使用不同的exchange 模式。

查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html