欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

利用JavaAPI实现Kafka的生产者消费者

时间:2023-04-18
一、功能描述

利用Java连接Kafka,通过API实现生产者和消费者,对于Kafka生产或者消费数据。将日志信息进行输出。

二、依赖导入

首先,创建一个简单的maven的工程并将依赖导入

org.apache.kafkakafka-clients${kafka.version}log4jlog4j1.2.17org.slf4jslf4j-log4j121.7.33

三、日志配置

#指定log4j的输出信息log4j.rootLogger=INFO, stdout, logfile#指定log4j的标准输出log4j.appender.stdout=org.apache.log4j.ConsoleAppender#指定log4j的标准输出的样式log4j.appender.stdout.layout=org.apache.log4j.PatternLayout#指定标准输出的转换的格式log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n#指定日志文件的输出log4j.appender.logfile=org.apache.log4j.FileAppender#指定log4j的输出路径文件名log4j.appender.logfile.File=log/hd.log#指定日志日志输出样式log4j.appender.logfile.layout=org.apache.log4j.PatternLayout#指定日志文件的转换格式log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

四、基于Zookeeper的消费者

//进行导包import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Arrays;import java.util.Iterator;import java.util.Properties;public class ZkConsumer { public static void main(String[] args) { //初始化配置信息 Properties config = new Properties(); //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092"); //定义分组信息,相当于kafka脚本命令的-group config.put(ConsumerConfig.GROUP_ID_CONFIG,"group01"); //定义数据偏移量配置,配置信息有:earliest、latest、none和anything else四种配置 config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //定义自动提交时间,时间单位为ms config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,500); //定义是否开启自动提交 config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //定义消费者的键的反序列化的配置 config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //定义消费者的值的反序列化配置 config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //初始化存放消费者的队列 KafkaConsumer consumer=new KafkaConsumer<>(config); //订阅主题 consumer.subscribe(Arrays.asList("group01-test01")); //循环遍历进行数据获取 while(true){ //迭代器遍历消费者数据 Iterator> it = consumer.poll(Duration.ofMillis(500)).iterator(); //如果还有数据 if(it.hasNext()) { //遍历消费者数据,并数据拼接起来 do { ConsumerRecord record = it.next(); StringBuilder builder = new StringBuilder(); builder.append(record.topic()); builder.append("t"); builder.append(record.partition()); builder.append("t"); builder.append(record.offset()); builder.append("t"); builder.append(record.timestamp()); builder.append("t"); builder.append(record.key()); builder.append("t"); builder.append(record.value()); builder.append("t"); System.out.println(builder.toString()); } while (it.hasNext()); } } //consumer.close(); }}

五、基于Zookeeper的生产者

//导包import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class ZkProducer { public static void main(String[] args) { //初始化配置 Properties config = new Properties(); //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092"); //定义批次大小信息 config.put(ProducerConfig.BATCH_SIZE_CONFIG,5); //生产者将在请求传输之间到达的任何记录组合成一个批处理请求。 config.put(ProducerConfig.LINGER_MS_CONFIG,1000); //定义确认策略,配置信息有:0、1和all,默认一般为all config.put(ProducerConfig.ACKS_CONFIG,"all"); //定义失败重试的次数 config.put(ProducerConfig.RETRIES_CONFIG,3); //producer -Event Stream->kafka server(java object) //定义生产者键的serialization序列化 config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); //定义生产者的值的序列化 config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//初始化生产者队列 KafkaProducer producer = new KafkaProducer(config);//定义主题 final String TOPIC="kb16-test02"; //定义偏移量 final int PART=0; for (int i = 0; i < 100; i++) { //传入数据进行封装 ProducerRecord record = new ProducerRecord<>(TOPIC,PART,System.currentTimeMillis(),i,"happy new year"+i); //向kafka发送数据 producer.send(record); } //关闭生产者 producer.close(); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。