数据流的处理
实时数据的模拟需求分析设计流程流程图Spark与hadoop部分:flume部分:kafka部分idea部分编写接受数据并存储到Mysql中
总结 数据流的处理
1、使用工具:
数据模拟工具:pychame,Python编译脚本
链接,监听工具:flume,kafka
数据存储工具:mysql(实时),hdfs(离线),本地(离线)
代码编写工具:idea(实时),sparkshell(离线)
2、项目要求:
利用模拟或者爬取等方式获取相应的数据,然后使用Flume、Kafka、Spark、数据存储、HDFS、MySQL等组件来实时处理和离线批处理模块,最后将处理好的数据存储至分布式文件系统或者数据库当中。
3、项目目标:
统计完成离线与实时部分并成功存储。
实时数据,可以通过编写网络爬虫、采集网页数据、实时模拟等方式来实现。本次,即将采用的是通过Python代码来模拟实时产生。这种数据模拟代码,网上有很多,不会的可以去借鉴一下。在linux系统中,Python的编译环境是已经提前准备好的。
import randomimport timeiplist=[101,198,65,177,98,21,34,61,19,11,112,114]urllist=['WeGame','Steam','Blizzard','origin','Uplay','EPIC']mobile=['lol','CF','Yuansheng','CSGO','PUBG','Data2']def get_ip(): return '.'.join(str(x) for x in random.sample(iplist,4))def get_time(): return time.strftime('%Y%m%d%H%M%S',time.localtime())def get_url(): return random.sample(urllist,1)[0]def get_mobile(): return random.sample(mobile,1)[0]def get_log(count): while count>0: log='{},{},{},{}n'.format(get_ip(),get_time(),get_url(),get_mobile()) with open('/training/testData/test01.log', 'a+') as file:file.write(log) print(log) time.sleep(2) count=count-1 f=open("D:Test_1Text_access.log","w+")if __name__ == '__main__': get_log(10000)
需求分析 1、用户需求分析:
能够统计游戏平台的使用量。
由于游戏平台的使用是每个时刻都有可能发生的,所以可以利用先处理存储(实时处理日志),再进行统计计数的方式实现(离线操作)。
2、功能需求分析:
(1)实时处理功能:
①通过pychome完成编写模拟日志产生并使用liunx系统中的Python编译脚本
>>完成日志模拟的功能
②通过flume–kafka–sparkstreaming–idea–mysql
>>完成实时处理并存储的功能。
(2)离线处理功能
通过flume-kafka–hdfs–sparkshell–本地
>>完成离线批处理的功能。
3:业务需求分析:
预期日志产生:
65.11.101.198,20211212222147,EPIC+PUBG
177.65.98.19,20211212222149,origin+CF
61.98.101.112,20211212222151,Uplay+CSGO
101.112.61.177,20211212222153,WeGame+CSGO
112.19.21.98,20211212222155,origin+PUBG
61.98.21.11,20211212222157,Steam+lol
预期处理结果:
EPIC+PUBG
origin+CF
……
……
预期存储结果:
EPIC+PUBG
origin+CF
……
……
使用flume+kafka的方式监听数据源
在flume编写conf文件配置agent(连接kafka)
监听文件在:/root/training/TestData/*
在kafka集群中创建topic>>test02
在idea中编写Spark Streaming对实时数据进行处理和连接数据库
使用数据库来进行存储我们的结果
把统计结果写入到数据库里面
选择什么数据库作为统计结果的存储呢?
RDBMS: MySQL、
name count
平台+ 游戏名 4
…… ……
下一个批次数据进来以后:(游戏名+平台名)(name)+ 1 (count ) ==> click_count + 下一个批次的统计结果 ==> 写入到数据库中等存储结束后可以使用sql语句来进行后续的查询处理或者可视化操作。比较简单直观。前提需要启动:Zookeeper集群 kafka集群flume hadoop单机 spark集群 idea +mysql
流程图 实时部分:
离线部分(我的离线部分,是利用sparkshell编程。当然也可以利用实时存储后的数据,进行处理)
Spark与hadoop部分: 在该次操作中,我利用的是spark集群进行处理,所以要先进行spark集群的搭建。
而我处理的数据将有一股分发到hdfs上,所以还需要进行hadoop环境的搭建。
当然,在此过程中可以使用伪分布模式,只是后期的配置需要一些更改。
在本次实战中,需要先将数据输入的目标目录提前创建。
a1.sources=r1a1.sinks=k1 k2a1.channels=c1 c2#define sourcea1.sources.r1.type=execa1.sources.r1.channels=c1 c2a1.sources.r1.command=tail -F /training/testData/test-flume.log//这里是我的文件目录,可以更改成你自己的a1.sources.r1.shell=/bin/sh -ca1.sources.r1.selector.type=replicating#sink1toKafkaa1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = test02//监听的kafka目录a1.sinks.k1.brokerList= 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092//非集群模式可以只设主机ip地址a1.sinks.k1.kafka.bootstrap.servers = 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092//这里也是一样可以只设主机ip地址a1.sinks.k1.producer.requiredAcks = 1a1.sinks.k1.batchSize = 5a1.sinks.k1.channel=c1#sink2toHDFSa1.sinks.k2.type=hdfsa1.sinks.k2.channel=c2a1.sinks.k2.hdfs.path=hdfs://192.168.63.139:9000/DataTest/datatest/test02//hdfs上,你想输出文件的目录#channel1a1.channels.c1.type=memory#channel2a1.channels.c2.type=memory
kafka部分 kafka的安装,需要先安装zookeeper。
zookeeper的安装在csdn上有很多,可以去借鉴。
首先打开zookeeper,打开kafka集群,然后创建需要被监听的目录。我这里是test02目录。
package com.niit.sparkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.streaming.kafka010.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentobject BigWork{ def main(args:Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("BigWork").setMaster("local[2]") val streamingContext = new StreamingContext(sparkConf, Seconds(2)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "flume:9092",//主机 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("test02", "t100")//kafka监听目录 val stream = KafkaUtils.createDirectStream[String, String]( //监听 streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) val resultRDD: DStream[(String, Int)] = mapDStream.map(_._2.split(",")(2)).map(word=>(word,1)).reduceByKey(_+_)//处理数据 resultRDD.print()//打印 //转化成rdd,连接并存储到myql中 resultRDD.foreachRDD(rdd => { def func(records: Iterator[(String,Int)]) { var conn: Connection = null var stmt: PreparedStatement = null try { val url = "jdbc:mysql://localhost:3306/bigjob?useUnicode=true&characterEncoding=UTF-8" val user = "root" val password = "123456" conn = DriverManager.getConnection(url, user, password) records.foreach(p => { val sql = "insert into bigwork(name,count) values (?,?)" stmt = conn.prepareStatement(sql); stmt.setString(1, p._1.trim) stmt.setInt(2,p._2.toInt) stmt.executeUpdate() }) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } val repartitionedRDD = rdd.repartition(3) repartitionedRDD.foreachPartition(func) }) streamingContext.start() streamingContext.awaitTermination() }}
总结大数据的学习,需要自己动手专研。以上就是最简单实时处理流程。有什么问题,可以在评论区讨论。如果有什么不对的地方,望海涵!!