kafka 由 zookeeper 和 broker 集群注册, broker 集群负责计算和储存消息, zookeeper 为注册中心(Kafka2.8就能不依赖zookeeper独立运行了, 部署还是比较方便的)
pulsarpulsar 比 kafka 的架构更为复杂, 部署也是更加复杂
pulsar 是计算储存分离架构, 计算使用 broker 集群(是无状态的) 储存使用 bookeeper 集群, broker 计算要使用 bookeeper 的数据都是要内置 bookeeper 客户端pulsar 的分离架构更具有伸缩性pulsar 至今最新版本还是需要依赖于 zookeeper pulsar 的 4 种消费模型
独占模式(Exclusive): 一个 topic 只能有一个消费者订阅, 多个消费者订阅就会报错灾备模式(Failover): 一个 topic 可以多个消费者订阅, 但是只有一个生效, 其他的作为容灾备份使用共享订阅(Shared): 最常用的订阅模式, 一个 topic 可以被多个消费者订阅, 每个消息轮询发给其中的一个消费者key_shared: 共享订阅 + key 限制, 每个消息只会发送给绑定相同 key 的消费者
这里一般使用共享订阅 shared
TDMQ 使用教程pulsar 的搭建十分繁琐, 如果搭建单机版本的话就根本发挥不了 pulsar 的高可用的特性
下面的示例是基于 腾讯云的 TDMQ (底层是 pulsar) 共享订阅模式的的消息传输
1、创建 虚拟消息队列 TDMQ打开腾讯云控制台 -> 搜索 tdmq -> 进入消息队列 TDMQ -> 集群管理 -> 新建集群
新建的是虚拟集群,按量计费, 做测试开发一般够用的
2、创建命名空间如果你的开发服务器是再 vpc 内网里面就不建议接入公网, 更安全且拥有更高的带宽、如果要接入公网你需提交申请
命名空间 -> 选择当前集群为新建的集群 -> 新建命名空间
输入命名空间的名称
选择消息的 TTL 默认 1 天, TTL 过期了消费者还是没有 ACK 该消息, 消息就会过期
消息保留策略一般就是消费及删除
3、创建角色并授权 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。填写角色名称和说明,单击提交完成角色创建。进入 命名空间 页面,在刚刚创建的命名空间中,单击操作列的配置权限进入命名空间的权限列表。在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。在命名空间这里配置权限
添加角色
我们选择刚刚添加的角色, 然后基于 生产消息和消费消息的权限
我们可以只是将 2 个角色分开, 也能防止循环依赖
添加成功
4、创建 topic点击 topic -> 新建
设置 topic 名称
消息类型
各种消息类型区别如下
分区数: 分区能提升单个 topic 的吞吐量
编写代码: 生产者 Producerpackage mainimport ("context""fmt""log""strconv""time""github.com/apache/pulsar-client-go/pulsar")var (//TODO:角色权限的tokentoken = "{token}"//TODO: 消息队列的id/命名空间名称/topic名称topic = "{pulsar-name}/{namespace-name}/{topic-name}")//就是说消费者要先链接才行,//1.游标的问题//2.share模式问题func main() {//1、clientclient, err := pulsar.NewClient(pulsar.ClientOptions{URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)Authentication: pulsar.NewAuthenticationToken(token),OperationTimeout: 30 * time.Second,ConnectionTimeout: 30 * time.Second,})if err != nil {log.Fatalf("Could not instantiate Pulsar client: %v", err)} else {fmt.Printf("ok=%#vn", "ok")}defer client.Close()//------producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: topic,})if err != nil {log.Fatal(err)}for i := 0; i < 100; i++ {_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("a" + strconv.Itoa(i)),})}defer producer.Close()if err != nil {fmt.Println("Failed to publish message", err)}fmt.Println("Published message")}
编写代码: 消费者 Consumerpackage mainimport ("context""fmt""log""time""github.com/apache/pulsar-client-go/pulsar")var (//TODO:角色权限的tokentoken = "{token}"//TODO: 消息队列的id/命名空间名称/topic名称topic = "{pulsar-name}/{namespace-name}/{topic-name}")//就是说消费者要先链接才行,//1.游标的问题//2.share模式问题func main() {//1、clientclient, err := pulsar.NewClient(pulsar.ClientOptions{URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)Authentication: pulsar.NewAuthenticationToken(token),OperationTimeout: 30 * time.Second,ConnectionTimeout: 30 * time.Second,})if err != nil {log.Fatalf("Could not instantiate Pulsar client: %v", err)} else {fmt.Printf("ok=%#vn", "ok")}defer client.Close()//2、consumer -------consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: topic,SubscriptionName: "{sub-name}", //TODO:消费者的名称,这里可以没有就新建//shared 类型Type: pulsar.Shared,})if err != nil {log.Fatal(err)}defer consumer.Close()for {msg, err := consumer.Receive(context.Background())if err != nil {log.Fatal(err)}fmt.Printf("Received message msgId: %#v -- content: '%s'n",msg.ID(), string(msg.Payload()))consumer.Ack(msg)}}
参数详解上面的 todo 包含的位置都是需要自己填写删除的
{token}: 角色的授权
{pulsar-name}: 虚拟集群的id
{namespace-name}: 命名空间名称
{topic-name}: topic 名称
{url}: 访问地址
{sub-name}: 消费者的名称, 不存在将会新建
详情查看文档https://cloud.tencent.com/document/product/1179/44814)
https://pulsar.staged.apache.org/docs/zh-CN
referencePulsar真的可以取代Kafka吗 - 知乎 (zhihu.com): https://zhuanlan.zhihu.com/p/370213273
pulsar 官网: https://pulsar.staged.apache.org/docs/zh-CN/standalone-docker