欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkStreaming+Kafka实现实时数据传输

时间:2023-04-27
Spark Streaming + Kafka 实现实时数据传输

版本说明:Spark 3.0.0Kafka 2.12zookeeper 3.5.7

文章目录

Spark Streaming + Kafka 实现实时数据传输

一、集群端二、IDEA端 一、集群端

前提:配置好并启动三台节点的zookeeper

在三个结点分别配置Kafka

①解压安装包,在安装目录/home/kafka-2.12下创建logs文件夹

②修改./config/vi server.properties配置文件

-----修改部分------##broke全局唯一编号,不能重复broker.id=0 ## 将另外两个节点分别改称1,2##开启topic删除功能delete.topic.enable=true##kafka运行日志存放路径log.dirs=/home/kafka-2.12/logs##配置Zookeeper集群地址zookeerper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181-----以下不用修改----##处理网络请求的线程数量num.network.threads=3##处理磁盘IO线程数量num.io.threads=8##发送套接字的缓冲区大小socket.send.buffer.bytes=102400##接收套接字的缓冲区大小socket.receive.buffer.bytes=102400##请求套接字的缓冲区大小socket.request.max.bytes=104857600##kafka运行日志存放路径log.dirs=/opt/module/kafka/logs##topic在当前broker上的分区个数num.partition=1##用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1##segment文件保留的最长时间,超时将被删除log.retention.hours=168

③配置环境变量

sudo vim /etc/profile##KAFKA_HOMEexport KAFKA_HOME=/home/kafka-2.12export PATH=$PATH:$KAFKA_HOME/binsource vim /etc/profile

ps:如果修改profile不规范导致所有指令失效,可用

export PATH=/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin

暂时恢复数据,然后重新进入/etc/profile检查是否有语法错误,进行修改。

④使用预先写好的xsync群发脚本向其他节点分发kafka-2.12

xsync kafka-2.12

注意要修改另外节点的brokerid为1,2

⑤在三台节点分别启动kafka

##hadoop01中./bin/kafka-server-start.sh -daemon config/server.properties##hadoop02中./bin/kafka-server-start.sh -daemon config/server.properties##hadoop03中./bin/kafka-server-start.sh -daemon config/server.properties

可写好shell群发脚本方便群起

for i in hadoop01 hadoop02 hadoop03doecho"======$i======"ssh $i "/home/kafka-2.12/bin/kafka-server-start.sh -daemon config/server.properties"done

⑥创建一个新的topic,向streaming发送数据

kafka2.12创建新topic指令要加入 --bootstrap-server,老版本可能是–zookeeper

[root@hadoop01 ~]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --create --topic atguiguNew

查看kafka中存在的list

[root@hadoop01 ~]# bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092

⑦等待IDEA端Spark Streaming与集群段kafka连接

⑧Idea程序运行后等待producer产生数据

producer产生数据

[root@hadoop01 kafka-2.12]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic atguiguNew>hello>hello>hey

二、IDEA端

前提:配置好spark环境,添加了Spark Streaming依赖等

①在maven项目中的pom.xml文件中添加spark-streaming-kafka依赖

org.apache.spark spark-streaming-kafka-0-10_2.12 3.0.0

②创建scala object,根据所需方法导入对应jar包

package com.atguigu.bigdata.spark.streamingimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming04_Kafka { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3))//kafka相关参数 val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092", ConsumerConfig.GROUP_ID_ConFIG -> "skTest",//consumer group "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc,//上下文环境对象 LocationStrategies.PreferConsistent,//采集节点和计算节点的位置匹配 ConsumerStrategies.Subscribe[String, String](Set("skTest"), kafkaPara)//set中是kafka中topic名称 ) kafkaDataDS.map(_.value()).print() ssc.start() ssc.awaitTermination() }}

③在集群端输入数据,观察idea端控制台采集结果

内容参考:尚硅谷大数据Spark教程从入门到精通、尚硅谷大数据kafka快速入门

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。