在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。
一、Spark streaming
可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:
注意事项: spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新
@Slf4jpublic class JDBCBroadcastPeriodicUpdater { private static final int PERIOD = 30 * 1000; //更新周期,秒 private static volatile JDBCBroadcastPeriodicUpdater instance; private Broadcast>> broadcast; private long lastUpdate = 0L; private JDBCBroadcastPeriodicUpdater() {} public static JDBCBroadcastPeriodicUpdater getInstance() { if (instance == null) { synchronized (JDBCBroadcastPeriodicUpdater.class) { if (instance == null) { instance = new JDBCBroadcastPeriodicUpdater(); } } } return instance; } public Broadcast>> updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) { SparkContext sc = spark.sparkContext(); long now = System.currentTimeMillis(); long offset = now - lastUpdate; if (offset > PERIOD || null == broadcast) { if (broadcast != null) { // 删除已获取的广播变量值 broadcast.unpersist(); } lastUpdate = now; // 重新广播新的变量值 List
二、Structured streaming
Structured streaming使用trigger触发每个批次的数据处理,但由于使用了Spark sql engine,代码是优化后执行的,只有在首次触发trigger时才会获取广播变量的值,故前述在Spark streaming中使用的方法并不能达到更新变量的目的。
2.1 借用Listener的特性
Spark使用微批的形式处理流数据,而每个流的运行都会伴随着Listener监控任务的执行状态。在Structured streaming中,Listener有三个方法:onQueryStarted、onQueryProgress、onQueryTerminated,分别在程序开始运行、每批次数据处理完毕、程序结束时调用,需要注意的是onQueryProgress是异步调用的。
变量的广播操作需要在driver上执行的,而Listener的调用也是在driver端,我们正好可以利用这一点,在onQueryProgress方法中进行广播变量的更新操作。具体就是使用.unpersist()删除广播变量再重新广播。
以上理论上可以实现所需功能,但spark是支持static与stream做join的,而且在执行时每次触发trigger都会去重新获取static的df,故比起上面的方法,以下方法更为推荐。
2.2 使用SQL Hints
如前所述,实际操作中可以分别在代码中获取static df和stream df,创建临时视图做join操作,并用Hints语法标识需要做广播的表。
此法利用了static df在每个批次都会重新读取的特点更新规则数据,又利用Hints语法使实际的数据处理完全用SQL完成,对比自定义Listener的方式更佳简单易用,利于维护。
代码示例:
SELECt * FROM records r JOIN src s ON r.key = s.key
关于Spark SQL Hints,参考官方文档:Hints - Spark 3.1.2 documentation (apache.org)