Apache Pulsar学习笔记10: 延迟消息投递
📅 2021-11-02 | 🖱️
🔖 pulsar
主流消息系统都会提供很多好用的特性支持各种业务场景,死信队列
, 延迟队列
这些词在使用这些消息系统时经常被提到。
Pulsar作为下一代云原生消息系统肯定也是支持这些特性的。Pulsar中没有队列的概念, 前面在学习使用Java和Go开发Pulsar的Consumer时,在创建Consumer时可以为其设置死信策略
,并指定死信Topic
。
本节将学习Pulsar中的延迟消息投递功能。
延迟消息投递及其适用场景 #
延迟消息投递特性是Pulsar 2.4.0开始引入的新功能。
Pulsar的延迟投递功能是指Producer
将一个延迟消息发送到一个Topic
中,Broker
将延迟消息暂存到临时存储,延迟跟踪服务(Delayed tracker service)将检查消息是否到期,将到期的消息进行投递。
延迟消息投递是暂缓消息的处理,在将来某个时间点触发投递,适用很多业务场景:
- 服务请求异常重试: 例如如果服务请求异常,可以将异常请求延迟投递,5分钟后重试
- 取消超时订单: 用户购买商品,但一直没有付款,所以需要提醒用户定期付款,超过时间关闭订单
- 预约提醒功能: 会议预约,在会议开始前半小时发送通知提醒
使用延迟消息投递 #
Pulsar支持时间跨度超大的延迟消息,例如一个月甚至一年。在同一Topic里即可以支持延迟消息,也支持非延迟消息。 在Pulsar中使用延迟消息投递有两种方式:
- 指定多长时间后投递
deliverAfter
- 指定在将来某个时间点投递
deliverAt
延迟消息投递的功能是在Producer中使用的,在使用Producer发送消息到Topic时为消息指定deliverAfter
或deliverAt
的属性,下面是使用Go语言编写Producer的代码示例:
1......
2producer, err := client.CreateProducer(pulsar.ProducerOptions{
3 Topic: topicName,
4})
5......
6_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
7 Payload: []byte(fmt.Sprintf("test")),
8 DeliverAfter: 30 * time.Minute,
9})
下面是Java代码的示例:
1producer.newMessage()
2 .deliverAfter(30, TimeUnit.MINUTES)
3 .value(order)
4 .key(order.orderID.toString())
5 .property("p1", "v1")
6 .property("p2", "v2")
7 .send();