欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Kafka配置JAAS

时间:2023-04-19
Kafka配置JAAS

推荐本地测试使用,生产环境尽量使用KAFKA监听,或者将此类方法添加定时开关与生产者同步避免循环打印

public static void main(String[] args) { Properties props = new Properties(); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";"); props.put("bootstrap.servers", "***"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("group.id", "***"); props.put("session.timeout.ms", "60000"); props.put("max.poll.records", 1000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("***")); //以下根据业务场景进行逻辑处理 while (true) { long startTime = System.currentTimeMillis(); ConsumerRecords records = consumer.poll(1000); System.out.println(System.currentTimeMillis() - startTime); System.out.println("recieve message number is"+ records.count()); for (ConsumerRecord record : records) { System.out.printf("success" + "offset =%d,key =%s,value =%s,partition = %d %n", record.offset(), record.key(), record.value(), record.partition()); } } }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。