主流消息系统都会提供很多好用的特性支持各种业务场景,死信队列, 延迟队列这些词在使用这些消息系统时经常被提到。 Pulsar作为下一代云原生消息系统肯定也是支持这些特性的。Pulsar中没有队列的概念, 前面在学习使用Java和Go开发Pulsar的Consumer时,在创建Consumer时可以为其设置死信策略,并指定死信Topic。 本节将学习Pulsar中的延迟消息投递功能。

延迟消息投递及其适用场景

延迟消息投递特性是Pulsar 2.4.0开始引入的新功能。 Pulsar的延迟投递功能是指Producer将一个延迟消息发送到一个Topic中,Broker将延迟消息暂存到临时存储,延迟跟踪服务(Delayed tracker service)将检查消息是否到期,将到期的消息进行投递。

pulsar-delay-msg-delivery.png

延迟消息投递是暂缓消息的处理,在将来某个时间点触发投递,适用很多业务场景:

  • 服务请求异常重试: 例如如果服务请求异常,可以将异常请求延迟投递,5分钟后重试
  • 取消超时订单: 用户购买商品,但一直没有付款,所以需要提醒用户定期付款,超过时间关闭订单
  • 预约提醒功能: 会议预约,在会议开始前半小时发送通知提醒

适用延迟消息投递

Pulsar支持时间跨度超大的延迟消息,例如一个月甚至一年。在同一Topic里即可以支持延迟消息,也支持非延迟消息。 在Pulsar中使用延迟消息投递有两种方式:

  • 指定多长时间后投递deliverAfter
  • 指定在将来某个时间点投递deliverAt

延迟消息投递的功能是在Producer中使用的,在使用Producer发送消息到Topic时为消息指定deliverAfterdeliverAt的属性,下面是使用Go语言编写Producer的代码示例:

1
2
3
4
5
6
7
8
9
......
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: topicName,
})
......
_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload:      []byte(fmt.Sprintf("test")),
    DeliverAfter: 30 * time.Minute,
})

下面是Java代码的示例:

1
2
3
4
5
6
7
producer.newMessage()
        .deliverAfter(30, TimeUnit.MINUTES)
        .value(order)
        .key(order.orderID.toString())
        .property("p1", "v1")
        .property("p2", "v2")
        .send();

参考