欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

sparkstream3.0.0scala版本读取kafka消息统计写入mysql

时间:2023-05-13

本案例从kafka读取数据,执行业务处理,然后将处理结果数据写入mysql。

1、添加依赖

org.apache.spark spark-core_2.12 3.0.0 org.apache.spark spark-streaming_2.12 3.0.0 org.apache.spark spark-streaming-kafka-0-10_2.12 3.1.0 com.alibaba druid 1.1.10 mysql mysql-connector-java 8.0.11 com.fasterxml.jackson.core jackson-core 2.10.1

2、主程序代码

package com.demo.blacklistimport com.demo.utils.MyKafkaUtilimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}object RealTimeApp { def main(args: Array[String]): Unit = { //1.创建 SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp").setMaster("local[*]") //2.创建 StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) //3.读取数据// val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =// MyKafkaUtil.getKafkaStream("ads_log", ssc) val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream("test", ssc) //4.将从 Kafka 读出的数据转换为样例类对象 val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => { val value: String = record.value() val arr: Array[String] = value.split(" ") Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4)) }) //5.需求一:根据 MySQL 中的黑名单过滤当前数据集 val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler.filterByBlackList(adsLogDStream) //6.需求一:将满足要求的用户写入黑名单 BlackListHandler.addBlackList(filterAdsLogDStream) //测试打印 filterAdsLogDStream.cache() filterAdsLogDStream.count().print() //启动任务 ssc.start() ssc.awaitTermination() }}

3、kafka消费者工具类

package com.demo.utilsimport com.demo.PropertiesUtilimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import java.util.Propertiesobject MyKafkaUtil { //1.创建配置信息对象 private val properties: Properties = PropertiesUtil.load("config.properties") //2.用于初始化链接到集群的地址 val broker_list: String = properties.getProperty("kafka.broker.list") //3.kafka 消费者配置 val kafkaParam = Map( "bootstrap.servers" -> broker_list, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //消费者组 "group.id" -> "commerce-consumer-group", //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性 //可以使用这个配置,latest 自动重置偏移量为最新的偏移量 "auto.offset.reset" -> "latest", //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据 //如果是 false,会需要手动维护 kafka 偏移量 "enable.auto.commit" -> (true: java.lang.Boolean) ) // 创建 DStream,返回接收到的输入数据 // LocationStrategies:根据给定的主题和集群地址创建 consumer // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区 // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer // ConsumerStrategies.Subscribe:订阅一系列主题 def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = { val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)) dStream }}

4、mysql读写工具类

package com.demo.utilsimport java.sql.{Connection, PreparedStatement, ResultSet}import java.util.Propertiesimport javax.sql.DataSourceimport com.alibaba.druid.pool.DruidDataSourceFactoryimport com.demo.PropertiesUtilobject JdbcUtil { //初始化连接池 var dataSource: DataSource = init() //初始化连接池方法 def init(): DataSource = { val properties = new Properties() val config: Properties = PropertiesUtil.load("config.properties") properties.setProperty("driverClassName", "com.mysql.jdbc.Driver") properties.setProperty("url", config.getProperty("jdbc.url")) properties.setProperty("username", config.getProperty("jdbc.user")) properties.setProperty("password", config.getProperty("jdbc.password")) properties.setProperty("maxActive", config.getProperty("jdbc.datasource.size")) DruidDataSourceFactory.createDataSource(properties) } //获取 MySQL 连接 def getConnection: Connection = { dataSource.getConnection } //执行 SQL 语句,单条数据插入 def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = { var rtn = 0 var pstmt: PreparedStatement = null try { connection.setAutoCommit(false) pstmt = connection.prepareStatement(sql) if (params != null && params.length > 0) { for (i <- params.indices) { pstmt.setObject(i + 1, params(i)) } } rtn = pstmt.executeUpdate() connection.commit() pstmt.close() } catch { case e: Exception => e.printStackTrace() } rtn } //执行 SQL 语句,批量数据插入 def executeBatchUpdate(connection: Connection, sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = { var rtn: Array[Int] = null var pstmt: PreparedStatement = null try { connection.setAutoCommit(false) pstmt = connection.prepareStatement(sql) for (params <- paramsList) { if (params != null && params.length > 0) { for (i <- params.indices) { pstmt.setObject(i + 1, params(i)) } pstmt.addBatch() } } rtn = pstmt.executeBatch() connection.commit() pstmt.close() } catch { case e: Exception => e.printStackTrace() } rtn } //判断一条数据是否存在 def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = { var flag: Boolean = false var pstmt: PreparedStatement = null try { pstmt = connection.prepareStatement(sql) for (i <- params.indices) { pstmt.setObject(i + 1, params(i)) } flag = pstmt.executeQuery().next() pstmt.close() } catch { case e: Exception => e.printStackTrace() } flag } //获取 MySQL 的一条数据 def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = { var result: Long = 0L var pstmt: PreparedStatement = null try { pstmt = connection.prepareStatement(sql) for (i <- params.indices) { pstmt.setObject(i + 1, params(i)) } val resultSet: ResultSet = pstmt.executeQuery() while (resultSet.next()) { result = resultSet.getLong(1) } resultSet.close() pstmt.close() } catch { case e: Exception => e.printStackTrace() } result } //主方法,用于测试上述方法 def main(args: Array[String]): Unit = { }}

5、逻辑处理类

package com.demo.blacklistimport com.demo.utils.JdbcUtilimport java.sql.Connectionimport java.text.SimpleDateFormatimport java.util.Dateimport org.apache.spark.streaming.dstream.DStreamcase class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)object BlackListHandler { //时间格式化对象 private val sdf = new SimpleDateFormat("yyyy-MM-dd") def addBlackList(filterAdsLogDSteam: DStream[Ads_log]): Unit = { //统计当前批次中单日每个用户点击每个广告的总次数 //1.将数据接转换结构 ads_log=>((date,user,adid),1) val dateUserAdToOne: DStream[((String, String, String), Long)] = filterAdsLogDSteam.map(adsLog => { //a.将时间戳转换为日期字符串 val date: String = sdf.format(new Date(adsLog.timestamp)) //b.返回值 ((date, adsLog.userid, adsLog.adid), 1L) }) //2.统计单日每个用户点击每个广告的总次数 ((date,user,adid),1)=>((date,user,adid),count) val dateUserAdToCount: DStream[((String, String, String), Long)] = dateUserAdToOne.reduceByKey(_ + _) dateUserAdToCount.foreachRDD(rdd => { rdd.foreachPartition(iter => { val connection: Connection = JdbcUtil.getConnection iter.foreach { case ((dt, user, ad), count) => JdbcUtil.executeUpdate(connection, """ |INSERT INTO user_ad_count (dt,userid,adid,count) |VALUES (?,?,?,?) |ON DUPLICATE KEY |UPDATE count=count+? """.stripMargin, Array(dt, user, ad, count, count)) val ct: Long = JdbcUtil.getDataFromMysql(connection, "select count from user_ad_count where dt=? and userid=? and adid =?", Array(dt, user, ad)) if (ct >= 30) { JdbcUtil.executeUpdate(connection, "INSERT INTO black_list (userid) VALUES (?) ON DUPLICATE KEY update userid=?", Array(user, user)) } } connection.close() }) }) } def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = { adsLogDStream.transform(rdd => { rdd.filter(adsLog => { val connection: Connection = JdbcUtil.getConnection val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list where userid=?", Array(adsLog.userid)) connection.close() !bool }) }) }}

6、配置信息(config.properties)

#jdbc 配置jdbc.datasource.size=10jdbc.url=jdbc:mysql://localhost:3306/spark2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=truejdbc.user=usernamejdbc.password=password# Kafka 配置kafka.broker.list=localhost:9092

7、mysql建表脚本

CREATE TABLE black_list (userid CHAr(1) PRIMARY KEY);CREATE TABLE user_ad_count (dt varchar(255),userid CHAr (1),adid CHAr (1),count BIGINT,PRIMARY KEY (dt, userid, adid));

8、通用工具类(PropertiesUtil.scala)

package com.demoimport java.io.InputStreamReaderimport java.util.Propertiesobject PropertiesUtil { def load(propertiesName:String): Properties ={ val prop=new Properties() prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) , "UTF-8")) prop }}

9、执行测试

在kafka队列中持续输入类似下面数据的情况下。

1645164248400 华东 上海 1 11645164248400 华东 上海 5 41645164248400 华北 北京 3 11645164248400 华北 天津 6 11645164248400 华南 深圳 4 31645164248400 华南 深圳 5 11645164248400 华北 北京 3 41645164248400 华北 北京 6 51645164248400 华北 北京 4 31645164248400 华东 上海 4 21645164248400 华北 北京 2 31645164248400 华南 深圳 3 5

可以在idea控制台看到计算结果输出

-------------------------------------------Time: 1645164234000 ms-------------------------------------------51

同时可以在mysql数据库表中看到写入及更新的数据。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。