欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kafka整合springMVC

时间:2023-06-19
kafka整合springMVC(消费者)

由于公司项目是mvc的,所以不得不用mvc进行整合,mvc的xml配置属实头疼,头疼也得开始搞。

开始之前,需要先在虚拟机安装环境,网上教程很多,我就不介绍了,但是有几个配置文件需要注意,如下:

首先是需要关闭虚拟机防火墙

systemctl stop firewalld

kafka的配置文件server.properties,大约三十多行的地方,两个地址,需要填你自己的ip地址(我的虚拟机地址)

还有一个consumer.properties中的,一个consumer group.id

还有一个topic信息,网上大多使用的是如下命令查看所有的topic信息,我的不知道为啥不行,我在zookeeper中进行查看

./kafka-topics.sh --list --zookeeper 192.168.153.134:2181

进入zookeeper安装bin目录,执行如下命令

./zkCli.sh

之后进入命令行

ls /brokers/topics

其中的test,test1是我自己新建的topic

项 目部分

首先是pom文件
添加如下依赖

org.apache.kafkakafka-clients0.11.0.1org.springframework.kafkaspring-kafka2.0.4.RELEASE

新建配置文件,application-kafka.xml

<?xml version="1.0" encoding="UTF-8"?>

上边配置文件中指定了两个监听tpoic的监听类

配置文件介绍:

consumerProperties: kafka的配置bean,配置参数都在这里

messageListernerConsumerService、messageListernerConsumerService2: 声明两个监听类

consumerFactory: 把consumerProperties配置的参数加载进来

containerProperties和containerProperties2: 第一个是test,第二个是test1,分别绑定两个topic,指定监听类需要监听kafka服务中的那个topic

messageListenerContainer1和messageListenerContainer2: 把topic和监听类的绑定信息加载进服务中

监听类如下:

继承了MessageListener,需要实现其中的onMessage方法,上边bean配置中配置了两个监听类,所以需要创建出两个监听类,其中一个如下

public class KafkaConsumerListener implements MessageListener { @Override public void onMessage(ConsumerRecord integerStringConsumerRecord) { System.err.println("============="); System.err.println(integerStringConsumerRecord.value()); }}

我本地的是Linux环境,启动zookeeper,启动kafka
打开kafka消息提供者控制台,准备推送消息
这个ip地址就是你在配置文件中填的地址,–topic后边就是kafka的topic

./kafka-console-producer.sh --broker-list 192.168.153.134:9092 --topic test


输入你好,回车后返回控制台可以查看到信息

基础整合完毕

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。