欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Spark流处理中定时更新广播变量值

时间:2023-07-20
Spark流处理中定时更新广播变量值

在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。


一、Spark streaming

可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:

注意事项: spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新

@Slf4jpublic class JDBCBroadcastPeriodicUpdater { private static final int PERIOD = 30 * 1000; //更新周期,秒 private static volatile JDBCBroadcastPeriodicUpdater instance; private Broadcast>> broadcast; private long lastUpdate = 0L; private JDBCBroadcastPeriodicUpdater() {} public static JDBCBroadcastPeriodicUpdater getInstance() { if (instance == null) { synchronized (JDBCBroadcastPeriodicUpdater.class) { if (instance == null) { instance = new JDBCBroadcastPeriodicUpdater(); } } } return instance; } public Broadcast>> updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) { SparkContext sc = spark.sparkContext(); long now = System.currentTimeMillis(); long offset = now - lastUpdate; if (offset > PERIOD || null == broadcast) { if (broadcast != null) { // 删除已获取的广播变量值 broadcast.unpersist(); } lastUpdate = now; // 重新广播新的变量值 List> value = fetchBroadcastData(dataSource, sql); broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value); } return broadcast; } @SneakyThrows private List> fetchBroadcastData(DruidDataSource dataSource, String sql) { List> result = new ArrayList<>(); DruidPooledConnection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql); ResultSet data = ps.executeQuery(); ResultSetmetaData metaData = data.getmetaData(); int colCount = metaData.getColumnCount(); while (data.next()) { HashMap row = new HashMap<>(); for (int i=0; i


二、Structured streaming

Structured streaming使用trigger触发每个批次的数据处理,但由于使用了Spark sql engine,代码是优化后执行的,只有在首次触发trigger时才会获取广播变量的值,故前述在Spark streaming中使用的方法并不能达到更新变量的目的。

2.1 借用Listener的特性

Spark使用微批的形式处理流数据,而每个流的运行都会伴随着Listener监控任务的执行状态。在Structured streaming中,Listener有三个方法:onQueryStarted、onQueryProgress、onQueryTerminated,分别在程序开始运行、每批次数据处理完毕、程序结束时调用,需要注意的是onQueryProgress是异步调用的。

变量的广播操作需要在driver上执行的,而Listener的调用也是在driver端,我们正好可以利用这一点,在onQueryProgress方法中进行广播变量的更新操作。具体就是使用.unpersist()删除广播变量再重新广播。

以上理论上可以实现所需功能,但spark是支持static与stream做join的,而且在执行时每次触发trigger都会去重新获取static的df,故比起上面的方法,以下方法更为推荐。

2.2 使用SQL Hints

如前所述,实际操作中可以分别在代码中获取static df和stream df,创建临时视图做join操作,并用Hints语法标识需要做广播的表。

此法利用了static df在每个批次都会重新读取的特点更新规则数据,又利用Hints语法使实际的数据处理完全用SQL完成,对比自定义Listener的方式更佳简单易用,利于维护。

代码示例:

SELECt * FROM records r JOIN src s ON r.key = s.key

关于Spark SQL Hints,参考官方文档:Hints - Spark 3.1.2 documentation (apache.org)

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。