来源: 22、The Advanced API 高级API – 困兽斗 – 博客园
EasyNetQ的使命是为RabbitMQ消息传递提供最简单的API。核心IBus接口有意避免暴露AMQP概念:如交换器、绑定、队列。相反,EasyNetQ实现一个默认基于消息的class type的“交换器+绑定+队列”拓扑结构。
有些场景下,需要能配置自定义的“交换器+绑定+队列”拓扑。EasyNetQ的The Advanced API 就可以提供这些功能。这个高级API对AMQP标准有很好的理解。
高级API是通过IAdvancedBus接口提供的,你可以通过IBus的Advanced属性获得一个IAdvancedBus接口的实例。
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;
一、如何声明交换器
要声明一个交换器(在RabbitMQ上),你可以使用EasyNetQ.IAdvancedBus接口的ExchangeDeclare方法,方法原型:
IExchange ExchangeDeclare(
string name,
string type,
bool passive = false,
bool durable = true,
bool autoDelete = false,
bool @internal = false,
string alternateExchange = null,
bool delayed = false);
形参的含义如下:
name: 欲创建的交换器名The name of the exchange you want to create
type: 欲创建交换器类型,必须是AMQP标准里定义的类型,你可以通过ExchangeType类的静态属性安全地指定它。
passive: 指定为true时,如果该名字的交换器之前不存在,不会创建它,而是抛出异常。(默认 false)
durable: 交换器是否可持久。(默认 true)
autoDelete: 当最后一个队列解绑定后,该交换器是否自动删除。(默认 false)
internal: 指定为true时,该交换器不能直接被发布者使用,而只能被其他普通交换器绑定使用。(默认 false)
alternateExchange: 替代交换器名。如果无法路由消息,就将消息路由到该交换器。
delayed: 指定为true时,声明一个x-delayed-type交换器,用于路由延迟消息。
小例子:
//创建一个直接交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);
//创建一个主题交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic);
//创建一个扇出交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);
获取RabbitMQ默认的交换器:
var exchange = Exchange.GetDefault();
二、如何声明队列
要声明一个消息队列(在RabbitMQ上),你可以使用EasyNetQ.IAdvancedBus接口的QueueDeclare方法,方法原型:
IQueue QueueDeclare(
string name,
bool passive = false,
bool durable = true,
bool exclusive = false,
bool autoDelete = false,
int? perQueueMessageTtl = null,
int? expires = null,
byte? maxPriority = null,
string deadLetterExchange = null,
string deadLetterRoutingKey = null,
int? maxLength = null,
int? maxLengthBytes = null);
形参含义如下:
name: 队列名
passive: 如果该队列之前不存在,不创建它,而是抛出异常。(默认 false)
durable: 队列是否可持久。(默认 true)
exclusive: 是否当前连接专用。(默认 false)
autoDelete: 是否自动删除队列,一旦所有消费者断开连接。(默认 false)
perQueueMessageTtl: 在被丢弃之前,消息应该在队列中保留多长时间(毫秒)。(默认 null,即不设置)
expires: 在自动删除队列之前,该队列应该保持未使用状态多长时间(毫秒)。(默认 null,即不设置)
maxPriority: 指定队列应该支持的最大消息优先级。
deadLetterExchange: 指定在被RabbitMQ服务器自动删除之前,交换的名称是否保持未占用状态。
deadLetterRoutingKey: 如果设置了,将使用指定的路由键路由消息,如果没有设置,消息将使用它们最初发布的同一路由键进行路由。
maxLength: 队列中能够存放的ready消息的最大数量。 一旦超限,为了给新来消息腾位置,队首消息将被丢弃或者成为死信。
maxLengthBytes: 队列最大字节数。 一旦超限,为了给新来消息腾位置,队首消息将被丢弃或者成为死信。
请注意RabbitMQ对待上面两个maxLength的行为,它们并不像人们想象那样。有人可能以为超限后RabbitMQ会拒绝接收(生产者)更多的消息,然而RabbitMQ文档指出一旦超限,队首消息将被丢弃或者成为死信,要为新来的消息腾地方。
小例子:
// 声明一个持久化队列
var queue = advancedBus.QueueDeclare("my_queue");
// declare a queue with message TTL of 10 seconds:
var queue = advancedBus.QueueDeclare("my_queue", perQueueMessageTtl:10000);
要声明一个“未命名”的独占队列,(实际上由RabbitMQ为之产生一个队列名),请调用QueueDeclare() 无参重载方法:
var queue = advancedBus.QueueDeclare();
请注意,EasyNetQ的自动消费者重连接逻辑不能用于“独占队列”。
三、绑定
你可以像这样把一个队列绑定到一个交换器:
var queue = advancedBus.QueueDeclare("my.queue"); //队列
var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); //主题交换器
var binding = advancedBus.Bind(exchange, queue, "A.*");//绑定,指定路由键
要指定一个队列和一个交换之间的多个绑定,只需多次调用Bind方法:
var queue = advancedBus.QueueDeclare("my.queue"); //声明队列
var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); //声明交换器
advancedBus.Bind(exchange, queue, "A.B"); //绑定, 主题设置为 A.B
advancedBus.Bind(exchange, queue, "A.C"); //绑定,主题设置为 A.C
你也可以把一个交换器绑定到另一个交换器上,穿成串
var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic); //源交换器
var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic); //目标交换器
var queue = advancedBus.QueueDeclare("my.queue"); //声明队列
advancedBus.Bind(sourceExchange, destinationExchange, "A.*"); //把源交换器绑定到目标交换器
advancedBus.Bind(destinationExchange, queue, "A.C"); //把目标交换器绑定到队列
注意上面穿成串后,目标交换器收到A主题和 *(任一个字母)的主题消息;而队列只能收到A和C的主题消息。
四、发布
高级发布方法允许指定你要把消息发布到哪个交换器上,它还允许访问消息的AMQP标准的basic属性。
高级API要求将你的消息封装到Message类对象中
var myMessage = new MyMessage {Text = "Hello from the publisher"};
var message = new Message<MyMessage>(myMessage);
Message类使你可以访问AMQP的basic属性,例如:
message.Properties.AppId = "my_app_id";
message.Properties.ReplyTo = "my_reply_queue";
最后你只要调用Publish方法发布你的消息,在下例我们发布到默认交换器
bus.Publish(Exchange.GetDefault(), queueName, false, false, message);
一个重载Publish方法允许你绕过EasyNetQ的消息序列化,直接创建你自己的字节数组作为消息。
var properties = new MessageProperties();
var body = Encoding.UTF8.GetBytes("Hello World!");
bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);
五、消费
使用IAdvancedBus接口的Consume方法,就可以消费队列中的消息。
IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;
onMessage 委托是你要提供的消息处理方法。
正如上面的“发布”那一节所描述的,IMessage可以让你访问消息和它的MessageProperties。而这里MessageRecivedInfo提供了关于消息被消费的上下文的额外信息:
public class MessageReceivedInfo
{
public string ConsumerTag { get; set; }
public ulong DeliverTag { get; set; }
public bool Redelivered { get; set; }
public string Exchange { get; set; }
public string RoutingKey { get; set; }
}
onMessage委托返回一个Task,该任务允许你编写非阻塞的异步处理程序。
该消费方法返回一个IDisposable接口实例,调用该实例的Dispose方法,可以撤销该消费者。
如果你仅仅需要同步处理消息,你可以调用同步的Consume重载方法:
IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;
如果要绕过EasyNetQ的消息序列化,调用下面的Consume重载方法,提供一个字节数组(作为消息):
void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
在下面示例中,我们正在消费队列“myqueue”中的原始字节数组(即消息):
var queue = advancedBus.QueueDeclare("my_queue"); //声明队列
advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Got message: '{0}'", message);
}));
你可以调用另一个重载的Consume方法,让单个消费者可选地注册多个处理委托:
IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);
IHandlerRegistration 接口如下所示:
public interface IHandlerRegistration
{
/// <summary>
/// 添加一个异步处理委托Add an asynchronous handler
/// </summary>
/// <typeparam name="T">消息类型The message type</typeparam>
/// <param name="handler">处理委托The handler</param>
/// <returns></returns>
IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler)
where T : class;
/// <summary>
/// 添加一个同步处理委托Add a synchronous handler
/// </summary>
/// <typeparam name="T">消息类型The message type</typeparam>
/// <param name="handler">处理委托The handler</param>
/// <returns></returns>
IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler)
where T : class;
/// <summary>
/// 如果设置为true,如果没有适合的处理委托,将会抛出异常。
/// 设置为false,返回一个无操作(什么也不做)的委托。(默认 true)
/// Set to true if the handler collection should throw an EasyNetQException when no
/// matching handler is found, or false if it should return a noop handler.
/// Default is true.
/// </summary>
bool ThrowOnNoMatchingHandler { get; set; }
}
在下面例子中,我们注册了两个不同的处理委托:一个处理MyMessage类型消息,另一个处理MyOtherMessage类型消息:
bus.Advanced.Consume(queue, x => x
.Add<MyMessage>((message, info) =>
{
Console.WriteLine("Got MyMessage {0}", message.Body.Text);
countdownEvent.Signal();
})
.Add<MyOtherMessage>((message, info) =>
{
Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text);
countdownEvent.Signal();
})
);
更多信息请参阅这篇博客文章:
http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html
6、从队列获取单条消息
要从队列中获得单条消息,请使用IAdvancedBus.Get() 方法:
IBasicGetResult<T> Get<T>(IQueue queue) where T : class;
AMQP文档说:“该方法使用同步对话直接访问队列中的消息,该对话是为特定类型的应用程序设计的,而同步功能比性能更重要。”
不要在循环中调用Get方法访问消息队列。在一般场景中,我想你一定只喜欢使用Consume方法。
IBasicGetResult接口如下所示:
/// <summary>
/// AdvancedBus.Get 方法获取的结果
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IBasicGetResult<T> where T : class
{
/// <summary>
/// 消息是否可用。True if a message is availabe, false if not.
/// </summary>
bool MessageAvailable { get; }
/// <summary>
/// The message retreived from the queue.
/// This property will throw a MessageNotAvailableException if no message
/// was available. You should check the MessageAvailable property before
/// attempting to access it.你应该先检查MessageAvailable属性值,再读取该属性值。
/// </summary>
IMessage<T> Message { get; }
}
注意:在读取Message属性前,你应该总是先检查MessageAvailable属性值是否为true才行(避免抛出异常),如下例所示:
var queue = advancedBus.QueueDeclare("get_test"); //声明队列
advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false,
new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" })); //发布消息
var getResult = advancedBus.Get<MyMessage>(queue); //获取单条消息
if (getResult.MessageAvailable) //如果消息可用
{
Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text);
}
else
{
Console.Out.WriteLine("Failed to get message!");
}
要访问二进制消息,请使用非泛型的Get方法:
IBasicGetResult Get(IQueue queue);
非泛型的IBasicGetResult接口定义如下:
public interface IBasicGetResult
{
byte[] Body { get; }
MessageProperties Properties { get; }
MessageReceivedInfo Info { get; }
}
7、消息类型必须匹配
EasyNetQ 高级API要求订阅者只接收泛型类型参数指定的类型的消息。在上例中,只接收类型MyMessage类型的消息。
但是,EasyNetQ不担保你发布错误类型的消息给订阅者。比如:我可以很容易地设置一个 “交换器-绑定-队列”拓扑来发布NotMyMessage类型的消息,而用上面的处理程序接收它。
如果接收到错误类型的消息,EasyNetQ会抛出EasyNetQInvalidMessageTypeException 异常,如下:
EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests'
at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 217
at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 131
at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 176
at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:\Source\EasyNetQ\Source\EasyNetQ\QueueingConsumerFactory.cs:line 85
8、事件
当通过RabbitHutch方法实例化一个IBus接口实例时,您可以指定一个AdvancedBusEventHandlers委托。
这个类包含一个事件处理委托属性,用于在 IAdvancedBus中的每个事件,提供了在bus实例化之前指定事件处理程序的方法。
不需要使用它,因为一旦创建了bus,仍然可以添加事件处理程序。
但是,你想要抓到 RabbitAdvancedBus.的首次已连接事件,你必须使创建 AdvancedBusEventHandlers 委托,注册已连接事件Connected
这是因为bus在其构造函数返回前只尝试连接一次。如果连接成功,会触发RabbitAdvancedBus.OnConnected事件。
var bus = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) =>
{
var advancedBus = (IAdvancedBus)s;
Console.WriteLine(advancedBus.IsConnected); // This will print true.
}));