欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

同一服务Kafka多数据源配置

时间:2023-05-04
同一服务Kafka多数据源配置 一个微服务项目,业务需要,引入两套kafka集群的配置

1.xml文件加入新老Kafka集群配置信息
2.添加KafkaTemplateConfig bean文件
3.注解@Qualifier(“kafkaTemplateNew”)使用

@EnableKafka@Configurationpublic class KafkaTemplateConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka-new.bootstrap-servers}") private String newBootstrapServers; @Bean @Primary public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean(name="kafkaTemplateNew") public KafkaTemplate kafkaTemplateNew() { return new KafkaTemplate<>(newProducerFactory()); } @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(true)); } @Bean public ProducerFactory newProducerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(false)); } public Map producerConfigs(boolean isPrimary) { Map props = new HashMap<>(); // 指定多个kafka集群多个地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, isPrimary?bootstrapServers:newBootstrapServers); // 重试次数,0为不启用重试机制 props.put(ProducerConfig.RETRIES_CONFIG, 0); //同步到副本, 默认为1 // acks=0 把消息发送到kafka就认为发送成功 // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 props.put(ProducerConfig.ACKS_CONFIG, "1"); // 生产者空间不足时,send()被阻塞的时间,默认60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 控制批处理大小,单位为字节 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB) props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576); // 键的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。 // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none"); return props; }}

@Autowired @Qualifier("kafkaTemplateNew") private KafkaTemplate kafkaTemplateNew;

@Autowired private KafkaTemplate kafkaTemplate;

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。