上一节学习了Pulsar中的生产者、消费者、订阅和订阅类型。 当一个消费者连接到Pulsar时,会创建一个订阅(Subscription),订阅有4种类型:独占模式、灾备模式、共享模式和基于key的共享模式。 订阅是命名好的配置规则,指导消息如何投递给消费者。生产者和消费者是连接到Pulsar的客户端,上节使用pulsar-client连接到Pulsar集群完成了消费者和生产者的测试。 真实场景中的生产者和消费者是我们开发的程序,从本节开始将学习如何使用各种编程语言开发Pulsar的生产者和消费者。

Pulsar为很多编程语言提供了简单易用的客户端API库,封装了客户端与Pulsar Broker之间的通信细节。Pulsar官方提供了Java, Go, Python, C++, Node.js, C#语言的客户端库,提供了Websocket的API。 除了官方正式发布的客户端库外,还有很多第三方的客户端可供选择,例如Rust, Scala, Haskell等。

本节将学习使用Pulsar的Java客户端库。可以使用Pulsar Java Client创建Pulsar的生产者、消费者、Message Reader,还能完成对Pulsar的管理任务。 本节基于java的构建工具gradle创建的java项目,并在junit单元测试代码中演示pulsar java client的使用。在项目中添加pulsar-client依赖到build.gradle文件中。

1
implementation 'org.apache.pulsar:pulsar-client:2.8.1'

1.创建PulsarClient

要使用Java客户端创建Pulsar的生产者或消费者,需要先创建一个PulsarClient对象连接到Pulsar Broker。 创建连接到Broker的PulsarClient对象需要用的pulsar协议的URL。

Pulsar使用自定义二进制协议在Producers/Consumers和Brokers之间进行通信。前面我们使用docker容器运行单机版pulsar时对外映射了两个端口,8080是pulsar的web端口,而6650则是broker提供给生产者和消费者连接的Pulsar自定义二进制协议TCP端口。

1
2
3
docker ps
CONTAINER ID   IMAGE                       COMMAND                  CREATED      STATUS      PORTS                                                                                  NAMES
e9cb8dd13c27   apachepulsar/pulsar:2.8.1   "bin/pulsar standalo…"   2 days ago   Up 2 days   0.0.0.0:6650->6650/tcp, :::6650->6650/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   pulsar

Pulsar协议的URL示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
pulsar://localhost:6650

# 多个broker
pulsar://localhost:6550,localhost:6651,localhost:6652

# 生产级别的pulsar集群
pulsar://pulsar.us-west.example.com:6650

# 使用TLS认证时的URL
pulsar+ssl://pulsar.us-west.example.com:6651

Pulsar使用自定义二进制协议在Producers/Consumers和Brokers之间进行通信。Pulsar自定义的二进制协议是基于protobuf的,遵循渐进性学习的原则,这里不展开,具体看查看Pulsar文档中的https://pulsar.apache.org/docs/zh-CN/develop-binary-protocol/

1
2
3
4
5
6
@Test
public void testCreatePulsarClient() throws PulsarClientException {
    try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
        System.out.println(client);
    }
}

从上面的测试代码可以看出,PulsarClient对象使用builder模式创建,builder的方法除了serviceUrl指定pulsar URL外,还有其他的方法用于配置认证等其他配置操作。 PulsarClient的创建为我们隐藏了客户端连接到Pulsar Broker的所有细节,例如自动重试等。

还有十分重要的一点就是: PulsarClient实例对象是线程安全,也就是说在一个项目中只需创建一个PulsarClient就可以创建和管理多个Producer和Consumer。 这也能够更好将其与Spring框架相整合。

2.使用PulsarClient创建生产者

接下来演示使用PulsarClient创建一个Producer,指定一个Topic,并发送消息到Topic。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Test
public void testProducer() throws PulsarClientException {
    try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
        try (Producer<byte[]> producer = client.newProducer().topic("persistent://study/app1/topic-1").create()) {
            producer.newMessage()
                    .key("msgKey1")
                    .value("hello".getBytes(StandardCharsets.UTF_8))
                    .property("p1", "v1")
                    .property("p2", "v2")
                    .send();
        }
    }
}

上面的代码使用PulsarClient创建了一个Producer,创建Producer最简单的方式是设置一下Topic就行。接下来使用Producer创建了一个消息,指定了消息的Key,并为这个消息设置了两个属性,最后将消息发出。 消息的属性作为消息的元数据可以用来为消息添加一些有用的信息,例如消息是什么时间发送的,是由谁发送的等等。 消息的Key的指定就很有用了,在上一节学习订阅类型的时候,生产者可以使用基于消息Key的共享订阅模式,在这个模式下多个Consumer可以绑定在一个订阅上,同时具有相同相同key的消息被交付给相同的Consumer。 另外如果使用了分区Topic,生产者发送的消息会以消息Key做Hash进行路由,相同Key的消息会被发送到相同的分区。

3.使用PulsarClient创建消费者

使用PulsarClient通过指定主题Topic和订阅Subscription来创建Consumer。

1
2
3
4
Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

上面的代码subscribe()方法将尝试使用指定的订阅名my-subscription将消费者连接到名称为my-topic的这个主题上:

  • 如果订阅已经存在并绑定了其他的消费者且订阅类型是独占模式的,则subscribe方法会抛出异常。
  • 如果是第一次使用的订阅名连接到主题,会自动创建订阅,每当创建新的订阅时,默认情况下它的位置被定位到Topic的末尾,即该订阅上的消费者从创建订阅后发布到Topic中的第一条消息开始消费。
  • 如果通过一个之前已经存在的订阅连接到主题,则消费者将从订阅中最早未确认的消息开始消费。

3.1 同步接收消息

一个常用的消费模式是让消费者在while循环中监听Topic,下面的代码演示了消费者在while循环中持续监听Topic,打印接收到的消息内容,然后确认消息已被处理。如果处理逻辑失败发生了任何异常,将发送一个否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void testConsumerWhile() throws PulsarClientException {
    try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://study/app1/topic-1")
                .subscriptionName("sub-2")
                .subscriptionType(SubscriptionType.Exclusive) // 订阅类型: 独占模式
                .subscribe();

        while (true) {
            // 等待一个消息
            Message<byte[]> msg = consumer.receive();
            try {
                // 处理消息
                System.out.println("Message received: " + new String(msg.getData()));
                // 处理完成发送确认ACK, 通知Broker消息可以被删除
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // 处理失败,发送否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

3.2 异步接收消息

在while循环中监听Topic,并使用consumer.receive()接收方法是同步的方式。用来检索消息的receive()方法是阻塞方法,它会无限期地阻塞等待新消息到来。 这个模式在消息的数量较少且对消息从发布到消息之间的延迟不敏感的场景下可以使用。但同步不是一个好的方式,更好的方法是以异步的形式处理。

1
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

异步接收操作返回包装在CompletableFuture中的Message。

3.3 使用org.apache.pulsar.client.api.MessageListener接收

可以使用MessageListener监听接收,在这种方式下,Pulsar Consumer会自动创建用于运行messagelistener实例的线程池。 使用MessageListener可以轻松实现多Consumer(多线程)共享订阅消费Topic中的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testConsumerMessageListener() throws PulsarClientException, InterruptedException {
    try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
            .topic("persistent://study/app1/topic-1")
            .subscriptionName("sub-4")
            .subscriptionType(SubscriptionType.Shared) // 订阅类型: 共享模式
            .messageListener((c, msg) -> {
                try {
                    System.out.println(c.getConsumerName() + " received: " + new String(msg.getData()));
                    c.acknowledge(msg);
                } catch (Exception e) {
                    c.negativeAcknowledge(msg);
                }
            });
        for (int i =0;i<4;i++) {
            consumerBuilder.consumerName("testConsumerMessageListener-" + i).subscribe();
        }
        Thread.sleep(TimeUnit.MINUTES.toMillis(1));
    }
}

3.4 批量接收

在创建Consumer时,可以设置Consumer的批量接收策略:

1
2
3
4
5
6
7
8
9
Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .batchReceivePolicy(BatchReceivePolicy.builder()
             .maxNumMessages(100)
             .maxNumBytes(1024 * 1024)
             .timeout(200, TimeUnit.MILLISECONDS)
             .build())
        .subscribe();

例如上面设置的批量接收策略为: 消息数量达到100条,消息的字节数达到1024K,等待超时达到200毫秒,三个条件满足任意一个即可。 默认批量接收策略是:

1
2
3
4
5
BatchReceivePolicy.builder()
    .maxNumMessage(-1)
    .maxNumBytes(10 * 1024 * 1024)
    .timeout(100, TimeUnit.MILLISECONDS)
    .build();

根据批量接收策略,使用consumer的batchReceive()方法可以一次接收多条消息。

1
2
3
4
5
Messages messages = consumer.batchReceive();
for (Object message : messages) {
  // do something
}
consumer.acknowledge(messages)

3.5 多主题订阅

消费者除了订阅单个主题外,还可以使用多主题订阅订阅多个主题。 要使用多主题订阅, 可以提供一个主题正则表达式(regex)或主题List。 如果通过regex选择主题, 则所有主题都必须位于同一Pulsar命名空间中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
        .subscriptionName(subscription);

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
        .topicsPattern(allTopicsInNamespace)
        .subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
        .topicsPattern(someTopicsInNamespace)
        .subscribe();

在上面的示例中,消费者订阅了能够匹配主题名称正则模式的持久主题。 如果希望消费者订阅所有可以匹配主题名称模式的持久和非持久主题,需要将subscriptionTopicsMode设置为RegexSubscriptionMode.AllTopics

1
2
3
4
5
6
Pattern pattern = Pattern.compile("public/default/.*");
pulsarClient.newConsumer()
        .subscriptionName("my-sub")
        .topicsPattern(pattern)
        .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
        .subscribe();

还可以通过明确的主题列表订阅多个主题,主题列表中的主题可以跨命名空间。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
List<String> topics = Arrays.asList(
        "topic-1",
        "topic-2",
        "topic-3"
);

Consumer multiTopicConsumer = consumerBuilder
        .topics(topics)
        .subscribe();

// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
        .topic(
            "topic-1",
            "topic-2",
            "topic-3"
        )
        .subscribe();

3.6 死信策略

当消费者处理消息失败时,如果消费者给了Broker否定确认(negative ack),或者Broker在预先设置的时间内没有收到确认ACK,Broker可以将消息重新发送给消息者。 这就相当于"重试"逻辑,如果重试成功,消息会被正常消费。但总会有重试无法成功的情况,因此不能无限的重试下去。

关于消费者处理消息时的异常处理有以下三种选择:

  • 第一种方法是捕获任何异常,无论是否发生异常都简单地确认这些消息已成功处理,这实际上是忽略了处理失败的消息。这种方法只适用于业务上允许消息丢失的场景。
  • 第二种方法是上面说的无限重试,消息处理成功发确认ACK,消息处理失败捕获异常时发送否定确认(negative ack)。这种方法可能会导致失败消息的无限重新传递循环,可能会引起消息堵塞,导致后边的消息无法被消费。
  • 第三种方法是将有问题的消息路由到一个单独的主题,称为死信主题(Dead Letter Topic)。这样就能避免无限的重新传递循环引起消息堵塞,同时死信主题中保留的消息可以在后续进行进一步有程序自动处理或者人工检查和处理。

在创建消费者时可以为其设置死信策略,示例代码如下:

1
2
3
4
5
6
7
Consumer consumer = client.newConsumer()
    .topic("persistent://study/app1/topic-1")
    .subscriptionName("sub5")
    .deadLetterPolicy(DeadLetterPolicy.builder()
       .maxRedeliverCount(10)
       .deadLetterTopic("persistent://study/app1/dlt-1"))
    .subscribe();

在设置死信策略时可以设置消息的最大重发次数,也就是说消息处理失败时,重试的次数,如果超过设置的最大次数,则将会被发送到死信主题。

4.使用Pulsar Client创建消息Reader

使用消息Reader可以由用户自己手动在Topic中定位,读取想要读取的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id) // .startMessageId(MessageId.earliest)
        .create();

while (true) {
    Message message = reader.readNext();
    // Process message
}

参考