分区Topic的概念

前面在学习Topic的时候,已经了解了分区Topic的基本概念。在Pulsar中一个Topic只能由一个Broker提供服务,而单个Topic的吞吐量受限于为其提供服务的Broker的计算能力,这限制了Topic的最大吞吐量。 Pulsar通过分区Topic来提高吞吐量,分区Topic在底层通过N个内部Topic实现,N就是分区的数量。

每个分区(内部的Topic)还是由一个Broker提供服务。 当Producer向分区Topic发送消息时,每条消息被路由到其中的一个Broker上。Pulsar自动处理跨Broker的分区分布。

pulsar-topic-partitioning.png

上图中,示例Topic的分区数为5,只有3个Broker,跨Broker的分区分布由Pulsar自动管理,其中有两个Broker各负责2个分区,第三个负责1个分区。 这个Topic的消息被广播给了两个Consumer。分区Topic和普通的Topic对Consumer来没有什么区别,Consumer通过订阅连接到Topic,订阅类型决定了哪些消息被哪个Consumer消费。 而Producer的路由模式决定了消息被发布到哪个分区中。

  • Producer的路由模式决定消息被发布到分区Topic中的哪个分区
  • Consumer订阅Topic的订阅类型决定哪些消息被哪个Consumer消费

一般在选择分区Topic和路由模式时是从提高吞吐量能力的要求,而选择哪种订阅类型是从应用的业务语义考虑的。因为在选择使用哪种订阅类型时,在分区Topic和普通Topic之间没有区别。

路由模式

当生产者发布消息到分区主题时,不需要特意指定路由模式,默认使用RoundRobinPartition路由模式,将以轮循的方式将消息均匀分布到各个分区。目前支持以下3种路由模式:

  • SinglePartition: 如果没有消息key提供,将会随机选择一个单独的分区来发布消息,可用于将来自特定生产者的消息分组在一起,以在没有键值(key)时维护消息顺序。
  • RoundRobinPartition: 如果没有消息key提供,将以轮循的方式将消息均匀分布到各个分区
  • CustomPartition: 定制实现路由模式,控制消息分发到特定分区

如果提供了消息key,会以key做hash,然后分配消息到指定分区。因此分区中消息的顺序与路由模式和消息的key有关:

  • 当路由模式为SinglePartition或RoundRobinPartition时,按key分区,即所有拥有相同的key的消息有序,将会被发送到相同的分区
  • 按producer,当路由模式为SinglePartition,且消息没有提供key时,来自于相同的Producer的消息有序

使用路由模式

使用Java语言开发Producer发布消息到分区Topic与发布到普通Topic的代码非常相似,不同之处只是在创建Producer时可以设置消息路由模式。

下面的代码创建了一个Producer,并设置其路由模式为SinglePartition:

1
2
3
4
5
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic(topic)
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .create();
producer.send("Partitioned topic message".getBytes());

下面的代码演示了使用定制的消息路由器,要使用自定义消息路由器,提供MessageRouter接口的实现,通过实现int choosePartition(Message<?> msg, TopicMetadata metadata)方法,自定义逻辑选择将消息路由到哪个分区:

1
2
3
4
5
6
Producer<byte[]> producer = client.newProducer().topic("persistent://study/app1/topic-4").messageRouter(new MessageRouter() {
    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        return 0; // 所有消息都被发送到分区0
    }
}).create();

使用Go开发Producer时,设置分区路由器的代码示例:

1
2
3
4
5
6
7
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "persistent://study/app1/topic-4",
    MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
        fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
        return 0
    },
})

参考