目录
一、阻塞队列
二、Kafka入门
2.1 Kafka概念
2.2 Kafka下载及配置
三、Spring整合Kafka
一、阻塞队列
在学习Kafka之前,我们需要先了解阻塞队列、以及生产者和消费者模式。
图中:Thread-1为生产者,put往队列里存数据,当队列已满时,该方法将阻塞;
Thread-2为消费者,take从队列里取数据,当队列已空时,该方法将阻塞;
阻塞队列用于解决线程异步通信的问题,同时在生产者和消费者之间建立了缓冲,提高了系统的性能。
阻塞队列BlockingQueue是一个接口,我们需要通过其实现类来调用。具体实现如下:
【生产者线程】
class Producer implements Runnable { // 传入阻塞队列,把线程交予阻塞队列管理 private BlockingQueue
【消费者线程】
class Consumer implements Runnable { // 传入阻塞队列,把线程交予阻塞队列管理 private BlockingQueue
【main函数实例化阻塞队列、生产者和消费者】
public static void main(String[] args) { // 实例化阻塞队列,生产者和消费者共用一个阻塞队列 BlockingQueue queue = new ArrayBlockingQueue(10);// 默认长度为10 // 实例化生产者线程 new Thread(new Producer(queue)).start(); // 实例化消费者线程 new Thread(new Consumer(queue)).start(); new Thread(new Consumer(queue)).start(); new Thread(new Consumer(queue)).start();}
运行main函数,观察输出结果:
二、Kafka入门 2.1 Kafka概念
【Kafka简介】
Kafka是一个分布式的流媒体平台。
应用:消息系统、日志收集、用户行为追踪、流式处理
【Kafka特点】
高吞吐量:处理数据的能力强,可以处理TB级的海量数据,即使是非常普通的硬件Kafka也可以支持每秒数百万的消息消息持久化:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
有人会有疑问:读取硬盘中的数据岂不是比读取内存中的数据慢很多?为什么还能保持高性能呢? 实际上,读写硬盘数据性能的高与低,取决于对硬盘的使用,对硬盘的顺序读写的性能其实是很高的,甚至高于内存对数据的随机读写。而Kafka就是采用了对硬盘的顺序读写。高可靠性:分布式服务器,可以做集群部署,有容错的能力。高扩展性:简单的配置即可增加服务器
【Kafka消费模式】
Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送一个接收。第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。
一对一
消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
一对多
这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
【Kafka术语】
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为brokerTopic:主题。生产者把消息发布到的位置/空间,可以理解为一个文件夹,用于存放消息的位置Partition:分区。对主题位置的一个分区。每个分区都从队尾追加数据offset:索引。消息在分区内存放的索引。Consumer Group:消费者组。消费者组则是一组中存在多个消费者。消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。Leader Replica:主副本。对分区数据做备份,提高容错率。当消费者尝试获取数据时,可以做出响应,提供数据。Follower Replica:随从副本。只是备份,不做响应。实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。Zookeeper:独立的软件,管理Kafka的集群
2.2 Kafka下载及配置
【下载】
官网下载地址:Apache Kafka
根据建议点击链接即可下载。之后解压到某一文件夹即可。
【配置】
配置config/zookeeper.properties文件
配置config/server.properties文件
【启动】
启动zookeeper(内置):打开命令行,cd到kafka安装目录下,输入以下命令:
binwindowszookeeper-server-start.bat configzookeeper.properties
启动Kafka:再打开一个命令行,cd到kafka安装目录下,输入以下命令
binwindowskafka-server-start.bat configserver.properties
三、Spring整合Kafka
【引入依赖】
【配置Kafka】
在application.properties中添加如下配置:
# KafkaPropertiesspring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=3000
【测试代码】
Kafka生产者:
@Componentclass KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String content) { kafkaTemplate.send(topic, content); }}
Kafka消费者:
@Componentclass KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); }}
实现功能:
@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() { kafkaProducer.sendMessage("test", "你好"); kafkaProducer.sendMessage("test", "在吗"); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); }}