欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkStreaming输出算子foreachRDD介绍

时间:2023-05-02

  SparkStreaming中的数据抽象叫做DStream。DStream是抽象类,它把连续的数据流拆成很多的小RDD数据块, 这叫做“微批次”, spark的流式处理, 都是“微批次处理”。 DStream内部实现上有批次处理时间间隔,滑动窗口等机制来保证每个微批次的时间间隔里, 数据流以RDD的形式发送给spark做进一步处理。因此, 在一个为批次的处理时间间隔里, DStream只产生一个RDD。

foreachRDD、foreachPartition和foreach 的区别:

首先是作用范围不同,foreachRDD 作用于 DStream中每一个时间间隔的 RDD,foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition,foreach 作用于每一个时间间隔的 RDD 中的每一个元素。foreach 与 foreachPartition都是在每个partition中对iterator进行操作,不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用,而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理(可以避免内存溢出)。 Dstream.foreachRDD介绍:

首先他操作的是Dstream, Dstream是一个由RDD组成的流,foreachRDD是一个输出的操作,它可以操作RDD,比如把RDD的数据写入的数据库,对RDD进行业务逻辑处理,把SparkStream运行得到的结果保存到外部系统比如HDFS、Mysql、Redis等等。
要是想要操作RDD里面的数据,就要用RDD.foreach

foreachRDD算子使用的常见误区:

可以利用Dstream.foreachRDD把数据发送给外部系统。 但是想要正确地, 有效率的使用它, 必须理解一下背后的机制。通常向外部系统写数据需要一个Connection对象(通过它与外部服务器交互)。

误区一:在driver上创建连接对象(比如网络连接或数据库连接)
  如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。

dstream.foreachRDD { rdd =>  val connection = createNewConnection() // 数据库连接在driver上执行  rdd.foreach { record =>  connection.send(record) // 在worker上执行  }}

误区二:为每一条记录都创建一个连接对象
  通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

  比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。

正确做法一:为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }}

正确做法二:为每个RDD分区使用一个连接池中的连接对象

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords => // 从数据库连接池中获取连接 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 用完以后将连接返回给连接池,进行复用}}

举例:输出到MySQL数据库中

import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.DriverManagerobject ForeachRDDDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、初始化程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf,Seconds(3)) //二、获取数据流,即数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9996) //三、数据处理 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val wordCounts: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) //四、数据输出,将数据保存到MySQL中 wordCounts.foreachRDD{(rdd,time) => rdd.foreach{record => //为每一条数据都创建了一个连接,连接使用完了就关闭 //频繁的创建和关闭连接。其实对数据性能影响很大。 这个就是可以优化的点,可以考虑将foreach换成foreachPartition Class.forName("com.mysql.jdbc.Driver") val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","111111") val statement = conn.prepareStatement(s"insert into wordcount(ts,word,count) values (?,?,?)") statement.setLong(1,time.milliseconds.toLong) statement.setString(2,record._1) statement.setInt(3,record._2) statement.execute() statement.close() conn.close() } } //五、启动任务 ssc.start() ssc.awaitTermination() ssc.stop() }}

注意:
  Dstream和RDD一样是延迟执行,只有遇到action操作才会真正去计算。因此在Dstream的内部RDD必须包含Action操作才能是接受到的数据得到处理。即使代码中包含foreachRDD,但在内部却没有action的RDD,SparkStream只会简单地接受数据数据而不进行处理

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。