Apache Pulsar学习笔记05: Pulsar的生产者、消费者、订阅和订阅模式

Apache Pulsar学习笔记05: Pulsar的生产者、消费者、订阅和订阅模式

📅 2021-10-26 | 🖱️
🔖 pulsar

上一节学习了Pulsar的逻辑架构,Pulsar在逻辑结构上由租户(Tenant)、命名空间(Namespace)、主题(Topic)组成。 Pulsar是一个多租户系统,租户可以跨集群分布,租户代表了组织中特定的业务单元、产品线、核心功能,对应组织中的不同部门或团队负责。 Pulsar中消息的读写都是面向Topic的,支持分区Topic,分区Topic内部实现为等于分区数量的N个内部Topic,多个Broker就可以为多个内部Topic服务,均匀分布负载,分区发布由Pulsar自动管理对用户透明。

Pulsar是建立在发布订阅模式上的,Producer将消息发布到Topic,Consumer订阅Topic,处理接收消息,并在处理完成时发送确认(ACK)。 本节将学习Pulsar发布订阅相关的4个核心概念:ProducerConsumerSubscription(订阅)和Subscription(订阅模式),并使用Pulsar的命令行客户端工具pulsar-client进行实操测试。

生产者(Producer) #

Producer是一个连接到Pulsar Broker(或Pulsar Proxy)上的进程,Producer发送消息到Topic。

消费者(Consumer) #

Consumer也是一个连接到Pulsar Broker(或Pulsar Proxy)上的进程,是从Broker接收消息的进程,Consumer成功处理了一条消息后需要向Broker发送确认(ACK),以让Broker知道消息已经被接收和处理,如果Broker在预先设置的时间范围内没有收到确认(ACK), Broker可以将消息重新发送订阅该Topic的Consumer。

Consume成功处理了消息,需要发送确认给Broker,通知Broker可以丢弃这个条消息了。 消息的确认可以一个接一个,也可以累积确认(cumulative ack)。累积确认时,Consumer只需要确认最后一条它收到的消息。 所有之前(包含此条)的消息都将被确认,都不会被再次重发给那个消费者。

Plusar中消息的cumulative ack和RabbitMQ中消息的multiple ack相似

订阅和订阅模式 #

一旦创建订阅,即使Consumer已断开连接,Pulsar仍然可以保存所有消息。 只有在Consumer确认消息被成功处理后,保留下来的消息才会被丢弃。 订阅是命名好的配置规则,指导消息如何投递给消费者

Consumer订阅Topic的时候,通过订阅模式来控制消息的使用模式,指定如何将消息投递给一个组一个的或多个的Consumer。 一个Topic可以同时支持多个订阅,同一个Topic上的不同订阅可以使用不同的订阅模式。

Pulsar支持4种订阅模式:

  • exclusive(独占模式)
  • failover(故障转移模式,也叫灾备模式)
  • shared(共享模式)
  • key-shared(基于key的共享模式)

注意无论使用哪种订阅模式,消息都是按照他们被接收到的顺序投递的。

exclusive(独占模式) #

独占模式,只能有一个Consumer绑定到订阅上。如果多于一个Consumer尝试以使用相同的订阅订阅Topic,就会抛出异常且无法连接。

pulsar-exclusive-subscriptions.png

独占模式的使用场景是: 当需要保证每个消息只被一个已知的使用者处理一次时,可以使用此模式。

failover(灾备模式) #

在灾备订阅模式中,多个Consumer可以绑定到同一个订阅上, Consumer将会按字典顺序排序,第一个Consumer被初始化为唯一接受消息的消费者,被称为Master Consumer。 当Master Consumer断开时,所有的未被确认和后续进入的消息将会被投递给下一个Consumer。

pulsar-failover-subscriptions.png

灾备模式的使用场景是: 当需要单一处理语义和使用者的高可用性时,这种类型的订阅非常有用。例如如果希望消费者服务在发生系统故障时继续处理消息,即希望在第一个消费者服务实例因任何原因而失败时由另一个消费者服务实例接管。

shared(共享模式) #

在共享模式中,多个Consumer可以绑定到同一个订阅上。 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当一个消费者断开连接时,所有已经投递给它但还没有被确认的消息将被重新投递,分发给其它存活的消费者。

pulsar-shared-subscriptions.png

使用共享模式有以下两个限制:

  • 消息的顺序无法保证
  • 不可以使用累积确认(cumulative ack)

key-shared(基于key的共享模式) #

key-shared模式是共享模式的一种特例,它也允许多个Consumer可以绑定到同一个订阅上,与共享模式中的round robin轮询消费消息不同,key-shared模式增加了一个辅助key,确保具有相同key的消息被交付给相同的消费者。即具有相同key的消息被分组在一起,交付给相同的消费者。

使用pulsar-client #

pulsar-client是Pulsar提供的用于进行生产者和消费者测试的命令行工具。下面我们使用第3节中运行在本地docker容器中的单机pulsar环境来进行一下生产者和消费者测试。

 1docker ps
 2CONTAINER ID   IMAGE                       COMMAND                  CREATED        STATUS        PORTS                                                                                  NAMES
 3e9cb8dd13c27   apachepulsar/pulsar:2.8.1   "bin/pulsar standalo…"   46 hours ago   Up 46 hours   0.0.0.0:6650->6650/tcp, :::6650->6650/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   pulsar
 4
 5docker exec -it e9cb8dd13c27 sh
 6cd /pulsar/bin
 7
 8./pulsar-admin topics create persistent://study/app1/topic-1
 9./pulsar-admin topics list study/app1
10"persistent://study/app1/topic-1"

根据上面的命令输出,已经在租户study的命名空间app1中创建了一个名称为topic-1的topic.

使用pulsar-client启动一个消费者进程:

1./pulsar-client consume \
2persistent://study/app1/topic-1 \
3--num-messages 0 \
4--subscription-name sub-1 \
5--subscription-type Exclusive

上面使用pulsar-client启动了一个消费者进程,订阅了persistent://study/app1/topic-1这个Topic,--num-messages指定需要消费的消息数量,设置为0表示一直消费。--subscription-name指定了创建订阅的名称,subscription-type指定订阅的类型为Exclusive,即采用的订阅模式是独占模式。

在另外一个终端中使用pulsar-client启动一个生产者进程,向persistent://study/app1/topic-1这个Topic发送2次内容为helloworld的消息:

1./pulsar-client produce persistent://study/app1/topic-1 --num-produce 2 --messages "helloworld"

生产者进程发送2个helloworld的消息后就停止了,在消费者端控制台打印出了2次helloworld的消息,说明消费者进程收到并消费了消息。

1----- got message -----
2key:[null], properties:[], content:helloworld
3----- got message -----
4key:[null], properties:[], content:helloworld

接下来可以继续使用pulsar-client测试一下不同订阅模式的例子,这里略过……

参考 #

© 2025 青蛙小白 | 总访问量 | 总访客数