欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

TDMQ/pulsargolang快速入门教程

时间:2023-05-13
TDMQ/pulsar golang 快速入门教程 架构: pulsar 对比 kafka kafka

kafka 由 zookeeper 和 broker 集群注册, broker 集群负责计算和储存消息, zookeeper 为注册中心(Kafka2.8就能不依赖zookeeper独立运行了, 部署还是比较方便的)

pulsar

pulsar 比 kafka 的架构更为复杂, 部署也是更加复杂

pulsar 是计算储存分离架构, 计算使用 broker 集群(是无状态的) 储存使用 bookeeper 集群, broker 计算要使用 bookeeper 的数据都是要内置 bookeeper 客户端pulsar 的分离架构更具有伸缩性pulsar 至今最新版本还是需要依赖于 zookeeper pulsar 的 4 种消费模型

独占模式(Exclusive): 一个 topic 只能有一个消费者订阅, 多个消费者订阅就会报错灾备模式(Failover): 一个 topic 可以多个消费者订阅, 但是只有一个生效, 其他的作为容灾备份使用共享订阅(Shared): 最常用的订阅模式, 一个 topic 可以被多个消费者订阅, 每个消息轮询发给其中的一个消费者key_shared: 共享订阅 + key 限制, 每个消息只会发送给绑定相同 key 的消费者

这里一般使用共享订阅 shared

TDMQ 使用教程

pulsar 的搭建十分繁琐, 如果搭建单机版本的话就根本发挥不了 pulsar 的高可用的特性

下面的示例是基于 腾讯云的 TDMQ (底层是 pulsar) 共享订阅模式的的消息传输

1、创建 虚拟消息队列 TDMQ

打开腾讯云控制台 -> 搜索 tdmq -> 进入消息队列 TDMQ -> 集群管理 -> 新建集群

新建的是虚拟集群,按量计费, 做测试开发一般够用的

如果你的开发服务器是再 vpc 内网里面就不建议接入公网, 更安全且拥有更高的带宽、如果要接入公网你需提交申请

2、创建命名空间

命名空间 -> 选择当前集群为新建的集群 -> 新建命名空间

输入命名空间的名称

选择消息的 TTL 默认 1 天, TTL 过期了消费者还是没有 ACK 该消息, 消息就会过期

消息保留策略一般就是消费及删除

3、创建角色并授权 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。填写角色名称和说明,单击提交完成角色创建。进入 命名空间 页面,在刚刚创建的命名空间中,单击操作列的配置权限进入命名空间的权限列表。在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。

在命名空间这里配置权限

添加角色

我们选择刚刚添加的角色, 然后基于 生产消息和消费消息的权限

我们可以只是将 2 个角色分开, 也能防止循环依赖

添加成功

4、创建 topic

点击 topic -> 新建

设置 topic 名称

消息类型

各种消息类型区别如下

分区数: 分区能提升单个 topic 的吞吐量

编写代码: 生产者 Producer

package mainimport ("context""fmt""log""strconv""time""github.com/apache/pulsar-client-go/pulsar")var (//TODO:角色权限的tokentoken = "{token}"//TODO: 消息队列的id/命名空间名称/topic名称topic = "{pulsar-name}/{namespace-name}/{topic-name}")//就是说消费者要先链接才行,//1.游标的问题//2.share模式问题func main() {//1、clientclient, err := pulsar.NewClient(pulsar.ClientOptions{URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)Authentication: pulsar.NewAuthenticationToken(token),OperationTimeout: 30 * time.Second,ConnectionTimeout: 30 * time.Second,})if err != nil {log.Fatalf("Could not instantiate Pulsar client: %v", err)} else {fmt.Printf("ok=%#vn", "ok")}defer client.Close()//------producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: topic,})if err != nil {log.Fatal(err)}for i := 0; i < 100; i++ {_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("a" + strconv.Itoa(i)),})}defer producer.Close()if err != nil {fmt.Println("Failed to publish message", err)}fmt.Println("Published message")}

编写代码: 消费者 Consumer

package mainimport ("context""fmt""log""time""github.com/apache/pulsar-client-go/pulsar")var (//TODO:角色权限的tokentoken = "{token}"//TODO: 消息队列的id/命名空间名称/topic名称topic = "{pulsar-name}/{namespace-name}/{topic-name}")//就是说消费者要先链接才行,//1.游标的问题//2.share模式问题func main() {//1、clientclient, err := pulsar.NewClient(pulsar.ClientOptions{URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)Authentication: pulsar.NewAuthenticationToken(token),OperationTimeout: 30 * time.Second,ConnectionTimeout: 30 * time.Second,})if err != nil {log.Fatalf("Could not instantiate Pulsar client: %v", err)} else {fmt.Printf("ok=%#vn", "ok")}defer client.Close()//2、consumer -------consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: topic,SubscriptionName: "{sub-name}", //TODO:消费者的名称,这里可以没有就新建//shared 类型Type: pulsar.Shared,})if err != nil {log.Fatal(err)}defer consumer.Close()for {msg, err := consumer.Receive(context.Background())if err != nil {log.Fatal(err)}fmt.Printf("Received message msgId: %#v -- content: '%s'n",msg.ID(), string(msg.Payload()))consumer.Ack(msg)}}

参数详解

上面的 todo 包含的位置都是需要自己填写删除的

{token}: 角色的授权

{pulsar-name}: 虚拟集群的id

{namespace-name}: 命名空间名称

{topic-name}: topic 名称

{url}: 访问地址

{sub-name}: 消费者的名称, 不存在将会新建

详情查看文档

https://cloud.tencent.com/document/product/1179/44814)

https://pulsar.staged.apache.org/docs/zh-CN

reference

Pulsar真的可以取代Kafka吗 - 知乎 (zhihu.com): https://zhuanlan.zhihu.com/p/370213273

pulsar 官网: https://pulsar.staged.apache.org/docs/zh-CN/standalone-docker

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。