来源: RabbitMQ系列(七)–批量消息和延时消息 – Diamond-Shine – 博客园
批量消息发送模式
批量消息是指把消息放到一个集合统一进行提交,这种方案设计思路是希望消息在一个会话里,比如放到ThreadLocal里的集合,拥有相同
的会话ID,带有这次提交信息的size等属性,最重要的是吧这一批消息进行合并。对于channel就是发送一次消息。这种方式也是希望消费端在消
费的时候,可以进行批量化的消费,针对一个原子业务的操作进行处理,但是不保证可靠性,需要进行补偿机制。
图例:
伪代码思路:
@Data @AllArgsConstructor @NoArgsConstructor public class BatchMessage { private long sessionId; private List<String> messageHolder = new ArrayList<>(); //用来保存message的集合 private int listSize; //集合的条数 }
把message放到ThreadLocal里面使用,这些批量的消息需要同一个sessionId,如果要入库,只是保存sessionId对应的消息集合,而不是每条消息
步骤:
1、首先业务数据入库
2、将批量消息对应的BatchMessage入库,状态为发送中
3、发送message到Broker
4、返回confirm确认
5、修改状态为消费成功
6、。。。。后面不讲了,和之前博客思路一样
延迟消息发送模式
使用场景:
1、在电商平台买到的商品签收后,不点击确认支付,系统自动在一定时间进行支付操作
2、自动超时作废的场景,你的优惠券/红包也有使用时限,也可以用延迟消息机制
实现:
1、DLX和TTL:Consumer订阅DLX,Message发送到原Queue,设置TTL为30分钟。TTL到期,消息发送到DLX,然后被Consumer消费,就可以实现延迟队列
2、rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能
安装、启用插件
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 30*60*1000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);