【注意】最后更新于 October 27, 2021,文中内容可能已过时,请谨慎使用。
上一节介绍了如何使用Pulsar的Java客户端开发Pulsar的Producer、Consumer和Reader,Pulsar使用二进制协议在Producer/Consumer和Brokers之间通信,在开发Consumer时可以使用同步或异步的方式接收消息,同时支持批量接收消息和多主体订阅的特性,消费者在处理消息时还要考虑根据具体业务场景考虑使用死信策略。本节将介绍Pulsar Go Client的使用,演示使用Pulsar的Go客户端库开发Producer和Consumer。
下面在一个使用go mod创建的Go项目中,使用Go的单元测试演示pulsar go client的使用,在go.mod
中加pulsar go client的依赖:
1
|
require github.com/apache/pulsar-client-go v0.6.0
|
1.创建Client
使用Go开发的程序连接到Pulsar Broker,需要先创建一个pulsar.Client
接口的实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
package main
import (
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func TestNewClient(t *testing.T) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://192.168.2.13:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
t.Fatal(err)
}
t.Log(client)
client.Close()
}
|
通过pulsar.NewClient
创建出pulsar.Client
接口的实例,这个接口的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Client represents a pulsar client
type Client interface {
// CreateProducer Creates the producer instance
// This method will block until the producer is created successfully
CreateProducer(ProducerOptions) (Producer, error)
// Subscribe Creates a `Consumer` by subscribing to a topic.
//
// If the subscription does not exist, a new subscription will be created and all messages published after the
// creation will be retained until acknowledged, even if the consumer is not connected
Subscribe(ConsumerOptions) (Consumer, error)
// CreateReader Creates a Reader instance.
// This method will block until the reader is created successfully.
CreateReader(ReaderOptions) (Reader, error)
// TopicPartitions Fetches the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
// If the topic is not partitioned, the returned list will contain the topic
// name itself.
//
// This can be used to discover the partitions and create {@link Reader},
// {@link Consumer} or {@link Producer} instances directly on a particular partition.
TopicPartitions(topic string) ([]string, error)
// Close Closes the Client and free associated resources
Close()
}
|
2.创建Producer
有了pulsar.Client
对象后,就可以调用它的CreateProducer
来创建Producer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
func TestProducer(t *testing.T) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://192.168.2.13:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Name: "TestProducer-Go",
Topic: "persistent://study/app1/topic-1",
})
if err != nil {
t.Fatal(err)
}
defer producer.Close()
msg := &pulsar.ProducerMessage{
Key: "msgKey1",
Payload: []byte("hello go"),
Properties: map[string]string{
"p1": "v1",
"p2": "v2",
},
}
msgID, err := producer.Send(context.Background(), msg)
if err != nil {
t.Fatal(err)
}
t.Log(msgID)
}
|
上面的代码创建了一个Producer,创建了一个消息并将消息发出。
3.创建消费者
3.1 使用consumer.Receive()接收消息
使用consumer.Receive()
接收消息的常用模式是在一个循环中监听Topic,下面的代码演示了消费者在for循环中持续监听Topic,打印接收到的消息内容,然后确认消息已被处理。如果处理逻辑失败发生了任何异常,将发送一个否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
func TestConsumer(t *testing.T) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://192.168.2.13:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://study/app1/topic-1",
SubscriptionName: "sub-2", // 订阅名称
Type: pulsar.Exclusive, // 订阅类型: 独占模式
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()
for {
msg, err := consumer.Receive(context.Background())
if err != nil {
t.Fatal(err)
}
if err := processMsg(msg); err != nil {
consumer.Nack(msg)
} else {
consumer.Ack(msg)
}
}
}
func processMsg(msg pulsar.Message) error {
fmt.Printf("consume: %s \n", msg.Payload())
return nil
}
|
3.2 使用consumer.Chan()接收消息
consumer.Chan()
返回的是一个channel,可以从这个channel中接收消息,从pulsar-client-go v0.6.0的源码中可以看出目前的consumer.Receive()是基于channel实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
|
consumer.Receive()中对channel关闭和context cancel的情况做了封装。
如果直接使用consumer.Chan()话,需要我们自己处理和channel的交互,但也提供了最大的灵活性。
3.3 多主题订阅
pulsar-client-go同样支持多主题订阅,只需要再创建Consumer时,通过pulsar.ConsumerOptions的Topics
或TopicsPattern
字段指定。
1
2
3
4
5
6
|
type ConsumerOptions struct {
...
Topics []string
TopicsPattern string
...
}
|
Topics指定多个主题的名称,TopicsPattern指定主题名称的正则表达式,如果通过regex选择主题, 则所有主题都必须位于同一Pulsar命名空间中。
3.4 死信策略
pulsar-client-go同样支持设置死信策略:
1
2
3
4
5
6
7
8
9
|
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://study/app1/topic-1",
SubscriptionName: "sub-2",
Type: pulsar.Exclusive,
DLQ: &pulsar.DLQPolicy{
MaxDeliveries: 10, // 最大重发次数
DeadLetterTopic: "persistent://study/app1/dlt-1", // 死信Topic
},
})
|
在设置死信策略时可以设置消息的最大重发次数,也就是说消息处理失败时,重试的次数,如果超过设置的最大次数,则将会被发送到死信主题。
4.创建Reader
pulsar-client-go同样支持创建Reader,可以由用户自己手动在Topic中定位,读取想要读取的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
|
备注
本文中的测试代码在Linux环境下编译并且可以正常运行。
在MacOS Big Sur 11.6运行时,在pulsar.NewClient
时,出现warning: 'SecTrustedApplicationCreateFromPath' is deprecated: first deprecated in macOS 10.15 - No longer supported [-Wdeprecated-declarations]
的错误,按照https://github.com/apache/pulsar-client-go/issues/407中描述的在go.mod
中替换依赖replace github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
后可以创建pulsar.Client
了,但在创建Produer时出现fatal error: unexpected signal during runtime execution [signal SIGSEGV: segmentation violation code=0x1 addr=0xb01dfacedebac1e pc=0x7fff204c6c9e] runtime stack: runtime: unexpected return pc for runtime.sigpanic
的错误,应该是cgo调用出现错误了。
pulsar最早的GO客户端是基于CGO的Pulsar CGo client github.com/apache/pulsar/pulsar-client-go/pulsar
已经废弃不建议使用了,而本文中使用的是Pulsar Go client github.com/apache/pulsar-client-go/pulsar
,号称是纯Go的,在Mac下还是遇到了cgo的问题,说明目前Pulsar Go client(v0.6.0)还不是太成熟且对MacOS支持的不太好。
参考