apache pulsar订阅类型:KeyShared与Shared行为分析
apache pulsar订阅类型:KeyShared与Shared行为分析
apache pulsar订阅类型:KeyShared与Shared行为分析
订阅类型
参见以下图片
生产者
- 生产者参数
Topic
: 指定消息的发送目的地
- 消息
- 消息发送方式:
send
,阻塞式发送 - 消息参数
pulsar.ProducerMessage
Payload
: 实际上携带的数据部分,比如我想发送hello world
,放在该字段中即可
- 消息发送方式:
消费者
- 消费者参数
Topic
: 指定订阅的topicSubscriptionName
: 指定订阅的名字Type
: 订阅类型,有以下四种Exclusive
Shared
Failover
KeyShared
订阅行为分析
订阅行为分析皆基于生产者和消费都往同一个Topic
发送和收取消息。
以下将消费者简记为C
,某一个消费者记作Ci
,比如C1
,C2
,C3
。
将生产者简记为P
,某一个生产者记作Pi
,比如P1
,P2
,P3
。
生产者P
表示泛指,表示生产者这个身份或者生产者这个群体。
消费者C
表示泛指,表示消费者这个身份或者消费者这个群体。
- 消费者的
SubscriptionName
相同,比如C1.SubscriptionName="my-sub1"
,C2.SubscriptionName="my-sub1"
- 即同一
SubscriptionName
下不允许指定不同的Type
类型,比如C1.Type=Shared C2.Type=KeyShared
。C1
、C2
的Type
必须是相同的。 - 同一
SubscriptionName
下不同Type
类型的消息发送模式如下:Shared模式
下:消息的发送是随机的- 假定有生产者
P1
,消费者C1
,C2
P1
往Topic
发送消息10
条消息,假定编号为0-9
- 消费者收到消息的可能性结果是随机的,以下结果都是可能的
C1
收到0 2 5
; C2收到1 3 4 6 7 8 9
C1
收到0 1 2 3 4 5
;C2
收到6 7 8 9
- 以下省略其他可能
- 假定有生产者
KeyShared模式
下:若消息一开始就分配给了某个消费者,后续的消息就会一直发往这个消费者- 假定有生产者
P1
,消费者C1
,C2
P1
往Topic
发送消息10
条消息,假定编号为0-9
- 假定
C1
收到消息了(会收到全部消息:0-9
),那么生产者后续生产的消息会全部发往C1
,由C1
进行处理,C2
则收不到任何消息 - 此时如果
C1
宕机了,那么消息才会被发到C2
由C2
处理
- 假定有生产者
- 即同一
- 消费者的
SubscriptionName
不同,比如C1.SubscriptionName="my-sub1"
,C2.SubscriptionName="my-sub2"
- 生产者生产的消息都会发送到两个订阅区,比如
my-sub1
,my-sub2
,然后由订阅区消费者的Type类型决定该订阅区中的消息该怎么发送 - 同一个订阅区(即
SubscriptionName
相同)下不允许指定不同的Type
类型 - 相同订阅区的消息发送模式
Shared模式
与KeyShared模式
参照上面所阐述的案例
- 生产者生产的消息都会发送到两个订阅区,比如
生产者消费定义
角色 | Topic | SubscriptionName |
---|---|---|
生产者 producer | my-topic | - |
消费者 sharedConsumer1 | my-topic | my-sub |
消费者 sharedConsumer2 | my-topic | my-sub |
消费者 keySharedConsumer1 | my-topic | my-sub2 |
消费者 keySharedConsumer2 | my-topic | my-sub2 |
“-” 表示不指定SubscriptionName,生产者不需要指定该参数,消费者需要
结果
下面给出我本机运行的结果。代码就不贴了,关于如何定义生产者和消费者的代码请参考官方文档:https://pulsar.apache.org/docs/2.11.x/client-libraries-go/#producer-operations
producer
发送0-8总共9条消息sharedConsumer1
收到消息0 3 4 5topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-0, ID=35:63:0, publishTime=2023-04-06 11:37:07.769 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-3, ID=35:66:0, publishTime=2023-04-06 11:37:07.795 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-4, ID=35:67:0, publishTime=2023-04-06 11:37:07.799 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-5, ID=35:68:0, publishTime=2023-04-06 11:37:07.803 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil>
sharedConsumer2
收到消息1 2 6 7 8topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-1, ID=35:64:0, publishTime=2023-04-06 11:37:07.783 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-2, ID=35:65:0, publishTime=2023-04-06 11:37:07.791 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-6, ID=35:69:0, publishTime=2023-04-06 11:37:07.807 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-7, ID=35:70:0, publishTime=2023-04-06 11:37:07.81 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-8, ID=35:71:0, publishTime=2023-04-06 11:37:07.814 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil>
keySharedConsumer1
没有收到任何消息keySharedConsumer2
收到消息 0 1 2 3 4 5 6 7 8topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-0, ID=35:63:0, publishTime=2023-04-06 11:37:07.769 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-1, ID=35:64:0, publishTime=2023-04-06 11:37:07.783 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-2, ID=35:65:0, publishTime=2023-04-06 11:37:07.791 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-3, ID=35:66:0, publishTime=2023-04-06 11:37:07.795 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-4, ID=35:67:0, publishTime=2023-04-06 11:37:07.799 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-5, ID=35:68:0, publishTime=2023-04-06 11:37:07.803 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-6, ID=35:69:0, publishTime=2023-04-06 11:37:07.807 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-7, ID=35:70:0, publishTime=2023-04-06 11:37:07.81 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil> topic:persistent://public/default/my-topic, producerName:standalone-30-7, properties:map[], payload:hello-8, ID=35:71:0, publishTime=2023-04-06 11:37:07.814 +0800 CST, eventTime=1970-01-01 08:00:00 +0800 CST, key=my-key, orderingKey=, redeliveryCount=0, isReplicated=false, replicatedFrom=, schemaVersion=, index=<nil>, brokerPublishTime=<nil>