欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

大数据实战之Spark-Flume-Kafka-idea-Mysql实时处理数据并存储

时间:2023-04-18
大数据实战之Spark-Flume-Kafka-idea-Mysql实时处理数据并存储

数据流的处理

实时数据的模拟需求分析设计流程流程图Spark与hadoop部分:flume部分:kafka部分idea部分编写接受数据并存储到Mysql中

总结 数据流的处理

1、使用工具:
数据模拟工具:pychame,Python编译脚本
链接,监听工具:flume,kafka
数据存储工具:mysql(实时),hdfs(离线),本地(离线)
代码编写工具:idea(实时),sparkshell(离线)

2、项目要求:
利用模拟或者爬取等方式获取相应的数据,然后使用Flume、Kafka、Spark、数据存储、HDFS、MySQL等组件来实时处理和离线批处理模块,最后将处理好的数据存储至分布式文件系统或者数据库当中。

3、项目目标:
统计完成离线与实时部分并成功存储。

实时数据的模拟

实时数据,可以通过编写网络爬虫、采集网页数据、实时模拟等方式来实现。本次,即将采用的是通过Python代码来模拟实时产生。这种数据模拟代码,网上有很多,不会的可以去借鉴一下。在linux系统中,Python的编译环境是已经提前准备好的。

import randomimport timeiplist=[101,198,65,177,98,21,34,61,19,11,112,114]urllist=['WeGame','Steam','Blizzard','origin','Uplay','EPIC']mobile=['lol','CF','Yuansheng','CSGO','PUBG','Data2']def get_ip(): return '.'.join(str(x) for x in random.sample(iplist,4))def get_time(): return time.strftime('%Y%m%d%H%M%S',time.localtime())def get_url(): return random.sample(urllist,1)[0]def get_mobile(): return random.sample(mobile,1)[0]def get_log(count): while count>0: log='{},{},{},{}n'.format(get_ip(),get_time(),get_url(),get_mobile()) with open('/training/testData/test01.log', 'a+') as file:file.write(log) print(log) time.sleep(2) count=count-1 f=open("D:Test_1Text_access.log","w+")if __name__ == '__main__': get_log(10000)

需求分析

1、用户需求分析:
能够统计游戏平台的使用量。
由于游戏平台的使用是每个时刻都有可能发生的,所以可以利用先处理存储(实时处理日志),再进行统计计数的方式实现(离线操作)。

2、功能需求分析:

(1)实时处理功能:
①通过pychome完成编写模拟日志产生并使用liunx系统中的Python编译脚本
>>完成日志模拟的功能

②通过flume–kafka–sparkstreaming–idea–mysql
>>完成实时处理并存储的功能。

(2)离线处理功能
通过flume-kafka–hdfs–sparkshell–本地
>>完成离线批处理的功能。

3:业务需求分析:
预期日志产生:
65.11.101.198,20211212222147,EPIC+PUBG
177.65.98.19,20211212222149,origin+CF
61.98.101.112,20211212222151,Uplay+CSGO
101.112.61.177,20211212222153,WeGame+CSGO
112.19.21.98,20211212222155,origin+PUBG
61.98.21.11,20211212222157,Steam+lol

预期处理结果:
EPIC+PUBG
origin+CF
……
……

预期存储结果:
EPIC+PUBG
origin+CF
……
……

设计流程

使用flume+kafka的方式监听数据源
在flume编写conf文件配置agent(连接kafka)
监听文件在:/root/training/TestData/*
在kafka集群中创建topic>>test02

在idea中编写Spark Streaming对实时数据进行处理和连接数据库

使用数据库来进行存储我们的结果
把统计结果写入到数据库里面

选择什么数据库作为统计结果的存储呢?
RDBMS: MySQL、
name count
平台+ 游戏名 4
…… ……

下一个批次数据进来以后:(游戏名+平台名)(name)+ 1 (count ) ==> click_count + 下一个批次的统计结果 ==> 写入到数据库中等存储结束后可以使用sql语句来进行后续的查询处理或者可视化操作。比较简单直观。前提需要启动:Zookeeper集群 kafka集群flume hadoop单机 spark集群 idea +mysql

流程图

实时部分:

离线部分(我的离线部分,是利用sparkshell编程。当然也可以利用实时存储后的数据,进行处理)

Spark与hadoop部分:

在该次操作中,我利用的是spark集群进行处理,所以要先进行spark集群的搭建。
而我处理的数据将有一股分发到hdfs上,所以还需要进行hadoop环境的搭建。
当然,在此过程中可以使用伪分布模式,只是后期的配置需要一些更改。
在本次实战中,需要先将数据输入的目标目录提前创建。

flume部分:

a1.sources=r1a1.sinks=k1 k2a1.channels=c1 c2#define sourcea1.sources.r1.type=execa1.sources.r1.channels=c1 c2a1.sources.r1.command=tail -F /training/testData/test-flume.log//这里是我的文件目录,可以更改成你自己的a1.sources.r1.shell=/bin/sh -ca1.sources.r1.selector.type=replicating#sink1toKafkaa1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = test02//监听的kafka目录a1.sinks.k1.brokerList= 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092//非集群模式可以只设主机ip地址a1.sinks.k1.kafka.bootstrap.servers = 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092//这里也是一样可以只设主机ip地址a1.sinks.k1.producer.requiredAcks = 1a1.sinks.k1.batchSize = 5a1.sinks.k1.channel=c1#sink2toHDFSa1.sinks.k2.type=hdfsa1.sinks.k2.channel=c2a1.sinks.k2.hdfs.path=hdfs://192.168.63.139:9000/DataTest/datatest/test02//hdfs上,你想输出文件的目录#channel1a1.channels.c1.type=memory#channel2a1.channels.c2.type=memory

kafka部分

kafka的安装,需要先安装zookeeper。
zookeeper的安装在csdn上有很多,可以去借鉴。
首先打开zookeeper,打开kafka集群,然后创建需要被监听的目录。我这里是test02目录。

idea部分编写接受数据并存储到Mysql中

package com.niit.sparkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.streaming.kafka010.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentobject BigWork{ def main(args:Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("BigWork").setMaster("local[2]") val streamingContext = new StreamingContext(sparkConf, Seconds(2)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "flume:9092",//主机 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("test02", "t100")//kafka监听目录 val stream = KafkaUtils.createDirectStream[String, String]( //监听 streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) val resultRDD: DStream[(String, Int)] = mapDStream.map(_._2.split(",")(2)).map(word=>(word,1)).reduceByKey(_+_)//处理数据 resultRDD.print()//打印 //转化成rdd,连接并存储到myql中 resultRDD.foreachRDD(rdd => { def func(records: Iterator[(String,Int)]) { var conn: Connection = null var stmt: PreparedStatement = null try { val url = "jdbc:mysql://localhost:3306/bigjob?useUnicode=true&characterEncoding=UTF-8" val user = "root" val password = "123456" conn = DriverManager.getConnection(url, user, password) records.foreach(p => { val sql = "insert into bigwork(name,count) values (?,?)" stmt = conn.prepareStatement(sql); stmt.setString(1, p._1.trim) stmt.setInt(2,p._2.toInt) stmt.executeUpdate() }) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } val repartitionedRDD = rdd.repartition(3) repartitionedRDD.foreachPartition(func) }) streamingContext.start() streamingContext.awaitTermination() }}

总结

大数据的学习,需要自己动手专研。以上就是最简单实时处理流程。有什么问题,可以在评论区讨论。如果有什么不对的地方,望海涵!!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。