欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kafka-12-Kafka消息时间戳kafkamessagetimestamp

时间:2023-04-16

参考Kafka消息时间戳(kafka message timestamp)

1 Kafka消息的时间戳

在消息中增加了一个时间戳字段和时间戳类型。
目前支持的时间戳类型有两种:CreateTime和LogAppendTime。
前者表示producer创建这条消息的时间;
后者表示broker接收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间)。

2 客户端消息格式的变化

ProducerRecord:增加了timestamp字段,允许producer指定消息的时间戳,如果不指定的话使用producer客户端的当前时间。
ConsumerRecord:增加了timestamp字段,允许消费消息时获取到消息的时间戳。

2.1 获取时间戳

from kafka import KafkaConsumerglobal false, null, truefalse = null = true = ''import timedef ms_tostamp(stampint): c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000))) return c + "." + str(stampint)[-3:]consumer = KafkaConsumer(bootstrap_servers=['10.80.62.52:9092'], auto_offset_reset='latest', group_id="group1")consumer.subscribe('yourtest')for msg in consumer: try: if msg.value is not None: print(ms_tostamp(msg.timestamp)) data_json = msg.value.decode() print(data_json) except Exception as e: print(e)print("finish")

输出
2022-02-22 14:32:26.291
aa

2.2 获取最新偏移量的消息时间

from kafka import KafkaConsumer, TopicPartitionglobal false, null, truefalse = null = true = ''import timedef ms_tostamp(stampint): c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000))) return c + "." + str(stampint)[-3:]topic = 'topicname'consumer = KafkaConsumer(bootstrap_servers=['IP:9092'])# (1)获取指定分区的最新偏移量topic_offset_dict = consumer.end_offsets([TopicPartition(topic, 0)])newoffset = topic_offset_dict[TopicPartition(topic, 0)]print("最新偏移量",newoffset)# (2)手动指定分区给consumer去消费consumer.assign([TopicPartition(topic, 0)])# (3)针对分区,从指定抓取的偏移量处开始消费# newoffset-1是因为newoffset的数据还未进来,减去1才是最新的数据consumer.seek(TopicPartition(topic, 0), newoffset-1)for msg in consumer: try: if msg.value is not None: print("消息时间",ms_tostamp(msg.timestamp)) print("本条offset",msg.offset) data_json = msg.value.decode() print("消息内容",data_json) break # 取到一条消息就退出 except Exception as e: print(e)print("finish")consumer.close()

输出

最新偏移量 67661482消息时间 2022-02-22 16:15:06.744本条offset 67661481消息内容 messagefinish

3 监控指定topic的最新消息时间

from kafka import KafkaConsumer, TopicPartitionglobal false, null, truefalse = null = true = ''import timedef ms_tostamp(stampint): c = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(stampint/ 1000))) return c + "." + str(stampint)[-3:]consumer = KafkaConsumer(bootstrap_servers=['10.26.10.113:9092'])#方式一指定topic列表topics = ['SB_01', 'SB_02', 'SB_03', 'SB_50', 'SB_20', 'SB_30', 'SB_08' ]def get_topic_msgNewTime(topics): try: for topic in topics: # (1)获取指定分区的最新偏移量 topic_offset_dict = consumer.end_offsets([TopicPartition(topic, 0)]) newoffset = topic_offset_dict[TopicPartition(topic, 0)] # print("最新偏移量",newoffset) # (2)手动指定分区给consumer去消费 consumer.assign([TopicPartition(topic, 0)]) # (3)针对分区,从指定抓取的偏移量处开始消费 # newoffset-1是因为newoffset的数据还未进来,减去1才是最新的数据 consumer.seek(TopicPartition(topic, 0), newoffset-1) for msg in consumer: if msg.value is not None: newtime_str = ms_tostamp(msg.timestamp) print(topic,"最新消息时间",newtime_str) # print("本条offset",msg.offset) # data_json = msg.value.decode() # print("消息内容",data_json) break # 取到一条消息就退出 except Exception as e: print(e)get_topic_msgNewTime(topics)consumer.close()print("finish")

输出

SB_01 最新消息时间 2022-02-22 16:37:09.234SB_02 最新消息时间 2022-02-22 16:34:39.021SB_03 最新消息时间 2022-02-22 16:35:05.777SB_50 最新消息时间 2022-02-22 16:30:42.775SB_20 最新消息时间 2022-02-22 16:37:20.060SB_30 最新消息时间 2022-02-22 16:37:20.337SB_08 最新消息时间 2022-02-22 16:35:35.935finish

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。