时间过得比较久了,在开始今天的学习之前先回顾一下前面已经学习的13节的内容。

0.阶段复习

Pulsar是一个支持多租户的、高性能的、分布式的Pub-Sub消息系统。

  • 了解Pulsar的架构。Pulsar提供了一个比Cluster更高级别的抽象Instance。
    • 一个Pulsar Instance由多个Pulsar Cluster组成
    • 一个Instance中的Cluster之间可以相互跨地域复制数据
  • 单个Pulsar集群由以下部分组成:
    • Pulsar Proxy: 是无状态的,Proxy作为集群的智能路由层,是负责Pulsar客户端与Pulsar集群交互的统一网关
    • Pulsar Brokers: 也是无状态的,是集群的服务层,Proxy会将客户端的访问请求转发到正确的Broker上。Broker作为服务层与Pulsar的存储层进行交互
    • Bookies: 一个集群有多个Bookie节点(组成Bookeeper集群)负责消息的持久化存储
    • Zookeeper: 用于集群级别的配置和协调,并存储Pulsar集群的所有元数据
  • 以docker容器运行单机Pulsar
    • 学习使用命令行工具pulsar-admin创建tenant、namespace、topic
    • 了解Pulsar Admin REST API
  • tenant、namespace、topic的基本概念
    • Pulsar基于租户、命名空间、主题的逻辑层次结构支持多租户
    • 分区Topic的概念
    • Topic URL格式
    • 持久化Topic和非持久化Topic的概念
  • 生产者和消费者、订阅和订阅模式
    • Pulsar支持: exclusive(独占), failover(故障转移/灾备), shared(共享), key-shared(基于key的共享模式) 4中订阅模式
    • 使用命令行工具pulsar-client进行生产者和消费者测试
  • 使用Pulsar Java客户端库创建生产者、消费者、Reader
    • 消费者端可以使用"同步接收消息", “异步接收消息”, “MessageListener接收” 3种模式,其中MessageListener自带线程池
    • 创建消费者时可以设置消费者的批量接收策略
    • 多主题订阅: 设置单个消费者订阅多个主题
    • 消费异常处理可以使用"无限重试", “捕获并忽略异常”, “死信主题(Dead Letter Topic)“三种方式
    • 使用消息Reader可以由用户自己手动在Topic中定位,读取想要读取的消息
  • 使用Pulsar Go客户端库
    • 消费者端支持consumer.Receive()consumer.Chan()两种方式消费消息。前者对channel关闭和context cancel的情况做了封装,后者要我们自己处理和channel的交互,但也提供了最大的灵活性。
    • 多主题订阅
    • 死信策略和死信主题
    • 使用消息Reader
  • 使用Pulsar Schema管理消息数据的类型安全性
  • Web图形化管理工具Pulsar Manager
  • 延迟消息投递特性
    • 指定多长时间后投递deliverAfter
    • 指定在将来某个时间点投递deliverAt
  • 分区Topic和路由模式
  • 认证和授权
    • 开启JWT身份认证
    • 授权和权限管理

前面的学习一直是基于以docker容器启动的单机Pulsar。今天将学习使用Helm在Kubernetes集群中部署Pulsar集群。

1.环境准备

这里使用的Kubernetes集群的版本是1.22.4,Helm的版本是3.7.1。

1.1 Pulsar集群组件和K8S Node节点规划

下面做一下Pulsar集群各个组件部署节点的规划。使用Pulsar官方的Helm Chart部署时,可选择部署各个组件。 在后边的配置中将禁用监控相关的组件(promethues, grafana等),我们这里选择以后尝试使用外部的全局Prometheus实现对Pulsar集群的监控。

本节选择部署的集群组件如下:

  • proxy: 无状态, 但pulsar的helm chart使用StatefulSet部署
  • broker: 无状态, 但pulsar的helm chart使用StatefulSet部署
  • bookie: 有状态, pulsar的helm chart使用StatefulSet部署
  • zookeeper: 有状态, pulsar的helm chart使用StatefulSet部署
  • recovery: 无状态, 但pulsar的helm chart使用StatefulSet部署
  • toolset: 无状态, 但pulsar的helm chart使用StatefulSet部署
  • pulsar-manager: 无状态, pulsar的helm chart使用Deployment部署

注意, pulsar-managers虽然是无状态的,但因为它需要使用PostgreSQL数据库,pulsar-managers的docker镜像中内置一个PostgreSQL, 这个我们在后边的配置中将改为使用集群外部的PostgreSQL。

下面说一下以上各个组件的部署节点选择。

  • 对于proxy, broker, recovery, toolset, pulsar-manager这5个无状态组件,可以让k8s将其调度到任意节点上。
  • 对于bookie, zookeeper这2个有状态组件,需要我们根据其存储卷的类型,将其规划到合适的k8s节点。

我们在线上环境对于有状态服务的部署,在存储卷的选择上,为了更好的性能,一般都是选择Local Persistent Volumes 在。因此,如果你在规划一个线上版本的Pulsar集群部署的话,对于bookie和zookeeper肯定需要单独的独立的k8s节点,并使用这些节点上创建的Local PV。 例如,一个线上生产可用的Pulsar集群可能规划如下:

  • pulsar zookeeper集群至少需要3个独立的k8s节点, 在这些节点上创建zookeeper的local pv
  • pulsar bookeeper集群(bookie节点组成)根据规划的容量需要N个独立的k8s节点, 在这些节点上创建bookie的local pv。如果后续需要扩容增加bookie节点时,只需要有新的创建好local pv的k8s节点,并对bookie的StatefulSet扩容即可。
  • pulsar proxy, broker等无状态服务,只需要有足够的数量的k8s节点,并在需要时按需扩容即可

因本文这里用于实验的k8s集群资源有限,所以尽量将上面各组件在3个k8s节点上混部,将一个用于测试环境的的Pulsar集群规划如下:

k8s节点 部署pulsar组件 备注
node1 zookeeper-0, bookie-0, broker-0, proxy-0 线上环境bookie和zookeeper一定要在单独的节点上
node2 zookeeper-1, bookie-1, broker-1, proxy-1 线上环境bookie和zookeeper一定要在单独的节点上
node3 zookeeper-2, bookie-2, broker-2, proxy-2 线上环境bookie和zookeeper一定要在单独的节点上
node1或node2或node3 recovery-0, toolset-0, pulsar-manager

基于上面测试环境的规划,我们将node1~node3三个节点打上Label和Taint:

1kubectl label node node1 node-role.kubernetes.io/pulsar=pulsar
2kubectl label node node2 node-role.kubernetes.io/pulsar=pulsar
3kubectl label node node3 node-role.kubernetes.io/pulsar=pulsar
4kubectl taint nodes node1 dedicated=pulsar:NoSchedule
5kubectl taint nodes node2 dedicated=pulsar:NoSchedule
6kubectl taint nodes node3 dedicated=pulsar:NoSchedule
  • Label node-role.kubernetes.io/pulsar=pulsar用于标记节点是专门用于运行pulsar集群组件的k8s节点。
  • Taint dedicated=pulsar:NoSchedule被打到节点上后,默认配置下k8s集群中的其他不属于pulsar集群组件的pod将不会被调度到这3个节点上,而后边我们将要部署的pulsar组件上将会使用Toleration配置允许dedicated=pulsar:NoSchedule的Taint。
  • 注意这里只是根据测试环境Pulsar集群的规划,做了上面的Label和Taint的设置,如果是生产环境,这里的Label和Taint应该做更合理和细粒度的规划,确保实现上面生产可用Pulsar集群的Node节点规划

1.2 Pulsar集群组件容器镜像准备

前面我们选择要部署Pulsar集群的proxy, broker, bookie, zookeeper, recovery, toolset, pulsar-manager 7大组件。

其中proxy, broker, bookie, zookeeper, recovery, toolset的官方容器镜像都是apachepulsar/pulsar-all。 pulsar-manager的官方镜像是apachepulsar/pulsar-manager

本文使用的pulsar官方的helm chart https://github.com/apache/pulsar-helm-chart/releases

pulsar-helm-chart的版本为2.7.7,该版本中pulsar的版本为2.7.4, pulsar-manager版本为v0.1.0:

  • apachepulsar/pulsar-all:2.7.4
  • apachepulsar/pulsar-manager:v0.1.0

注意因为pulsar-manager:v0.1.0有这个ISSUE https://github.com/apache/pulsar-helm-chart/issues/133中描述的问题,所以在后边的部署将镜像pulsar-manager:v0.1.0更换成了pulsar-manager:v0.2.0

为了提高效率,这里将apachepulsar/pulsar-all:2.7.4和apachepulsar/pulsar-manager:v0.2.0这两个镜像转存到了k8s集群所使用的私有镜像仓库中,例如:

  • harbor.example.com/library/apachepulsar/pulsar-all:2.7.4
  • harbor.example.com/library/apachepulsar/pulsar-manager:v0.2.0

1.3 创建JWT认证所需的K8S Secret

这里部署的Pulsar集群需要在安全上开通JWT认证。根据前面学习的内容,JWT支持通过两种不同的秘钥生成和验证Token:

  • 对称秘钥:
    • 使用单个Secret key来生成和验证Token
  • 非对称秘钥:包含由私钥和公钥组成的一对密钥
  • 使用Private key生成Token
  • 使用Public key验证Token

推荐使用非对称密钥的方式,需要先生成密钥对,再用秘钥生成token。因为Pulsar被部署在K8S集群中,在K8S集群中存储这些秘钥和Token的最好的方式是使用K8S的Secret。

pulsar-helm-chart专门提供了一个prepare_helm_release.sh脚本,可以用来生成这些Secret。

下面我们将pulsar-helm-chart的源码clone到K8S的控制节点上(kubectl和helm可用的节点):

1git clone -b pulsar-2.7.7 --depth 1 https://github.com/apache/pulsar-helm-chart.git
2cd pulsar-helm-chart/

执行下面的命令生成秘钥对和Token的Secret的Manifest:

1./scripts/pulsar/prepare_helm_release.sh \
2    -n pulsar \
3    -k pulsar \
4    -l

上面的命令中:

  • -n指定的生成Secret Manifest中安装的命名空间,这里我是将其部署到K8S中的pulsar namespace中,所以指定为pulsar,当然也可以指定部署到其他的namespace中。
  • -k指定的是使用helm部署时的helm release名称,这里指定为pulsar。
  • -l指定只将生成的内容输出达到本地,而不会自动部署到K8S中。比较喜欢这种手动的方式,因为一切比较可控。
  • 注意这个脚本还有一个-s,--symmetric参数,如果给这个参数的话,JWT认证将使用对称秘钥的方式,这里没有给这个参数,就使用非对称秘钥的方式。

执行上面的脚本会输出以下内容:

 1generate the token keys for the pulsar cluster
 2---
 3The private key and public key are generated to ... successfully.
 4apiVersion: v1
 5data:
 6  PRIVATEKEY: <...>
 7  PUBLICKEY: <...>
 8kind: Secret
 9metadata:
10  creationTimestamp: null
11  name: pulsar-token-asymmetric-key
12  namespace: pulsar
13generate the tokens for the super-users: proxy-admin,broker-admin,admin
14generate the token for proxy-admin
15---
16pulsar-token-asymmetric-key
17apiVersion: v1
18data:
19  TOKEN: <...>
20  TYPE: YXN5bW1ldHJpYw==
21kind: Secret
22metadata:
23  creationTimestamp: null
24  name: pulsar-token-proxy-admin
25  namespace: pulsar
26generate the token for broker-admin
27---
28pulsar-token-asymmetric-key
29apiVersion: v1
30data:
31  TOKEN: <...>
32  TYPE: YXN5bW1ldHJpYw==
33kind: Secret
34metadata:
35  creationTimestamp: null
36  name: pulsar-token-broker-admin
37  namespace: pulsar
38generate the token for admin
39---
40pulsar-token-asymmetric-key
41apiVersion: v1
42data:
43  TOKEN:  <...>
44  TYPE: YXN5bW1ldHJpYw==
45kind: Secret
46metadata:
47  creationTimestamp: null
48  name: pulsar-token-admin
49  namespace: pulsar
50-------------------------------------
51
52The jwt token secret keys are generated under:
53    - 'pulsar-token-asymmetric-key'
54
55The jwt tokens for superusers are generated and stored as below:
56    - 'proxy-admin':secret('pulsar-token-proxy-admin')
57    - 'broker-admin':secret('pulsar-token-broker-admin')
58    - 'admin':secret('pulsar-token-admin')

从输出可以看出,该脚本生成了4个K8S Secret的Manifest:

  • pulsar-token-asymmetric-key这个Secret中是用于生成Token和验证Token的私钥和公钥
  • pulsar-token-proxy-admin这个Secret中是用于proxy的超级用户角色Token
  • pulsar-token-broker-admin这个Secret中是用于broker的超级用户角色Token
  • pulsar-token-admin这个Secret中是用于管理客户端的超级用户角色Token

接下来手动将这4个Secret使用kubectl apply创建到K8S的pulsar命名空间中。 创建完成后,可以使用kubectl找到它们:

1kubectl get secret -n pulsar | grep pulsar-token
2pulsar-token-admin                        Opaque                    2      5m
3pulsar-token-asymmetric-key               Opaque                    2      5m
4pulsar-token-broker-admin                 Opaque                    2      5m
5pulsar-token-proxy-admin                  Opaque                    2      5m

1.4 创建Zookeeper和Bookie的Local PV

根据部署Pulsar的K8S节点的规划,下面需要为zookeeper, bookie所在的节点在K8S上创建Local Persistent Volume。

注意每个zookeeper节点需要一个data的local volume,每个bookie节点需要journal和ledgers共两个local volume。

在创建Local PV之前,需要确认一下k8s中存在StorageClasslocal-storage,如果没有可以使用下面的manifest创建。

1apiVersion: storage.k8s.io/v1
2kind: StorageClass
3metadata:
4  name: local-storage
5provisioner: kubernetes.io/no-provisioner
6volumeBindingMode: WaitForFirstConsumer
7reclaimPolicy: Retain

注意现在的K8S中不在直接提供local volume的provisioner,这里也没有使用provisioner,因此后续对local volume的创建和管理都是需要K8S集群管理员的手动进行。 也是说目前Kubernetes核心中不包含对对本地卷进行动态发放和管理的provisioner,如果想要体验动态发放和管理的功能,可以试一下由Rancher提供的Local Path Provisioner

我这里依然使用手动管理的方式,即通过手动在K8S节点上创建Local Volume,手动绑定Local Volume与Pulsar Zookeeper和Bookie的PVC(PersistentVolumeClaim)之间的关系。

下面,先手动在node1, node2, node3上创建local volume对应的数据目录:

1mkdir -p /home/puslar/data/zookeeper-data
2mkdir -p /home/puslar/data/bookie-data/ledgers
3mkdir -p /home/puslar/data/bookie-data/journal

zookeeper data的local pv的manifest如下:

 1---
 2apiVersion: v1
 3kind: PersistentVolume
 4metadata:
 5  name: pulsar-zookeeper-data-pulsar-zookeeper-0
 6spec:
 7  capacity:
 8    storage: 20Gi 
 9  accessModes:
10  - ReadWriteOnce
11  persistentVolumeReclaimPolicy: Retain
12  storageClassName: local-storage
13  local:
14    path: /home/puslar/data/zookeeper-data
15  claimRef:
16    name: pulsar-zookeeper-data-pulsar-zookeeper-0
17    namespace: pulsar
18  nodeAffinity:
19    required:
20      nodeSelectorTerms:
21      - matchExpressions:
22        - key: kubernetes.io/hostname
23          operator: In
24          values:
25          - node1
26
27---
28apiVersion: v1
29kind: PersistentVolume
30metadata:
31  name: pulsar-zookeeper-data-pulsar-zookeeper-1
32spec:
33  capacity:
34    storage: 20Gi 
35  accessModes:
36  - ReadWriteOnce
37  persistentVolumeReclaimPolicy: Retain
38  storageClassName: local-storage
39  local:
40    path: /home/puslar/data/zookeeper-data
41  claimRef:
42    name: pulsar-zookeeper-data-pulsar-zookeeper-1
43    namespace: pulsar
44  nodeAffinity:
45    required:
46      nodeSelectorTerms:
47      - matchExpressions:
48        - key: kubernetes.io/hostname
49          operator: In
50          values:
51          - node2
52
53
54---
55apiVersion: v1
56kind: PersistentVolume
57metadata:
58  name: pulsar-zookeeper-data-pulsar-zookeeper-2
59spec:
60  capacity:
61    storage: 20Gi 
62  accessModes:
63  - ReadWriteOnce
64  persistentVolumeReclaimPolicy: Retain
65  storageClassName: local-storage
66  local:
67    path: /home/puslar/data/zookeeper-data
68  claimRef:
69    name: pulsar-zookeeper-data-pulsar-zookeeper-2
70    namespace: pulsar
71  nodeAffinity:
72    required:
73      nodeSelectorTerms:
74      - matchExpressions:
75        - key: kubernetes.io/hostname
76          operator: In
77          values:
78          - node3

上面的manifest仍中将3个Local PV通过nodeAffinity创建并关联到到node1~node3上,同时使用claimRef将这3个Local PV与即将在K8S集群中部署的zookeeper SatefulSet中的PVC绑定。 使用kubectl apply创建上面的manifest。

bookie ledgers和journal的local pv的manifest如下:

  1---
  2apiVersion: v1
  3kind: PersistentVolume
  4metadata:
  5  name: pulsar-bookie-ledgers-pulsar-bookie-0
  6spec:
  7  capacity:
  8    storage: 50Gi 
  9  accessModes:
 10  - ReadWriteOnce
 11  persistentVolumeReclaimPolicy: Retain
 12  storageClassName: local-storage
 13  local:
 14    path: /home/puslar/data/bookie-data/ledgers
 15  claimRef:
 16    name: pulsar-bookie-ledgers-pulsar-bookie-0
 17    namespace: pulsar
 18  nodeAffinity:
 19    required:
 20      nodeSelectorTerms:
 21      - matchExpressions:
 22        - key: kubernetes.io/hostname
 23          operator: In
 24          values:
 25          - node1
 26---
 27apiVersion: v1
 28kind: PersistentVolume
 29metadata:
 30  name: pulsar-bookie-journal-pulsar-bookie-0
 31spec:
 32  capacity:
 33    storage: 50Gi 
 34  accessModes:
 35  - ReadWriteOnce
 36  persistentVolumeReclaimPolicy: Retain
 37  storageClassName: local-storage
 38  local:
 39    path: /home/puslar/data/bookie-data/journal
 40  claimRef:
 41    name: pulsar-bookie-journal-pulsar-bookie-0
 42    namespace: pulsar
 43  nodeAffinity:
 44    required:
 45      nodeSelectorTerms:
 46      - matchExpressions:
 47        - key: kubernetes.io/hostname
 48          operator: In
 49          values:
 50          - node1
 51
 52
 53
 54---
 55apiVersion: v1
 56kind: PersistentVolume
 57metadata:
 58  name: pulsar-bookie-ledgers-pulsar-bookie-1
 59spec:
 60  capacity:
 61    storage: 50Gi 
 62  accessModes:
 63  - ReadWriteOnce
 64  persistentVolumeReclaimPolicy: Retain
 65  storageClassName: local-storage
 66  local:
 67    path: /home/puslar/data/bookie-data/ledgers
 68  claimRef:
 69    name: pulsar-bookie-ledgers-pulsar-bookie-1
 70    namespace: pulsar
 71  nodeAffinity:
 72    required:
 73      nodeSelectorTerms:
 74      - matchExpressions:
 75        - key: kubernetes.io/hostname
 76          operator: In
 77          values:
 78          - node2
 79---
 80apiVersion: v1
 81kind: PersistentVolume
 82metadata:
 83  name: pulsar-bookie-journal-pulsar-bookie-1
 84spec:
 85  capacity:
 86    storage: 50Gi 
 87  accessModes:
 88  - ReadWriteOnce
 89  persistentVolumeReclaimPolicy: Retain
 90  storageClassName: local-storage
 91  local:
 92    path: /home/puslar/data/bookie-data/journal
 93  claimRef:
 94    name: pulsar-bookie-journal-pulsar-bookie-1
 95    namespace: pulsar
 96  nodeAffinity:
 97    required:
 98      nodeSelectorTerms:
 99      - matchExpressions:
100        - key: kubernetes.io/hostname
101          operator: In
102          values:
103          - node2
104
105
106
107
108---
109apiVersion: v1
110kind: PersistentVolume
111metadata:
112  name: pulsar-bookie-ledgers-pulsar-bookie-2
113spec:
114  capacity:
115    storage: 50Gi 
116  accessModes:
117  - ReadWriteOnce
118  persistentVolumeReclaimPolicy: Retain
119  storageClassName: local-storage
120  local:
121    path: /home/puslar/data/bookie-data/ledgers
122  claimRef:
123    name: pulsar-bookie-ledgers-pulsar-bookie-2
124    namespace: pulsar
125  nodeAffinity:
126    required:
127      nodeSelectorTerms:
128      - matchExpressions:
129        - key: kubernetes.io/hostname
130          operator: In
131          values:
132          - node3
133---
134apiVersion: v1
135kind: PersistentVolume
136metadata:
137  name: pulsar-bookie-journal-pulsar-bookie-2
138spec:
139  capacity:
140    storage: 50Gi 
141  accessModes:
142  - ReadWriteOnce
143  persistentVolumeReclaimPolicy: Retain
144  storageClassName: local-storage
145  local:
146    path: /home/puslar/data/bookie-data/journal
147  claimRef:
148    name: pulsar-bookie-journal-pulsar-bookie-2
149    namespace: pulsar
150  nodeAffinity:
151    required:
152      nodeSelectorTerms:
153      - matchExpressions:
154        - key: kubernetes.io/hostname
155          operator: In
156          values:
157          - node3

上面的manifest仍中将6个Local PV通过nodeAffinity创建并关联到到node1~node3上,同时使用claimRef将这3个Local PV与即将在K8S集群中部署的zookeeper SatefulSet中的PVC绑定。 使用kubectl apply创建上面的manifest。

1.5 准备Pulsar Manager的PostgreSQL数据库

这里准备让Pulsar Manager使用外部数据库,需要提前在外部的PostgreSQL中创建好用户和数据库表结构。

创建数据库和用户:

1CREATE USER pulsar_manager WITH PASSWORD '<password>';
2
3CREATE DATABASE pulsar_manager OWNER pulsar_manager;
4
5GRANT ALL PRIVILEGES ON DATABASE pulsar_manager to pulsar_manager;
6GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA pulsar_manager TO pulsar_manager;
7ALTER SCHEMA public OWNER to pulsar_manager;
8GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO pulsar_manager;
9GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO pulsar_manager;

创建表结构(建表脚本可以在pulsar-manager的镜像中找到):

  1CREATE TABLE IF NOT EXISTS environments (
  2  name varchar(256) NOT NULL,
  3  broker varchar(1024) NOT NULL,
  4  CONSTRAINT PK_name PRIMARY KEY (name),
  5  UNIQUE (broker)
  6);
  7
  8CREATE TABLE IF NOT EXISTS topics_stats (
  9  topic_stats_id BIGSERIAL PRIMARY KEY,
 10  environment varchar(255) NOT NULL,
 11  cluster varchar(255) NOT NULL,
 12  broker varchar(255) NOT NULL,
 13  tenant varchar(255) NOT NULL,
 14  namespace varchar(255) NOT NULL,
 15  bundle varchar(255) NOT NULL,
 16  persistent varchar(36) NOT NULL,
 17  topic varchar(255) NOT NULL,
 18  producer_count BIGINT,
 19  subscription_count BIGINT,
 20  msg_rate_in double precision  ,
 21  msg_throughput_in double precision    ,
 22  msg_rate_out double precision ,
 23  msg_throughput_out double precision   ,
 24  average_msg_size double precision     ,
 25  storage_size double precision ,
 26  time_stamp BIGINT
 27);
 28
 29CREATE TABLE IF NOT EXISTS publishers_stats (
 30  publisher_stats_id BIGSERIAL PRIMARY KEY,
 31  producer_id BIGINT,
 32  topic_stats_id BIGINT NOT NULL,
 33  producer_name varchar(255) NOT NULL,
 34  msg_rate_in double precision  ,
 35  msg_throughput_in double precision    ,
 36  average_msg_size double precision     ,
 37  address varchar(255),
 38  connected_since varchar(128),
 39  client_version varchar(36),
 40  metadata text,
 41  time_stamp BIGINT,
 42  CONSTRAINT fk_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
 43);
 44
 45CREATE TABLE IF NOT EXISTS replications_stats (
 46  replication_stats_id BIGSERIAL PRIMARY KEY,
 47  topic_stats_id BIGINT NOT NULL,
 48  cluster varchar(255) NOT NULL,
 49  connected BOOLEAN,
 50  msg_rate_in double precision  ,
 51  msg_rate_out double precision ,
 52  msg_rate_expired double precision     ,
 53  msg_throughput_in double precision    ,
 54  msg_throughput_out double precision   ,
 55  msg_rate_redeliver double precision   ,
 56  replication_backlog BIGINT,
 57  replication_delay_in_seconds BIGINT,
 58  inbound_connection varchar(255),
 59  inbound_connected_since varchar(255),
 60  outbound_connection varchar(255),
 61  outbound_connected_since varchar(255),
 62  time_stamp BIGINT,
 63  CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
 64);
 65
 66CREATE TABLE IF NOT EXISTS subscriptions_stats (
 67  subscription_stats_id BIGSERIAL PRIMARY KEY,
 68  topic_stats_id BIGINT NOT NULL,
 69  subscription varchar(255) NULL,
 70  msg_backlog BIGINT,
 71  msg_rate_expired double precision     ,
 72  msg_rate_out double precision ,
 73  msg_throughput_out double precision   ,
 74  msg_rate_redeliver double precision   ,
 75  number_of_entries_since_first_not_acked_message BIGINT,
 76  total_non_contiguous_deleted_messages_range BIGINT,
 77  subscription_type varchar(16),
 78  blocked_subscription_on_unacked_msgs BOOLEAN,
 79  time_stamp BIGINT,
 80  UNIQUE (topic_stats_id, subscription),
 81  CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
 82);
 83
 84CREATE TABLE IF NOT EXISTS consumers_stats (
 85  consumer_stats_id BIGSERIAL PRIMARY KEY,
 86  consumer varchar(255) NOT NULL,
 87  topic_stats_id BIGINT NOT NUll,
 88  replication_stats_id BIGINT,
 89  subscription_stats_id BIGINT,
 90  address varchar(255),
 91  available_permits BIGINT,
 92  connected_since varchar(255),
 93  msg_rate_out double precision ,
 94  msg_throughput_out double precision   ,
 95  msg_rate_redeliver double precision   ,
 96  client_version varchar(36),
 97  time_stamp BIGINT,
 98  metadata text
 99);
100
101CREATE TABLE IF NOT EXISTS tokens (
102  token_id BIGSERIAL PRIMARY KEY,
103  role varchar(256) NOT NULL,
104  description varchar(128),
105  token varchar(1024) NOT NUll,
106  UNIQUE (role)
107);
108
109CREATE TABLE IF NOT EXISTS users (
110  user_id BIGSERIAL PRIMARY KEY,
111  access_token varchar(256),
112  name varchar(256) NOT NULL,
113  description varchar(128),
114  email varchar(256),
115  phone_number varchar(48),
116  location varchar(256),
117  company varchar(256),
118  expire BIGINT,
119  password varchar(256),
120  UNIQUE (name)
121);
122
123CREATE TABLE IF NOT EXISTS roles (
124  role_id BIGSERIAL PRIMARY KEY,
125  role_name varchar(256) NOT NULL,
126  role_source varchar(256) NOT NULL,
127  description varchar(128),
128  resource_id BIGINT NOT NULL,
129  resource_type varchar(48) NOT NULL,
130  resource_name varchar(48) NOT NULL,
131  resource_verbs varchar(256) NOT NULL,
132  flag INT NOT NULL
133);
134
135CREATE TABLE IF NOT EXISTS tenants (
136  tenant_id BIGSERIAL PRIMARY KEY,
137  tenant varchar(255) NOT NULL,
138  admin_roles varchar(255),
139  allowed_clusters varchar(255),
140  environment_name varchar(255),
141  UNIQUE(tenant)
142);
143
144CREATE TABLE IF NOT EXISTS namespaces (
145  namespace_id BIGSERIAL PRIMARY KEY,
146  tenant varchar(255) NOT NULL,
147  namespace varchar(255) NOT NULL,
148  UNIQUE(tenant, namespace)
149);
150
151CREATE TABLE IF NOT EXISTS role_binding(
152  role_binding_id BIGSERIAL PRIMARY KEY,
153  name varchar(256) NOT NULL,
154  description varchar(256),
155  role_id BIGINT NOT NULL,
156  user_id BIGINT NOT NULL
157);

上面已经做好了部署的准备工作,下面将使用Helm在K8S集群中部署Pulsar集群。

2.使用Helm在K8S中部署Pulsar

从https://github.com/apache/pulsar-helm-chart/releases下载pulsar helm chart 2.7.7到K8S的控制节点上(kubectl和helm可用)。

1https://github.com/apache/pulsar-helm-chart/releases/download/pulsar-2.7.7/pulsar-2.7.7.tgz

2.1 定制编写helm chart的values.yaml

定制编写helm chart的values.yaml文件如下, 定制的内容比较多,具体见下面文件的注释:

  1auth:
  2  authentication:
  3    enabled: true  # 开启jwt认证
  4    provider: "jwt"
  5    jwt:
  6      usingSecretKey: false # jwt认证使用非对称秘钥对
  7  authorization:
  8    enabled: true # 开启授权
  9  superUsers:
 10    # broker to broker communication
 11    broker: "broker-admin"
 12    # proxy to broker communication
 13    proxy: "proxy-admin"
 14    # pulsar-admin client to broker/proxy communication
 15    client: "admin"
 16
 17
 18components: # 启用的组件
 19  autorecovery: true
 20  bookkeeper: true
 21  broker: true
 22  functions: true
 23  proxy: true
 24  pulsar_manager: true
 25  toolset: true
 26  zookeeper: true
 27
 28monitoring: # 关闭监控组件, 后续尝试使用外部Prometheus对pulsar集群进行监控
 29  grafana: false
 30  prometheus: false
 31  node_exporter: false
 32
 33
 34volumes:
 35  local_storage: true # 数据卷使用local storage
 36
 37
 38
 39proxy: # proxy的配置(这里是测试环境, 将proxy也调度到node1或node2或node3)
 40  nodeSelector:
 41    node-role.kubernetes.io/pulsar: pulsar
 42  tolerations:
 43  - key: "dedicated"
 44    operator: "Equal"
 45    value: "pulsar"
 46    effect: "NoSchedule"
 47  configData:
 48     PULSAR_PREFIX_authenticateMetricsEndpoint: "false"
 49
 50
 51broker: # broker的配置(这里是测试环境, 将proxy也调度到node1或node2或node3)
 52  nodeSelector:
 53    node-role.kubernetes.io/pulsar: pulsar
 54  tolerations:
 55  - key: "dedicated"
 56    operator: "Equal"
 57    value: "pulsar"
 58    effect: "NoSchedule"
 59  
 60
 61zookeeper: # broker的配置
 62  replicaCount: 3
 63  tolerations:
 64  - key: "dedicated"
 65    operator: "Equal"
 66    value: "pulsar"
 67    effect: "NoSchedule"
 68  volumes:
 69    data: # 配置使用local pv, 需要与前面手动创建的local pv信息一致
 70      local_storage: true
 71      size: 20Gi
 72
 73
 74bookkeeper: # bookkeeper的配置
 75  replicaCount: 3
 76  tolerations:
 77  - key: "dedicated"
 78    operator: "Equal"
 79    value: "pulsar"
 80    effect: "NoSchedule"
 81  volumes:
 82    journal: # 配置使用local pv, 需要与前面手动创建的local pv信息一致
 83      local_storage: true
 84      size: 50Gi
 85  ledgers:  # 配置使用local pv, 需要与前面手动创建的local pv信息一致
 86      local_storage: true
 87      size: 50Gi
 88
 89pulsar_manager: # pulsar_manager的配置(这里是测试环境, 将pulsar_manager也调度到node1或node2或node3)
 90  replicaCount: 1
 91  admin:
 92    # 文档中描述这里是pulsar manager web界面登录用户密码,但实际上当使用外部PostgreSQL数据库时,这里需要指定PostgreSQL的数据库和密码,不知道是否是pulsar-helm-chart 2.7.7的问题
 93    user: pulsar_manager
 94    password: 05aM3Braz_M4RWpn
 95  configData:
 96    DRIVER_CLASS_NAME: org.postgresql.Driver
 97    URL: jdbc:postgresql://<ip>:5432/pulsar_manager
 98    # 文档中描述这里PostgreSQL数据库的密码,但实际上这里不能指定USERNAME和PASSWORD, 不知道是否是pulsar-helm-chart 2.7.7的问题
 99    # USERNAME: pulsar_manager
100    # PASSWORD: 05aM3Braz_M4RWpn
101    LOG_LEVEL: INFO
102    ## 开启JWT认证后, 这里需要指定pulsar-token-admin这个Secret中的JWT Token
103    JWT_TOKEN: <jwt token...>
104
105
106autorecovery: # autorecovery的配置(这里是测试环境, 将autorecovery也调度到node1或node2或node3)
107  replicaCount: 1
108  nodeSelector:
109    node-role.kubernetes.io/pulsar: pulsar
110  tolerations:
111  - key: "dedicated"
112    operator: "Equal"
113    value: "pulsar"
114    effect: "NoSchedule"
115
116toolset: # toolset的配置(这里是测试环境, 将toolset也调度到node1或node2或node3)
117  replicaCount: 1
118  nodeSelector:
119    node-role.kubernetes.io/pulsar: pulsar
120  tolerations:
121  - key: "dedicated"
122    operator: "Equal"
123    value: "pulsar"
124    effect: "NoSchedule"
125
126
127images: # 对个组件使用私有镜像仓库的配置
128  imagePullSecrets:
129  - regsecret # 私有镜像仓库的image pull secret, 需要提前在k8s命名空间中创建
130  autorecovery:
131    repository: harbor.example.com/library/apachepulsar/pulsar-all
132    tag: 2.7.4
133  bookie:
134    repository: harbor.example.com/library/apachepulsar/pulsar-all
135    tag: 2.7.4
136  broker:
137    repository: harbor.example.com/library/apachepulsar/pulsar-all
138    tag: 2.7.4
139  functions:
140    repository: harbor.example.com/library/apachepulsar/pulsar-all
141    tag: 2.7.4
142  proxy:
143    repository: harbor.example.com/library/apachepulsar/pulsar-all
144    tag: 2.7.4
145  pulsar_manager:
146    repository: harbor.example.com/library/apachepulsar/pulsar-manager
147    tag: v0.2.0
148  zookeeper:
149    repository: harbor.example.com/library/apachepulsar/pulsar-all
150    tag: 2.7.4
151
152pulsar_metadata:
153  component: pulsar-init
154  image:
155    # the image used for running `pulsar-cluster-initialize` job
156    repository: harbor.example.com/library/apachepulsar/pulsar-all
157    tag: 2.7.4

因为当前在pulsar-helm-chart 2.7.7 中好像不支持为pulsar-init设置私有仓库的imagePullSecret,所以下面为pulsar namespace中的default servcieaccount 添加上imagePullSecret。

1kubectl patch serviceaccount default -p '{"imagePullSecrets": [{"name": "regsecret"}]}' -n pulsar

2.2 使用helm install安装pulsar

定制完value.yaml之后,使用下面的命令向K8S集群部署pulsar。

1helm install \
2    --values values.yaml \
3    --set initialize=true \
4    --namespace pulsar \
5    pulsar pulsar-2.7.7.tgz

安装完成后使用下面的命令查看一下两个初始化job pulsar-pulsar-init和pulsar-bookie-init的pod状态为Complete:

1kubectl get pod -n pulsar  | grep init
2pulsar-bookie-init--1-h65bp              0/1     Completed   0               5m14s
3pulsar-pulsar-init--1-t4thq              0/1     Completed   0               5m5s

使用下面的命令查看一下pulsar集群各个组件的Pod状态全部都为Running:

 1kubectl get pod -n pulsar -l cluster=pulsar -o wide
 2NAME                                     READY   STATUS    RESTARTS      AGE   IP              NODE   NOMINATED NODE   READINESS GATES
 3pulsar-bookie-0                          1/1     Running   0             14m   10.244.226.91   node1    <none>           <none>
 4pulsar-bookie-1                          1/1     Running   0             14m   10.244.63.90    node2    <none>           <none>
 5pulsar-bookie-2                          1/1     Running   0             14m   10.244.46.92    node3    <none>           <none>
 6pulsar-broker-0                          1/1     Running   0             14m   10.244.226.90   node1    <none>           <none>
 7pulsar-broker-1                          1/1     Running   0             14m   10.244.63.89    node2    <none>           <none>
 8pulsar-broker-2                          1/1     Running   0             14m   10.244.46.90    node3    <none>           <none>
 9pulsar-proxy-0                           1/1     Running   0             14m   10.244.226.93   node1    <none>           <none>
10pulsar-proxy-1                           1/1     Running   0             14m   10.244.63.91    node2    <none>           <none>
11pulsar-proxy-2                           1/1     Running   0             14m   10.244.46.93    node3    <none>           <none>
12pulsar-pulsar-manager-7b98666cff-5626f   1/1     Running   0             14m   10.244.63.88    node2    <none>           <none>
13pulsar-recovery-0                        1/1     Running   0             14m   10.244.46.89    node3    <none>           <none>
14pulsar-toolset-0                         1/1     Running   0             14m   10.244.46.91    node3    <none>           <none>
15pulsar-zookeeper-0                       1/1     Running   0             14m   10.244.226.92   node1    <none>           <none>
16pulsar-zookeeper-1                       1/1     Running   0             14m   10.244.63.92    node2    <none>           <none>
17pulsar-zookeeper-2                       1/1     Running   0             13m   10.244.46.94    node3    <none>           <none>

如果后边调整了values.yaml,需要更新部署时,使用下面的命令:

1helm upgrade pulsar pulsar-2.7.7.tgz \
2    --namespace pulsar \
3    -f values.yaml

2.3 在toolset pod中测试创建tenant, namespace和topic

toolset pod中包含了各种管理和测试pulsar的命令行工具,例如pulsar-admin, pulsar-client等。

下面进入toolset pod中,使用pulsar-admin命令行工具测试一下tenant, namespace和topic的创建,进一步确认pulsar集群工作正常。

 1kubectl exec -it -n pulsar pulsar-toolset-0 -- /bin/bash
 2
 3bin/pulsar-admin tenants create test-tenant
 4
 5bin/pulsar-admin tenants list
 6"public"
 7"pulsar"
 8"test-tenant"
 9
10
11bin/pulsar-admin namespaces create test-tenant/test-ns
12
13bin/pulsar-admin namespaces list test-tenant
14"test-tenant/test-ns"
15
16bin/pulsar-admin topics create-partitioned-topic test-tenant/test-ns/test-topic -p 3
17
18bin/pulsar-admin topics list-partitioned-topics test-tenant/test-ns
19"persistent://test-tenant/test-ns/test-topic"

2.4 创建pulsar-manager的管理员用户并登录查看

下面测试一下pulsar manager是否可以使用。

前面使用helm chart部署的pulsar集群,在k8s中创建了下面7个Service。

1kubectl get svc -l app=pulsar -n pulsar
2NAME                    TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
3pulsar-bookie           ClusterIP      None             <none>        3181/TCP,8000/TCP                     40m
4pulsar-broker           ClusterIP      None             <none>        8080/TCP,6650/TCP                     40m
5pulsar-proxy            LoadBalancer   10.104.105.137   <pending>     80:31970/TCP,6650:32631/TCP           40m
6pulsar-pulsar-manager   LoadBalancer   10.110.207.9     <pending>     9527:32764/TCP                        40m
7pulsar-recovery         ClusterIP      None             <none>        8000/TCP                              40m
8pulsar-toolset          ClusterIP      None             <none>        <none>                                40m
9pulsar-zookeeper        ClusterIP      None             <none>        8000/TCP,2888/TCP,3888/TCP,2181/TCP   40m

从上面命令的输出可以看出,bookie, broker, recovery, toolset, zookeeper这5个Service的类型都是ClusterIP的,并且cluser-ip为None,都是Headless的Service,因为它们只需要在k8s集群内部使用。

pulsar-proxy和pulsar-pulsar-manager为LoadBalancer类型,并且都配置了NodePort,提供了从K8S集群外部访问的能力。

从集群外部访问pulsar-manager的地址是http://node1:32764,第一次访问pulsar manager之前,需要为其创建一个管理用户:

1CSRF_TOKEN=$(curl http://node1:32764/pulsar-manager/csrf-token)
2curl \
3   -H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
4   -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
5   -H "Content-Type: application/json" \
6   -X PUT http://node1:32764/pulsar-manager/users/superuser \
7   -d '{"name": "admin", "password": "pulsar", "description": "test", "email": "[email protected]"}'

上面的命令为pulsar-manager创建用户名为admin, 密码为pulsar的管理用户。使用该用户就可以登录pulsar manager。

pulsar-manager-in-k8s.png

备注, 在线上使用时,尽量避免以NodePort暴露服务,这里的pulsar-manager的Service可以修改为CluserIP类型,并关闭NodePort,同时创建Ingress,以Ingress+域名的形式暴露出来。 看了一下pulsar-helm-chart也是支持的,只是目前pulsar-helm-chart 2.7.7中创建Ingress时,使用的是apiVersion: extensions/v1beta1 API,这个API从k8s 1.19被标记为废弃,在k8s 1.22已被移除。 所以要直接是使用pulsar-helm-chart创建Ingress的话,需要等待pulsar-helm-chart的更新。

参考