欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kafka生产者

时间:2023-07-13
kafka-生产者 1、生产者案例

使用Java程序编写kafka生产者

程序案例:

package cn.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class kafkaProducer { public static Properties initConfig(){ Properties properties = new Properties(); // 配置kafka集群的Broker访问地址 properties.put("bootstrap.servers","hadoop001:9092,hadoop002:9092"); // 设置key和value的序列化方法,使用StringSerializer // 注意:这里必须使用全类名 properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 以上三个参数是必选参数,其他参数可按照实际情况配置 return properties; } public static void main(String[] args){ Properties pro = initConfig(); // 创建kafka生产者 KafkaProducer kafkaProducer = new KafkaProducer(pro); String topic = "log_topic"; // 封装消息 ProducerRecord record = new ProducerRecord(topic, "Hello world"); // 发送消息 kafkaProducer.send(record); // 关闭生产者 kafkaProducer.close(); }}

上面是最简单的生产者案例。

2、配置分析

从上面的配置方法可以看出,我们需要配置bootstrap.server、key.serializer、value.serializer,这些参数名容易写错,因此有ProducerConfig类帮我们写好常量。

例如:

String servers = "hadoop001:9092,hadoop002:9092";properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);

后面的全类名也容易出错,可以使用Class类获取Name

修改后的intiConfig方法:

public static Properties initConfigPro(){ Properties properties = new Properties(); String servers = "hadoop001:9092,hadoop002:9092"; properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); return properties;}

修改后对于编写更加容易方便,只用记住大概的参数即可。

3、消息发送分析

发送消息前,我们需要将需要发送的信息封装到ProducerRecord类中,然后进行发送。

ProducerRecord类常用构造方法:

这里可以看出,我们不光可以发送value,还可以设置key,Header等。

消息发送的方式:发后即忘、同步、异步

发后即忘:

​ 这种方式就是直接向kafka集群发送信息,不管信息是否能够发送成功。因此该种方式的速度最快,性能最后。但在有些场景(出现异常等)会造成数据丢失。

上面的案例程序就是同步发送。

同步发送:生产者对象使用send()方法后会返回Future对象,利用Future对象的get方法检测发送结果。若发送成功,get方法可以获取Recordmetadata对象,里面存储消息的相关信息,若发送失败则会抛出异常。

// 发送消息Future future = kafkaProducer.send(record);try { Recordmetadata recordmetadata = future.get(); // 返回各种元数据信息 System.out.println(recordmetadata.offset());} catch (InterruptedException e) { e.printStackTrace();} catch (ExecutionException e) { e.printStackTrace();}

异步发送:生产者调用send方法时,我们可以指定一个回调函数,服务器返回信息时会调用该方法,在回调方法里,我们可以判断消息的发送状况。

producer.send(new ProducerRecord("topictest", "hello kafka" + i, i) , new Callback() { public void onCompletion(Recordmetadata recordmetadata, Exception e) { // 这里可以进行返回信息的判断,和后续操作。 if(e!=null){ e.printStackTrace(); } } });

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。