上一节介绍了如何使用Pulsar的Java客户端开发Pulsar的Producer、Consumer和Reader,Pulsar使用二进制协议在Producer/Consumer和Brokers之间通信,在开发Consumer时可以使用同步或异步的方式接收消息,同时支持批量接收消息和多主体订阅的特性,消费者在处理消息时还要考虑根据具体业务场景考虑使用死信策略。本节将介绍Pulsar Go Client的使用,演示使用Pulsar的Go客户端库开发Producer和Consumer。

下面在一个使用go mod创建的Go项目中,使用Go的单元测试演示pulsar go client的使用,在go.mod中加pulsar go client的依赖:

1require github.com/apache/pulsar-client-go v0.6.0

1.创建Client

使用Go开发的程序连接到Pulsar Broker,需要先创建一个pulsar.Client接口的实例。

 1package main
 2
 3import (
 4	"testing"
 5	"time"
 6
 7	"github.com/apache/pulsar-client-go/pulsar"
 8)
 9
10func TestNewClient(t *testing.T) {
11	client, err := pulsar.NewClient(pulsar.ClientOptions{
12		URL:               "pulsar://192.168.2.13:6650",
13		OperationTimeout:  30 * time.Second,
14		ConnectionTimeout: 30 * time.Second,
15	})
16	if err != nil {
17		t.Fatal(err)
18	}
19	t.Log(client)
20	client.Close()
21}

通过pulsar.NewClient创建出pulsar.Client接口的实例,这个接口的定义如下:

 1// Client represents a pulsar client
 2type Client interface {
 3	// CreateProducer Creates the producer instance
 4	// This method will block until the producer is created successfully
 5	CreateProducer(ProducerOptions) (Producer, error)
 6
 7	// Subscribe Creates a `Consumer` by subscribing to a topic.
 8	//
 9	// If the subscription does not exist, a new subscription will be created and all messages published after the
10	// creation will be retained until acknowledged, even if the consumer is not connected
11	Subscribe(ConsumerOptions) (Consumer, error)
12
13	// CreateReader Creates a Reader instance.
14	// This method will block until the reader is created successfully.
15	CreateReader(ReaderOptions) (Reader, error)
16
17	// TopicPartitions Fetches the list of partitions for a given topic
18	//
19	// If the topic is partitioned, this will return a list of partition names.
20	// If the topic is not partitioned, the returned list will contain the topic
21	// name itself.
22	//
23	// This can be used to discover the partitions and create {@link Reader},
24	// {@link Consumer} or {@link Producer} instances directly on a particular partition.
25	TopicPartitions(topic string) ([]string, error)
26
27	// Close Closes the Client and free associated resources
28	Close()
29}

2.创建Producer

有了pulsar.Client对象后,就可以调用它的CreateProducer来创建Producer。

 1func TestProducer(t *testing.T) {
 2	client, err := pulsar.NewClient(pulsar.ClientOptions{
 3		URL:               "pulsar://192.168.2.13:6650",
 4		OperationTimeout:  30 * time.Second,
 5		ConnectionTimeout: 30 * time.Second,
 6	})
 7	if err != nil {
 8		t.Fatal(err)
 9	}
10	defer client.Close()
11
12	producer, err := client.CreateProducer(pulsar.ProducerOptions{
13		Name:  "TestProducer-Go",
14		Topic: "persistent://study/app1/topic-1",
15	})
16	if err != nil {
17		t.Fatal(err)
18	}
19	defer producer.Close()
20	msg := &pulsar.ProducerMessage{
21		Key:     "msgKey1",
22		Payload: []byte("hello go"),
23		Properties: map[string]string{
24			"p1": "v1",
25			"p2": "v2",
26		},
27	}
28	msgID, err := producer.Send(context.Background(), msg)
29	if err != nil {
30		t.Fatal(err)
31	}
32	t.Log(msgID)
33}

上面的代码创建了一个Producer,创建了一个消息并将消息发出。

3.创建消费者

3.1 使用consumer.Receive()接收消息

使用consumer.Receive()接收消息的常用模式是在一个循环中监听Topic,下面的代码演示了消费者在for循环中持续监听Topic,打印接收到的消息内容,然后确认消息已被处理。如果处理逻辑失败发生了任何异常,将发送一个否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试。

 1func TestConsumer(t *testing.T) {
 2	client, err := pulsar.NewClient(pulsar.ClientOptions{
 3		URL:               "pulsar://192.168.2.13:6650",
 4		OperationTimeout:  30 * time.Second,
 5		ConnectionTimeout: 30 * time.Second,
 6	})
 7	if err != nil {
 8		t.Fatal(err)
 9	}
10	defer client.Close()
11
12	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
13		Topic:            "persistent://study/app1/topic-1",
14		SubscriptionName: "sub-2",          // 订阅名称
15		Type:             pulsar.Exclusive, // 订阅类型: 独占模式
16	})
17	if err != nil {
18		t.Fatal(err)
19	}
20	defer consumer.Close()
21
22	for {
23		msg, err := consumer.Receive(context.Background())
24		if err != nil {
25			t.Fatal(err)
26		}
27		if err := processMsg(msg); err != nil {
28			consumer.Nack(msg)
29		} else {
30			consumer.Ack(msg)
31		}
32
33	}
34}
35
36func processMsg(msg pulsar.Message) error {
37	fmt.Printf("consume: %s \n", msg.Payload())
38	return nil
39}

3.2 使用consumer.Chan()接收消息

consumer.Chan()返回的是一个channel,可以从这个channel中接收消息,从pulsar-client-go v0.6.0的源码中可以看出目前的consumer.Receive()是基于channel实现的。

 1func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
 2	for {
 3		select {
 4		case <-c.closeCh:
 5			return nil, newError(ConsumerClosed, "consumer closed")
 6		case cm, ok := <-c.messageCh:
 7			if !ok {
 8				return nil, newError(ConsumerClosed, "consumer closed")
 9			}
10			return cm.Message, nil
11		case <-ctx.Done():
12			return nil, ctx.Err()
13		}
14	}
15}
16
17func (c *consumer) Chan() <-chan ConsumerMessage {
18	return c.messageCh
19}

consumer.Receive()中对channel关闭和context cancel的情况做了封装。 如果直接使用consumer.Chan()话,需要我们自己处理和channel的交互,但也提供了最大的灵活性。

3.3 多主题订阅

pulsar-client-go同样支持多主题订阅,只需要再创建Consumer时,通过pulsar.ConsumerOptions的TopicsTopicsPattern字段指定。

1type ConsumerOptions struct {
2    ...
3	Topics []string
4	TopicsPattern string
5    ...
6}

Topics指定多个主题的名称,TopicsPattern指定主题名称的正则表达式,如果通过regex选择主题, 则所有主题都必须位于同一Pulsar命名空间中。

3.4 死信策略

pulsar-client-go同样支持设置死信策略:

1consumer, err := client.Subscribe(pulsar.ConsumerOptions{
2		Topic:            "persistent://study/app1/topic-1",
3		SubscriptionName: "sub-2",
4		Type:             pulsar.Exclusive, 
5		DLQ: &pulsar.DLQPolicy{
6			MaxDeliveries:   10, // 最大重发次数
7			DeadLetterTopic: "persistent://study/app1/dlt-1", // 死信Topic
8		},
9	})

在设置死信策略时可以设置消息的最大重发次数,也就是说消息处理失败时,重试的次数,如果超过设置的最大次数,则将会被发送到死信主题。

4.创建Reader

pulsar-client-go同样支持创建Reader,可以由用户自己手动在Topic中定位,读取想要读取的消息。

 1reader, err := client.CreateReader(pulsar.ReaderOptions{
 2        Topic:          "topic-1",
 3        StartMessageID: pulsar.EarliestMessageID(),
 4    })
 5if err != nil {
 6    log.Fatal(err)
 7}
 8defer reader.Close()
 9
10for reader.HasNext() {
11    msg, err := reader.Next(context.Background())
12    if err != nil {
13        log.Fatal(err)
14    }
15
16    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
17        msg.ID(), string(msg.Payload()))
18}

备注

本文中的测试代码在Linux环境下编译并且可以正常运行。

在MacOS Big Sur 11.6运行时,在pulsar.NewClient时,出现warning: 'SecTrustedApplicationCreateFromPath' is deprecated: first deprecated in macOS 10.15 - No longer supported [-Wdeprecated-declarations]的错误,按照https://github.com/apache/pulsar-client-go/issues/407中描述的在go.mod中替换依赖replace github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4后可以创建pulsar.Client了,但在创建Produer时出现fatal error: unexpected signal during runtime execution [signal SIGSEGV: segmentation violation code=0x1 addr=0xb01dfacedebac1e pc=0x7fff204c6c9e] runtime stack: runtime: unexpected return pc for runtime.sigpanic的错误,应该是cgo调用出现错误了。

pulsar最早的GO客户端是基于CGO的Pulsar CGo client github.com/apache/pulsar/pulsar-client-go/pulsar已经废弃不建议使用了,而本文中使用的是Pulsar Go client github.com/apache/pulsar-client-go/pulsar,号称是纯Go的,在Mac下还是遇到了cgo的问题,说明目前Pulsar Go client(v0.6.0)还不是太成熟且对MacOS支持的不太好。

参考