Apache Pulsar学习笔记08: 使用Pulsar Schema管理消息数据的类型安全性
2021-10-29
前面两节分别介绍了如何使用Pulsar Java客户端库和Go客户端库开发Producer、Consumer。 目前主流的消息中间件都不负责消息在从生产者到消费者上下游传递过程中的类型安全性,而由客户端自己负责消息的序列化和反序列操作来保障消息传递的类型安全。 Pulsar也支持这种客户端的方法,生产者将具体类型的消息对象序列化成字节数组发送到Topic,消费者从Topic接收字节数组并反序列化为具体类型的消息对象。 除了由客户端负责消息类型安全性的方法,Pulsar还提供了一种服务端的方法即Pulsar Schema,本节将学习如何使用Pulsar Schema管理消息数据的类型安全性。
Pulsar Schema介绍 #
Pulsar有一个内置的Schema Registry,允许客户端为每个Topic上传消息数据的Schema。这样Producer和Consumer就可以通知Pulsar要通过Topic传输什么类型的数据。 使用Schema,Pulsar可以强制执行类型安全,确保生产者和消费者保持同步,客户端不需要再手动进行消息的序列化和反序列化,而由Pulsar Schema在后台执行这些操作。
Pulsar Schema是在Topic上存储和执行的,而不是存在命名空间和租户中的。
Pulsar使用SchemaInfo
数据结构来定义Pulsar Schema,在Go客户端中对应的是pulsar.SchemaInfo
结构体:
1// Encapsulates data around the schema definition
2type SchemaInfo struct {
3 Name string
4 Schema string
5 Type SchemaType
6 Properties map[string]string
7}
在Java客户端库中对应的是org.apache.pulsar.common.schema.SchemaInfo
接口:
1@InterfaceAudience.Public
2@InterfaceStability.Stable
3public interface SchemaInfo {
4
5 String getName();
6
7 /**
8 * The schema data in AVRO JSON format.
9 */
10 byte[] getSchema();
11
12 /**
13 * The type of schema (AVRO, JSON, PROTOBUF, etc..).
14 */
15 SchemaType getType();
16
17 /**
18 * Additional properties of the schema definition (implementation defined).
19 */
20 Map<String, String> getProperties();
21
22 String getSchemaDefinition();
23}
可以看出一个SchemaInfo由以下字段组成:
字段 | 说明 |
---|---|
name | schema的名称(字符串) |
type | schema的类型,如STRING,如果为空字符串则表示是自定义的Schema |
schema | schema的数据(payload) |
properties | 用户定义的schema的属性,属性可能是与schema相关联的githash,环境字符串(如dev或prod)等等 |
Pulsar内置了很多Schema类型,可分为两类: 基础类型和复杂类型
基础类型 | 说明 |
---|---|
BOOLEAN | 二进制值 |
INT8 | 8 位带符号整数 |
INT16 | 16 位带符号整数 |
INT32 | 32 位带符号整数 |
INT64 | 64 位带符号整数 |
FLOAT | 单精度(32位)IEEE 754 浮点数 |
DOUBLE | 双精度(64位)IEEE 754 浮点数 |
BYTES | 8 位无符号字节序列 |
STRING | Unicode 字符序列 |
TIMESTAMP (DATE, TIME) | 表示特定时间点的逻辑类型(精度为毫秒)。 以 INT64 值存储自 1970年1月1日,00:00:00 GMT 起的毫秒数。 |
INSTANT | 时间线上的单个瞬时点,精度为纳秒 |
LOCAL_DATE | 表示日期的不可变对象,通常为“年-月-日”的格式 |
LOCAL_TIME | 表示时间的不可变对象,通常为“时-分-秒”的格式。 时间可精确到纳秒级精度。 |
LOCAL_DATE_TIME | 用来表示日期及时间的不可变对象,通常为“年-月-日-时-分-秒”格式 |
- 对于基础类型,SchemaInfo中的
schema
字段不存储任何数据,type
字段指定基础类型名称后就可以明确如何序列化和反序列化数据。 - 对于复杂类型,SchemaInfo中的
schema
字段需要指定schema具体schema定义。
每个用topic存储的SchemaInfo都有一个版本。版本信息用于管理topic内发生的schema更改。
Pulsar的Topic存储并管理SchemaInfo
的版本,使用对应版本的SchemaInfo
产生的消息被标记为对应版本,确保消息被消费时可以使用版本检索对应的SchemaInfo
,并使用正确的schema来反序列化数据。
Pulsar Schema的基本使用 #
管理Pulsar Schema有两种方法: 自动管理和手动管理,具体的内容可查阅官方文档https://pulsar.apache.org/docs/zh-CN/schema-manage/。
这里简单演示一下自动管理的场景,即如果某个schema通过了schema兼容性检查,Pulsar Producer就会自动将此 schema 更新为topic默认创建的schema。这里使用Pulsar的Java Client库开发的生产者和消费者来做一个简单的演示。
要传递的消息的数据结构为:
1public class Order {
2 private String orderID;
3 private String orderName;
4
5 public String getOrderID() {
6 return orderID;
7 }
8
9 public void setOrderID(String orderID) {
10 this.orderID = orderID;
11 }
12
13 public String getOrderName() {
14 return orderName;
15 }
16
17 public void setOrderName(String orderName) {
18 this.orderName = orderName;
19 }
20}
生产者代码如下:
1try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
2 try (Producer<Order> producer = client.newProducer(Schema.JSON(Order.class)).topic("persistent://study/app1/topic-2").create()) {
3 Order order = new Order();
4 order.setOrderID(1000L);
5 order.setOrderName("the order");
6 producer.newMessage()
7 .value(order)
8 .key(order.orderID.toString())
9 .property("p1", "v1")
10 .property("p2", "v2")
11 .send();
12 }
13}
消费者代码如下:
1try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://192.168.2.13:6650").build()) {
2 Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
3 .topic("persistent://study/app1/topic-2")
4 .subscriptionName("sub-1")
5 .subscriptionType(SubscriptionType.Exclusive)
6 .subscribe();
7
8 while (true) {
9 Message<Order> msg = consumer.receive();
10 try {
11 System.out.println("Message received: " + msg.getValue().getOrderName());
12 consumer.acknowledge(msg);
13 } catch (Exception e) {
14 consumer.negativeAcknowledge(msg);
15 }
16 }
17}
使用上面的生产者和消费者代码,当生产者发送一条消息,消费者消费之后。使用pulsar-admin
查看一下Topic persistent://study/app1/topic-2
的schema:
1./pulsar-admin schemas get persistent://study/app1/topic-2
2{
3 "version": 0,
4 "schemaInfo": {
5 "name": "topic-2",
6 "schema": {
7 "type": "record",
8 "name": "Order",
9 "namespace": "com.pulsar.showcase.client.PulsarSchemaTest",
10 "fields": [
11 {
12 "name": "orderID",
13 "type": [
14 "null",
15 "long"
16 ]
17 },
18 {
19 "name": "orderName",
20 "type": [
21 "null",
22 "string"
23 ]
24 }
25 ]
26 },
27 "type": "JSON",
28 "properties": {
29 "__alwaysAllowNull": "true",
30 "__jsr310ConversionEnabled": "false"
31 }
32 }
33}
可以看到Topic persistent://study/app1/topic-2
上自动创建了schema。
可以进一步测试将消息类Order
的orderID字段的类型由Long
修改成String
,同步修改生产的代码,使其生产订单ID为字符串类型的消息发送到Broker,会发送失败并抛出了下面的因Schema不兼容而验证失败的异常:
1org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
2{
3 "type" : "record",
4 "name" : "Order",
5 "namespace" : "com.pulsar.showcase.client.PulsarSchemaTest",
6 "fields" : [ {
7 "name" : "orderID",
8 "type" : [ "null", "string" ],
9 "default" : null
10 }, {
11 "name" : "orderName",
12 "type" : [ "null", "string" ],
13 "default" : null
14 } ]
15}
16using schema:
17{
18 "type" : "record",
19 "name" : "Order",
20 "namespace" : "com.pulsar.showcase.client.PulsarSchemaTest",
21 "fields" : [ {
22 "name" : "orderID",
23 "type" : [ "null", "long" ],
24 "default" : null
25 }, {
26 "name" : "orderName",
27 "type" : [ "null", "string" ],
28 "default" : null
29 } ]
30}
Schema的版本演进和兼容性 #
Schema不会保持不变,它们会随着应用和服务的迭代而不断更新,以满足新的需求。 Pulsar官方文档https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/中详细介绍了Pulsar Schema如何进行版本演进和保持兼容性的相关内容。 包括: Pulsar对Schema演化提供了怎样的支持;Schema兼容性检查策略;已经在各个兼容性策略下升级客户端(Producer, Consumer)的顺序。