上一节介绍了如何使用Pulsar的Java客户端开发Pulsar的Producer、Consumer和Reader,Pulsar使用二进制协议在Producer/Consumer和Brokers之间通信,在开发Consumer时可以使用同步或异步的方式接收消息,同时支持批量接收消息和多主体订阅的特性,消费者在处理消息时还要考虑根据具体业务场景考虑使用死信策略。本节将介绍Pulsar Go Client的使用,演示使用Pulsar的Go客户端库开发Producer和Consumer。

下面在一个使用go mod创建的Go项目中,使用Go的单元测试演示pulsar go client的使用,在go.mod中加pulsar go client的依赖:

1
require github.com/apache/pulsar-client-go v0.6.0

1.创建Client

使用Go开发的程序连接到Pulsar Broker,需要先创建一个pulsar.Client接口的实例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
	"testing"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func TestNewClient(t *testing.T) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://192.168.2.13:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		t.Fatal(err)
	}
	t.Log(client)
	client.Close()
}

通过pulsar.NewClient创建出pulsar.Client接口的实例,这个接口的定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Client represents a pulsar client
type Client interface {
	// CreateProducer Creates the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	// Subscribe Creates a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	// CreateReader Creates a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	// TopicPartitions Fetches the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create {@link Reader},
	// {@link Consumer} or {@link Producer} instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// Close Closes the Client and free associated resources
	Close()
}

2.创建Producer

有了pulsar.Client对象后,就可以调用它的CreateProducer来创建Producer。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func TestProducer(t *testing.T) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://192.168.2.13:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Name:  "TestProducer-Go",
		Topic: "persistent://study/app1/topic-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()
	msg := &pulsar.ProducerMessage{
		Key:     "msgKey1",
		Payload: []byte("hello go"),
		Properties: map[string]string{
			"p1": "v1",
			"p2": "v2",
		},
	}
	msgID, err := producer.Send(context.Background(), msg)
	if err != nil {
		t.Fatal(err)
	}
	t.Log(msgID)
}

上面的代码创建了一个Producer,创建了一个消息并将消息发出。

3.创建消费者

3.1 使用consumer.Receive()接收消息

使用consumer.Receive()接收消息的常用模式是在一个循环中监听Topic,下面的代码演示了消费者在for循环中持续监听Topic,打印接收到的消息内容,然后确认消息已被处理。如果处理逻辑失败发生了任何异常,将发送一个否定确认(negative ack),在稍后的时间消息会重新发给消费者进行重试。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func TestConsumer(t *testing.T) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://192.168.2.13:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "persistent://study/app1/topic-1",
		SubscriptionName: "sub-2",          // 订阅名称
		Type:             pulsar.Exclusive, // 订阅类型: 独占模式
	})
	if err != nil {
		t.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			t.Fatal(err)
		}
		if err := processMsg(msg); err != nil {
			consumer.Nack(msg)
		} else {
			consumer.Ack(msg)
		}

	}
}

func processMsg(msg pulsar.Message) error {
	fmt.Printf("consume: %s \n", msg.Payload())
	return nil
}

3.2 使用consumer.Chan()接收消息

consumer.Chan()返回的是一个channel,可以从这个channel中接收消息,从pulsar-client-go v0.6.0的源码中可以看出目前的consumer.Receive()是基于channel实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
	for {
		select {
		case <-c.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		case cm, ok := <-c.messageCh:
			if !ok {
				return nil, newError(ConsumerClosed, "consumer closed")
			}
			return cm.Message, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

func (c *consumer) Chan() <-chan ConsumerMessage {
	return c.messageCh
}

consumer.Receive()中对channel关闭和context cancel的情况做了封装。 如果直接使用consumer.Chan()话,需要我们自己处理和channel的交互,但也提供了最大的灵活性。

3.3 多主题订阅

pulsar-client-go同样支持多主题订阅,只需要再创建Consumer时,通过pulsar.ConsumerOptions的TopicsTopicsPattern字段指定。

1
2
3
4
5
6
type ConsumerOptions struct {
    ...
	Topics []string
	TopicsPattern string
    ...
}

Topics指定多个主题的名称,TopicsPattern指定主题名称的正则表达式,如果通过regex选择主题, 则所有主题都必须位于同一Pulsar命名空间中。

3.4 死信策略

pulsar-client-go同样支持设置死信策略:

1
2
3
4
5
6
7
8
9
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "persistent://study/app1/topic-1",
		SubscriptionName: "sub-2",
		Type:             pulsar.Exclusive, 
		DLQ: &pulsar.DLQPolicy{
			MaxDeliveries:   10, // 最大重发次数
			DeadLetterTopic: "persistent://study/app1/dlt-1", // 死信Topic
		},
	})

在设置死信策略时可以设置消息的最大重发次数,也就是说消息处理失败时,重试的次数,如果超过设置的最大次数,则将会被发送到死信主题。

4.创建Reader

pulsar-client-go同样支持创建Reader,可以由用户自己手动在Topic中定位,读取想要读取的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "topic-1",
        StartMessageID: pulsar.EarliestMessageID(),
    })
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
    msg, err := reader.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
        msg.ID(), string(msg.Payload()))
}

备注

本文中的测试代码在Linux环境下编译并且可以正常运行。

在MacOS Big Sur 11.6运行时,在pulsar.NewClient时,出现warning: 'SecTrustedApplicationCreateFromPath' is deprecated: first deprecated in macOS 10.15 - No longer supported [-Wdeprecated-declarations]的错误,按照https://github.com/apache/pulsar-client-go/issues/407中描述的在go.mod中替换依赖replace github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4后可以创建pulsar.Client了,但在创建Produer时出现fatal error: unexpected signal during runtime execution [signal SIGSEGV: segmentation violation code=0x1 addr=0xb01dfacedebac1e pc=0x7fff204c6c9e] runtime stack: runtime: unexpected return pc for runtime.sigpanic的错误,应该是cgo调用出现错误了。

pulsar最早的GO客户端是基于CGO的Pulsar CGo client github.com/apache/pulsar/pulsar-client-go/pulsar已经废弃不建议使用了,而本文中使用的是Pulsar Go client github.com/apache/pulsar-client-go/pulsar,号称是纯Go的,在Mac下还是遇到了cgo的问题,说明目前Pulsar Go client(v0.6.0)还不是太成熟且对MacOS支持的不太好。

参考