在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响。
简单地访问外部数据库的数据,比如使用 MapFunction,通常意味着同步交互: MapFunction 向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的大部分时间。
与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。
注意: 仅仅提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。
先决条件如上节所述,正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。
如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。
异步 I/O APIFlink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。
在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:
实现分发请求的 AsyncFunction
获取数据库交互的结果并发送给 ResultFuture 的 回调 函数
将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作。
import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple5;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;import java.util.ArrayList;import java.util.List;// TODO: 2022/2/17 实现 'RichAsyncFunction' 用于发送请求和设置回调。传入参数 1.输入流的类型 2.输出流的类型public class AsyncMysqlData extends RichAsyncFunction
主函数调用
import com.zxl.blink.StudentDB;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple5;import org.apache.flink.configuration.Configuration;import org.apache.flink.configuration.RestOptions;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class AsyncDemo { public static void main(String[] args) throws Exception { //配置Flink WEB UI 可以登入localhost:8848 查看flink运行图 Configuration configuration = new Configuration(); configuration.setInteger(RestOptions.PORT,8848); //创建执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); environment.setParallelism(4); //调用自定义函数形成数据流 DataStream