Apache Pulsar学习笔记07: Pulsar的Go客户端库
2021-10-27
上一节介绍了如何使用Pulsar的Java客户端开发Pulsar的Producer、Consumer和Reader,Pulsar使用二进制协议在Producer/Consumer和Brokers之间通信,在开发Consumer时可以使用同步或异步的方式接收消息,同时支持批量接收消息和多主体订阅的特性,消费者在处理消息时还要考虑根据具体业务场景考虑使用死信策略。本节将介绍Pulsar Go Client的使用,演示使用Pulsar的Go客户端库开发Producer和Consumer。
下面在一个使用go mod创建的Go项目中,使用Go的单元测试演示pulsar go client的使用,在go.mod
中加pulsar go client的依赖:
1require github.com/apache/pulsar-client-go v0.6.0
1.创建Client #
使用Go开发的程序连接到Pulsar Broker,需要先创建一个pulsar.Client
接口的实例。
1package main
2
3import (
4 "testing"
5 "time"
6
7 "github.com/apache/pulsar-client-go/pulsar"
8)
9
10func TestNewClient(t *testing.T) {
11 client, err := pulsar.NewClient(pulsar.ClientOptions{
12 URL: "pulsar://192.168.2.13:6650",
13 OperationTimeout: 30 * time.Second,
14 ConnectionTimeout: 30 * time.Second,
15 })
16 if err != nil {
17 t.Fatal(err)
18 }
19 t.Log(client)
20 client.Close()
21}
通过pulsar.NewClient
创建出pulsar.Client
接口的实例,这个接口的定义如下:
1// Client represents a pulsar client
2type Client interface {
3 // CreateProducer Creates the producer instance
4 // This method will block until the producer is created successfully
5 CreateProducer(ProducerOptions) (Producer, error)
6
7 // Subscribe Creates a `Consumer` by subscribing to a topic.
8 //
9 // If the subscription does not exist, a new subscription will be created and all messages published after the
10 // creation will be retained until acknowledged, even if the consumer is not connected
11 Subscribe(ConsumerOptions) (Consumer, error)
12
13 // CreateReader Creates a Reader instance.
14 // This method will block until the reader is created successfully.
15 CreateReader(ReaderOptions) (Reader, error)
16
17 // TopicPartitions Fetches the list of partitions for a given topic
18 //
19 // If the topic is partitioned, this will return a list of partition names.
20 // If the topic is not partitioned, the returned list will contain the topic
21 // name itself.
22 //
23 // This can be used to discover the partitions and create {@link Reader},
24 // {@link Consumer} or {@link Producer} instances directly on a particular partition.
25 TopicPartitions(topic string) ([]string, error)
26
27 // Close Closes the Client and free associated resources
28 Close()
29}
2.创建Producer #
有了pulsar.Client
对象后,就可以调用它的CreateProducer
来创建Producer。
1func TestProducer(t *testing.T) {
2 client, err := pulsar.NewClient(pulsar.ClientOptions{
3 URL: "pulsar://192.168.2.13:6650",
4 OperationTimeout: 30 * time.Second,
5 ConnectionTimeout: 30 * time.Second,
6 })
7 if err != nil {
8 t.Fatal(err)
9 }
10 defer client.Close()
11
12 producer, err := client.CreateProducer(pulsar.ProducerOptions{
13 Name: "TestProducer-Go",
14 Topic: "persistent://study/app1/topic-1",
15 })
16 if err != nil {
17 t.Fatal(err)
18 }
19 defer producer.Close()
20 msg := &pulsar.ProducerMessage{
21 Key: "msgKey1",
22 Payload: []byte("hello go"),
23 Properties: map[string]string{
24 "p1": "v1",
25 "p2": "v2",
26 },
27 }
28 msgID, err := producer.Send(context.Background(), msg)
29 if err != nil {
30 t.Fatal(err)
31 }
32 t.Log(msgID)
33}
上面的代码创建了一个Producer,创建了一个消息并将消息发出。
3.创建消费者 #
3.1 使用consumer.Receive()接收消息 #
使用consumer.Receive()
接收消息的常用模式是在一个循环中监听Topic,下面的代码演示了消费者在for循环中持续监听Topic,打印接收到的消息内容,然后确认消息已被处理。如果处理逻辑失败发生了任何异常,将发送一个否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试。
1func TestConsumer(t *testing.T) {
2 client, err := pulsar.NewClient(pulsar.ClientOptions{
3 URL: "pulsar://192.168.2.13:6650",
4 OperationTimeout: 30 * time.Second,
5 ConnectionTimeout: 30 * time.Second,
6 })
7 if err != nil {
8 t.Fatal(err)
9 }
10 defer client.Close()
11
12 consumer, err := client.Subscribe(pulsar.ConsumerOptions{
13 Topic: "persistent://study/app1/topic-1",
14 SubscriptionName: "sub-2", // 订阅名称
15 Type: pulsar.Exclusive, // 订阅类型: 独占模式
16 })
17 if err != nil {
18 t.Fatal(err)
19 }
20 defer consumer.Close()
21
22 for {
23 msg, err := consumer.Receive(context.Background())
24 if err != nil {
25 t.Fatal(err)
26 }
27 if err := processMsg(msg); err != nil {
28 consumer.Nack(msg)
29 } else {
30 consumer.Ack(msg)
31 }
32
33 }
34}
35
36func processMsg(msg pulsar.Message) error {
37 fmt.Printf("consume: %s \n", msg.Payload())
38 return nil
39}
3.2 使用consumer.Chan()接收消息 #
consumer.Chan()
返回的是一个channel,可以从这个channel中接收消息,从pulsar-client-go v0.6.0的源码中可以看出目前的consumer.Receive()是基于channel实现的。
1func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
2 for {
3 select {
4 case <-c.closeCh:
5 return nil, newError(ConsumerClosed, "consumer closed")
6 case cm, ok := <-c.messageCh:
7 if !ok {
8 return nil, newError(ConsumerClosed, "consumer closed")
9 }
10 return cm.Message, nil
11 case <-ctx.Done():
12 return nil, ctx.Err()
13 }
14 }
15}
16
17func (c *consumer) Chan() <-chan ConsumerMessage {
18 return c.messageCh
19}
consumer.Receive()中对channel关闭和context cancel的情况做了封装。 如果直接使用consumer.Chan()话,需要我们自己处理和channel的交互,但也提供了最大的灵活性。
3.3 多主题订阅 #
pulsar-client-go同样支持多主题订阅,只需要再创建Consumer时,通过pulsar.ConsumerOptions的Topics
或TopicsPattern
字段指定。
1type ConsumerOptions struct {
2 ...
3 Topics []string
4 TopicsPattern string
5 ...
6}
Topics指定多个主题的名称,TopicsPattern指定主题名称的正则表达式,如果通过regex选择主题, 则所有主题都必须位于同一Pulsar命名空间中。
3.4 死信策略 #
pulsar-client-go同样支持设置死信策略:
1consumer, err := client.Subscribe(pulsar.ConsumerOptions{
2 Topic: "persistent://study/app1/topic-1",
3 SubscriptionName: "sub-2",
4 Type: pulsar.Exclusive,
5 DLQ: &pulsar.DLQPolicy{
6 MaxDeliveries: 10, // 最大重发次数
7 DeadLetterTopic: "persistent://study/app1/dlt-1", // 死信Topic
8 },
9 })
在设置死信策略时可以设置消息的最大重发次数,也就是说消息处理失败时,重试的次数,如果超过设置的最大次数,则将会被发送到死信主题。
4.创建Reader #
pulsar-client-go同样支持创建Reader,可以由用户自己手动在Topic中定位,读取想要读取的消息。
1reader, err := client.CreateReader(pulsar.ReaderOptions{
2 Topic: "topic-1",
3 StartMessageID: pulsar.EarliestMessageID(),
4 })
5if err != nil {
6 log.Fatal(err)
7}
8defer reader.Close()
9
10for reader.HasNext() {
11 msg, err := reader.Next(context.Background())
12 if err != nil {
13 log.Fatal(err)
14 }
15
16 fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
17 msg.ID(), string(msg.Payload()))
18}
备注 #
本文中的测试代码在Linux环境下编译并且可以正常运行。
在MacOS Big Sur 11.6运行时,在pulsar.NewClient
时,出现warning: 'SecTrustedApplicationCreateFromPath' is deprecated: first deprecated in macOS 10.15 - No longer supported [-Wdeprecated-declarations]
的错误,按照https://github.com/apache/pulsar-client-go/issues/407中描述的在go.mod
中替换依赖replace github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
后可以创建pulsar.Client
了,但在创建Produer时出现fatal error: unexpected signal during runtime execution [signal SIGSEGV: segmentation violation code=0x1 addr=0xb01dfacedebac1e pc=0x7fff204c6c9e] runtime stack: runtime: unexpected return pc for runtime.sigpanic
的错误,应该是cgo调用出现错误了。
pulsar最早的GO客户端是基于CGO的Pulsar CGo client github.com/apache/pulsar/pulsar-client-go/pulsar
已经废弃不建议使用了,而本文中使用的是Pulsar Go client github.com/apache/pulsar-client-go/pulsar
,号称是纯Go的,在Mac下还是遇到了cgo的问题,说明目前Pulsar Go client(v0.6.0)还不是太成熟且对MacOS支持的不太好。