
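Problems Encountered with Flink CDC in Production

Date: 2023-05-01

Reference notes:

Alibaba Cloud Flink CDC documentation:

MySQL CDC source table - Realtime Compute for Apache Flink - Alibaba Cloud

CDC parameters: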

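WITH parameters. Each entry gives the parameter name, whether it is required, its data type, a description and notes.

connector (required, STRING): source table type. Can be set to mysql-cdc or mysql; the two are equivalent.

hostname (required, STRING): IP address or hostname of the MySQL database.

username (required, STRING): user name for the MySQL database service.

password (required, STRING): password for the MySQL database service.

database-name (required, STRING): MySQL database name. Regular expressions are supported, so data can be read from multiple databases.

table-name (required, STRING): MySQL table name. Regular expressions are supported, so data can be read from multiple tables.

port (optional, INTEGER): port of the MySQL database service. Default: 3306.

server-id (optional, STRING): a numeric ID for the database client. The ID must be globally unique within the MySQL cluster, and giving each job that reads the same database its own ID is recommended. By default a random value between 5400 and 6400 is generated.

The parameter also accepts an ID range, for example 5400-5408. With incremental reading enabled, the source can read with multiple parallel readers. In that case a range is recommended so that each parallel reader uses a different ID.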

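scan.incremental.snapshot.enabled (optional, BOOLEAN): whether to enable incremental snapshot. Enabled by default. Incremental snapshot is a newer mechanism for reading the full-table snapshot. Compared with the old snapshot reading, it has several advantages:

1. When reading the full data, the source can read in parallel.
2. When reading the full data, the source supports checkpoints at chunk granularity.
3. When reading the full data, the source does not need to acquire the global read lock (FLUSH TABLES WITH READ LOCK).

If you want the source to read in parallel, each parallel reader needs a unique server ID. Therefore server-id must be a range such as 5400-6400, and the range must contain at least as many IDs as the parallelism.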

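scan.incremental.snapshot.chunk.size (optional, Integer): chunk size of the table, in rows. Default: 8096. With incremental snapshot enabled, the table is split into multiple chunks for reading. A chunk's data is buffered in memory until the whole chunk has been read, so chunks that are too large can cause an OOM. Smaller chunks give finer-grained failure recovery but lower throughput.

scan.snapshot.fetch.size (optional, Integer): maximum number of records fetched at a time when reading the table's full data. Default: 1024.

scan.startup.mode (optional, STRING): startup mode for consuming data. Values:

1. initial (default): on first startup, scan the historical full data, then read the latest binlog.
2. latest-offset: on first startup, skip the historical data and start reading from the end of the binlog (the latest position). Only changes made after the connector starts are read.

server-time-zone (optional, STRING): session time zone used by the database, for example Asia/Shanghai. It controls how MySQL TIMESTAMP values are converted to STRING. For more information, see Debezium temporal types.

debezium.min.row.count.to.stream.results (optional, INTEGER): when a table has more rows than this value, batched reading is used. Default: 1000. Flink reads a MySQL source table in one of two ways:

1. Full read: the whole table is read into memory at once. This is fast, but memory use grows with table size, so a very large source table risks an OOM.
2. Batched read: the table is read in several passes, a fixed number of rows at a time, until all data has been read. Large tables carry no OOM risk, but reading is slower.

connect.timeout (optional, Duration): maximum time the connector waits after trying to connect to the MySQL server before it times out. Default: 30 seconds.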
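The chunk-based snapshot reading described above can be sketched as a key-range splitter. This is a simplified illustration, not Flink CDC's actual splitter. It assumes an integer primary key whose values are roughly dense between the minimum and maximum, and it cuts half-open ranges of chunkSize key values. The real implementation has to cope with sparse or skewed keys, so its boundaries can differ. For example, the splitStart/splitEnd values in the logs later in this article are far more than 8096 IDs apart.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of splitting a table into snapshot chunks by primary key.
// Assumptions: integer primary key, keys roughly dense between min and max,
// each chunk covers chunkSize key values as a half-open range [start, end).
class ChunkSplitter {
    static final class Chunk {
        final long start; // inclusive
        final long end;   // exclusive

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Chunk> split(long minKey, long maxKey, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0");
        }
        List<Chunk> chunks = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += chunkSize) {
            // The last chunk is clipped so it ends just past maxKey.
            long end = Math.min(start + chunkSize, maxKey + 1);
            chunks.add(new Chunk(start, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 20,000 keys with the default chunk size of 8096 -> 3 chunks.
        System.out.println(split(1, 20_000, 8096));
        // prints [[1, 8097), [8097, 16193), [16193, 20001)]
    }
}
```

Each chunk is buffered in memory until it has been fully read. Peak memory per reader therefore grows roughly with chunk size times average row width, which is why the chunk-size notes warn that overly large chunks can cause an OOM.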

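Versions:

Flink version: 1.13

Flink CDC version: 2.1.1

Scenario:

The Flink CDC DataStream API is used to read an entire MySQL database and write it directly into Doris.

The data volume is about 100 GB across a few dozen tables. It covers large and small tables, tables with many and few columns, columns with complex types, and so on.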
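Reading a whole database with dozens of tables usually relies on the connector's support for regular expressions in database and table names (see the database-name and table-name notes above). The Java sketch below only illustrates how such a pattern filters a list of tables. It assumes full-string matching, which may not be exactly how the connector applies the pattern. The table names other than order_address are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Sketch: which tables would a table-name regular expression select?
// Assumption: the pattern must match the whole table name (Matcher.matches semantics).
class TableNameFilter {
    static List<String> select(String tableNameRegex, List<String> tables) {
        Pattern pattern = Pattern.compile(tableNameRegex);
        List<String> selected = new ArrayList<>();
        for (String table : tables) {
            if (pattern.matcher(table).matches()) {
                selected.add(table);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical table list; only order_address appears in the logs below.
        List<String> tables = Arrays.asList("order_address", "order_item", "user_info");
        System.out.println(select("order_.*", tables)); // prints [order_address, order_item]
        System.out.println(select(".*", tables));       // a whole-database pattern selects all three
    }
}
```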

What happened:

After running for a while, the job crashed with the following error:

2022-02-11 18:33:59,461 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:196', splitKeyType=[`id` INT NOT NULL], splitStart=[17079248], splitEnd=[17165910], highWatermark=null} to subtask 0
2022-02-11 18:33:59,976 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator receives finished split offsets FinishedSnapshotSplitsReportEvent{finishedOffsets={plateform_stable_copy.order_address:196={ts_sec=0, file=mysql-bin.006361, pos=441499143, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18305222, row=0, event=0}}} from subtask 0.
2022-02-11 18:33:59,977 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:197', splitKeyType=[`id` INT NOT NULL], splitStart=[17165910], splitEnd=[17252572], highWatermark=null} to subtask 0
2022-02-11 18:34:00,079 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 57 (type=CHECKPOINT) @ 1644575640072 for job 01f4e4416ccf488091611165e921b83b.
2022-02-11 18:34:00,760 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler      [] - FATAL: Thread 'SourceCoordinator-Source: dataSourceStream -> processStream' produced an uncaught exception. Stopping the process...
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2022-02-11 18:34:00,768 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,788 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-02-11 18:34:00,789 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-02-11 18:34:00,791 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RUNNING to RESTARTING.
2022-02-11 18:34:00,795 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:12287
2022-02-11 18:34:00,800 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,816 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-dd18d903-98a1-4ae6-8e07-cd5a7bae3801/flink-web-ui
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://prod-qd-ct7-cdh-data-node03:9490 lost leadership
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping all currently running jobs of dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,830 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_181]
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_181]
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_181]
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274) [flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-11.0.jar:2.6.0-cdh5.16.2-11.0]
2022-02-11 18:34:00,832 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Job 01f4e4416ccf488091611165e921b83b was not finished by JobManager.
2022-02-11 18:34:00,832 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Shutting down cluster because job not finished
2022-02-11 18:34:00,834 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/1) (d6710965feba37b327b8f6083c4f53cc) switched from RUNNING to FAILED on container_e05_1641803753156_0100_01_000002 @ prod-qd-ct7-cdh-data-node03 (dataPort=29600).
java.util.concurrent.ExecutionException: Boxed Error
        at scala.concurrent.impl.Promise$.resolver(Promise.scala:59) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
Caused by: java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.subtaskFailed(SourceCoordinator.java:182) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$subtaskFailed$1(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.subtaskFailed(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.subtaskFailed(OperatorCoordinatorHolder.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.notifyCoordinatorOfCancellation(DefaultScheduler.java:561) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelExecutionVertex(DefaultScheduler.java:317) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_181]
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_181]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_181]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_181]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelTasksAsync(DefaultScheduler.java:309) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:253) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:234) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:229) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:133) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        ... 9 more
2022-02-11 18:34:00,840 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job dev-SingleInstanceData2doris(01f4e4416ccf488091611165e921b83b).
2022-02-11 18:34:00,842 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
        at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:604) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from RUNNING to CANCELING.
2022-02-11 18:34:00,848 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from CANCELING to CANCELED.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 1a3ac3e7fbadc81e49d53b75795c2c63.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
 

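Analysis:

Going through the logs and monitoring turned up no error from our own code, and the job's automatic restart attempts also failed.

1. First I tried restoring the job from a checkpoint. After running for a while it failed with the same error.
2. To check whether one table's data was the cause, I cleared the data of the table being read when the error occurred and restored from the checkpoint again. After a while the same error appeared. A pattern emerged: large tables triggered the failure, while data from small tables was written successfully (the CDC source reads the database one table at a time).
3. So the problem was probably a lack of resources, and I tried giving the Flink job more resources. (Because this was only a 100 GB test load, the job had been given modest resources.)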
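Step 2 depends on knowing which table and chunk the source was reading when it died. The enumerator logs every assignment (the MySqlSnapshotSplit lines in the log above), so the last one before the failure can be pulled from the JobManager log. This is a hedged Java sketch. The regular expression is written against the log format shown in this article and may need adjusting for other Flink CDC versions. The log file name in the usage comment is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LastSplitFinder {
    // Matches the enumerator's "Assign split MySqlSnapshotSplit{...}" lines as they
    // appear in this article's logs; other Flink CDC versions may format them differently.
    private static final Pattern ASSIGN = Pattern.compile(
        "Assign split MySqlSnapshotSplit\\{tableId=[^,]+, splitId='([^']+)'"
            + ".*?splitStart=(\\[[^\\]]*\\]|null), splitEnd=(\\[[^\\]]*\\]|null)");

    // Returns "splitId splitStart..splitEnd" for the last assignment, or null if none is found.
    static String lastAssignedSplit(List<String> logLines) {
        String last = null;
        for (String line : logLines) {
            Matcher m = ASSIGN.matcher(line);
            if (m.find()) {
                last = m.group(1) + " " + m.group(2) + ".." + m.group(3);
            }
        }
        return last;
    }

    public static void main(String[] args) throws IOException {
        // Usage (hypothetical file name): java LastSplitFinder jobmanager.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(lastAssignedSplit(lines));
    }
}
```

Run against the log excerpt above, the last assignment before the crash is chunk 197 of plateform_stable_copy.order_address. That points at a large table, consistent with the pattern in step 2.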

Searching for the problem:

I searched the Flink CDC community (the GitHub issues) for the keywords:

This indicates that a fatal error has happened and caused the coordinator executor thread to exit

Link:

https://github.com/ververica/flink-cdc-connectors/issues?q=This+indicates+that+a+fatal+error+has+happened+and+caused+the+coordinator+executor+thread+to+exit

I therefore increased JobManager and TaskManager memory fourfold. The job is currently running.

Problem 2:
After the historical (snapshot) data had finished reading, the YARN job died on its own.

2022-02-14 12:34:53,207 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.006360, pos=93487577, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18300905, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}} to subtask 0
2022-02-14 12:34:58,680 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/3) (e6807bd2ac2a982054dd3bb62006a462) switched from RUNNING to FAILED on container_e05_1641803753156_0111_01_000002 @ prod-qd-ct7-cdh-data-node02 (dataPort=1818).
java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.RuntimeException: SplitFetcher thread 6349 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644813298581,db=,server_id=0,file=mysql-bin.006360,pos=93487577,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
        at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
2022-02-14 12:34:58,715 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,715 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,716 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 6 tasks should be restarted to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0. 
2022-02-14 12:34:58,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RUNNING to RESTARTING.
2022-02-14 12:34:58,719 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 1 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,719 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 2 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,726 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from CANCELING to CANCELED.
2022-02-14 12:34:58,727 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,365 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,367 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2022-02-14 12:34:59,426 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,426 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2022-02-14 12:34:59,569 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,570 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 8082a3ead0f36284e54f4bf28b8a695e
2022-02-14 12:35:18,727 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RESTARTING to RUNNING.
2022-02-14 12:35:18,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 8082a3ead0f36284e54f4bf28b8a695e from Checkpoint 93 @ 1644813292042 for 8082a3ead0f36284e54f4bf28b8a695e located at hdfs://nameservice1/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/8082a3ead0f36284e54f4bf28b8a695e/chk-93.
2022-02-14 12:35:18,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master state to restore
2022-02-14 12:35:18,732 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 2 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/3) (9975f5c8e492cda22add5c1abd34ba64) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (097d045473dfbc8e980daf2ed5095b96) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (bfeda1f01099ea1fe1656a6f431a56f7) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (41433c56861b77cb145c5e2eabda66ce) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d88b153f6c0db78c8b461b768d90cb0f) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (eee0927845d4dcff02ecf8a7af2810f8) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 1 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.

Analysis:

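The binlog file had most likely been purged. The binlog reader tried to resume from mysql-bin.006360, position 93487577 (the offset in the log above), but the server no longer had that file, probably because the snapshot phase over roughly 100 GB ran longer than the server's binlog retention period. Later attempts to restore from checkpoints and savepoints failed for the same reason: the binlog file no longer existed.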
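Before restoring from a checkpoint or savepoint, it can help to check whether the recorded binlog file still exists. A minimal sketch, assuming the list of available files comes from the Log_name column of MySQL's SHOW BINARY LOGS and that file names end in a numeric sequence such as mysql-bin.006360; the class and method names are hypothetical. If the file is gone, no restore can succeed and, as the exception message itself says, the connector has to start again from a snapshot. On the MySQL side, retention is controlled by expire_logs_days (MySQL 5.7) or binlog_expire_logs_seconds (MySQL 8.0), which should comfortably exceed the time a full snapshot takes.

```java
import java.util.List;

/**
 * Hypothetical helper: decides whether a checkpointed binlog file has already been purged,
 * given the file names the server still lists (e.g. Log_name from SHOW BINARY LOGS).
 */
final class BinlogRetentionCheck {

    private BinlogRetentionCheck() {}

    /** Numeric suffix of a binlog file name, e.g. "mysql-bin.006360" -> 6360. */
    static long sequenceOf(String binlogFile) {
        int dot = binlogFile.lastIndexOf('.');
        if (dot < 0 || dot == binlogFile.length() - 1) {
            throw new IllegalArgumentException("Unexpected binlog file name: " + binlogFile);
        }
        return Long.parseLong(binlogFile.substring(dot + 1));
    }

    /** True if the file is older than the oldest file the server still keeps, or nothing is kept. */
    static boolean isPurged(String checkpointFile, List<String> serverLogs) {
        long oldest = Long.MAX_VALUE;
        for (String name : serverLogs) {
            oldest = Math.min(oldest, sequenceOf(name));
        }
        return sequenceOf(checkpointFile) < oldest;
    }
}
```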

Problem:

2022-02-14 19:14:24
com.alibaba.fastjson.JSONException: write javaBean error, fastjson version 1.2.47, class java.nio.HeapByteBuffer, fieldName : post_dept
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:465)
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:120)
    at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:270)
    at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:44)
    at com.alibaba.fastjson.serializer.ListSerializer.write(ListSerializer.java:137)
    at com.alibaba.fastjson.serializer.JSONSerializer.write(JSONSerializer.java:281)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:673)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:611)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:576)
    at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:248)
    at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:225)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:434)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.alibaba.fastjson.util.FieldInfo.get(FieldInfo.java:484)
    at com.alibaba.fastjson.serializer.FieldSerializer.getPropertyValueDirect(FieldSerializer.java:140)
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:249)
    ... 25 more
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:506)
    at java.nio.HeapByteBuffer.getChar(HeapByteBuffer.java:259)
    ... 32 more

2022-02-15 11:53:09
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to [B
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getBytes(Struct.java:168)
    at com.sjb.cdc.customization.JsonSchemaCustomization.foreachStruct(JsonSchemaCustomization.java:147)
    at com.sjb.cdc.customization.JsonSchemaCustomization.createData(JsonSchemaCustomization.java:110)
    at com.sjb.cdc.customization.JsonSchemaCustomization.deserialize(JsonSchemaCustomization.java:89)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

Analysis:

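A binary column was encountered and needs separate handling: putting the raw binary value into the JSON fails. In the first trace, the value of post_dept is a java.nio.HeapByteBuffer; fastjson serializes it as a JavaBean, calls getter-like methods such as getChar() through reflection, and one of them reads past the end of the buffer (BufferUnderflowException). The second trace comes from DECIMAL columns: they also have the Kafka Connect schema type BYTES (logical type org.apache.kafka.connect.data.Decimal), but their value is a java.math.BigDecimal, so Struct.getBytes() cannot cast it to byte[] ([B). The check therefore has to look at the schema name as well as the type.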

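for (Field field : afterStruct.schema().fields()) {
    String fieldName = field.name();
    // Plain binary columns only: DECIMAL is also BYTES, but it carries a schema name
    // (org.apache.kafka.connect.data.Decimal) and its value is a BigDecimal
    if ("bytes".equals(field.schema().type().getName()) && field.schema().name() == null) {
        byte[] bytes = afterStruct.getBytes(fieldName);
        if (bytes != null) {
            // decode explicitly instead of relying on the platform default charset
            String fieldValue = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
            dataJson.put(fieldName, fieldValue);
        }
    }
}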
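Another option is to normalize values before they go into the JSON object at all, so fastjson never sees a raw ByteBuffer. A minimal sketch, assuming binary columns should be emitted as UTF-8 text as in the code above; the helper name is hypothetical. It reads the buffer through duplicate() so the original position is not consumed, and passes DECIMAL values (BigDecimal) through unchanged.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: makes Kafka Connect field values safe to put into a JSON object. */
final class JsonSafeValues {

    private JsonSafeValues() {}

    static Object toJsonSafe(Object value) {
        if (value instanceof ByteBuffer) {
            // duplicate() shares the content but has its own position, so the caller's buffer is untouched
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] bytes = new byte[view.remaining()];
            view.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        // BigDecimal (DECIMAL columns), numbers, strings and null serialize fine as they are
        return value;
    }
}
```

For genuinely binary content (images, hashes), UTF-8 decoding is lossy; java.util.Base64.getEncoder().encodeToString(bytes) is a lossless alternative if the Doris side can work with Base64 text.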

Problem:

Field mapping when reading MySQL with CDC and writing to Doris. Straight to the test code:

for (Field field : struct.schema().fields()) {
    String fieldName = field.name();
    String schemaName = field.schema().name();
    Object fieldValue = struct.get(field);
    if (pkNameList.contains(fieldName)) {
        partitionerNum += fieldValue.hashCode();
        dataJson.put("partitionerNum", partitionerNum);
    }
    // TODO: date/time fields
    try {
        if ("int32".equals(field.schema().type().getName()) && Date.SCHEMA_NAME.equals(schemaName)) {
            // DATE: int32 days since the epoch
            if (fieldValue != null) {
                int day = (int) fieldValue;
                long second = day * 24 * 60 * 60L;
                String dateStr = LocalDateTime.ofEpochSecond(second, 0, ZoneOffset.ofHours(8)).format(dateFormatter);
                dataJson.put(fieldName, dateStr);
            }
        } else if ("int64".equals(field.schema().type().getName()) && Timestamp.SCHEMA_NAME.equals(schemaName)) {
            // DATETIME: int64 epoch milliseconds without time zone
            if (fieldValue != null) {
                long times = (long) fieldValue;
                String dateTime = LocalDateTime.ofEpochSecond(times / 1000 - 8 * 60 * 60, 0, ZoneOffset.ofHours(8)).format(dateTimeFormatter);
                dataJson.put(fieldName, dateTime);
            }
        } else if ("string".equals(field.schema().type().getName()) && "io.debezium.time.ZonedTimestamp".equals(schemaName)) {
            // TIMESTAMP: UTC string such as 2022-02-16T07:34:01Z, shifted to UTC+8
            String timestampValueStr = struct.getString(fieldName);
            if (fieldValue != null) {
                LocalDateTime localDateTime = LocalDateTime.parse(timestampValueStr, timestampFormatter);
                LocalDateTime rsTime = localDateTime.plusHours(8);
                String timestampValue = rsTime.format(dateTimeFormatter);
                dataJson.put(fieldName, timestampValue);
            }
        } else if ("bytes".equals(field.schema().type().getName()) && StringUtils.isEmpty(schemaName)) {
            // plain binary columns (DECIMAL is also BYTES but has a schema name)
            byte[] bytes = struct.getBytes(fieldName);
            if (bytes != null) {
                String bytesValue = new String(bytes);
                dataJson.put(fieldName, bytesValue);
            }
        } else {
            if (fieldValue != null) {
                dataJson.put(fieldName, fieldValue);
            }
        }
    } catch (Exception ex) {
        String errorInfo = StringUtils.join(ex.getStackTrace(), "");
        logger.error("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
        DingProd.sendDingRabotProd("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
    }
}

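The same three date/time conversions can be written with java.time, using a zone ID instead of hand-added offsets. A sketch, assuming the target time zone is Asia/Shanghai (UTC+8 with no daylight saving, so results match the +8 hour arithmetic above), that DATE values arrive as days since the epoch, DATETIME values as epoch milliseconds that encode the wall-clock time as if it were UTC, and TIMESTAMP values as ISO-8601 UTC strings such as 2022-02-16T07:34:01Z (the format observed in this test). The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helpers mirroring the DATE / DATETIME / TIMESTAMP branches above. */
final class TemporalConversions {

    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneId TARGET_ZONE = ZoneId.of("Asia/Shanghai"); // assumed target zone

    private TemporalConversions() {}

    /** DATE: int32 days since 1970-01-01, formatted as yyyy-MM-dd. */
    static String fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days).toString();
    }

    /** DATETIME: epoch millis that already encode the wall-clock value, so read them at UTC. */
    static String fromEpochMillisNoZone(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(DATE_TIME);
    }

    /** TIMESTAMP: ISO-8601 UTC string, converted to the target zone. */
    static String fromZonedTimestamp(String isoUtc) {
        return Instant.parse(isoUtc).atZone(TARGET_ZONE).format(DATE_TIME);
    }
}
```

Using a ZoneId keeps the offset in one place; for a zone with daylight saving, a fixed plusHours(8) would be wrong for part of the year, while atZone() stays correct.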
Notes:

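How should each MySQL type be mapped to Doris? Test results ("uncertain" marks the rows flagged with "?" in the original sheet):

| MySQL type | Same-name type in Doris | Doris type used | Notes |
|---|---|---|---|
| bigint | yes | bigint | |
| bit | no | int | uncertain |
| blob | no | varchar | |
| char | yes | char | |
| date | yes | date | |
| datetime | yes | datetime | |
| decimal | yes | decimal | |
| double | yes | double | |
| enum | no | varchar | uncertain |
| float | yes | float | |
| int | yes | int | |
| json | no | string | uncertain; Doris string is effectively TEXT |
| longblob | no | varchar | |
| longtext | no | varchar | |
| mediumblob | no | varchar | |
| mediumint | no | bigint | |
| mediumtext | no | string | |
| set | no | varchar | uncertain |
| smallint | yes | smallint | |
| text | no | string | |
| time | no | varchar | |
| timestamp | no | varchar | MySQL value 2022-02-16 15:34:01 is read by CDC as 2022-02-16T07:34:01Z; add 8 hours |
| tinyint | yes | tinyint | |
| tinytext | no | varchar | |
| varbinary | no | varchar | uncertain; binary data likewise goes to varchar |
| varchar | yes | varchar | |
| year | no | varchar | |

Doris types listed for comparison: bigint, bitmap, boolean, char, date, datetime, decimal, double, float, hll, int, largeint, smallint, string, tinyint, varchar.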
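If the Doris tables are created automatically, the tested mapping can live in code. A sketch as a simple lookup, assuming the MySQL type strings look like the COLUMN_TYPE values in information_schema.COLUMNS (e.g. varchar(255), int(11) unsigned); the length and modifiers such as UNSIGNED are simply stripped, which is a simplification. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical lookup of the MySQL -> Doris mapping from the test table. */
final class DorisTypeMapping {

    private static final Map<String, String> MYSQL_TO_DORIS = new HashMap<>();

    static {
        // Types that exist under the same name in Doris
        for (String t : new String[] {"bigint", "char", "date", "datetime", "decimal",
                "double", "float", "int", "smallint", "tinyint", "varchar"}) {
            MYSQL_TO_DORIS.put(t, t);
        }
        // Types without a same-name Doris type, mapped as in the test
        MYSQL_TO_DORIS.put("bit", "int");
        MYSQL_TO_DORIS.put("mediumint", "bigint");
        for (String t : new String[] {"json", "text", "mediumtext"}) {
            MYSQL_TO_DORIS.put(t, "string");
        }
        for (String t : new String[] {"blob", "mediumblob", "longblob", "tinytext", "longtext",
                "varbinary", "enum", "set", "time", "timestamp", "year"}) {
            MYSQL_TO_DORIS.put(t, "varchar");
        }
    }

    private DorisTypeMapping() {}

    /** Maps a MySQL column type such as "varchar(255)" or "int unsigned" to a Doris type name. */
    static String dorisTypeFor(String mysqlType) {
        // keep only the base type name: drop "(length)" and modifiers such as "unsigned"
        String base = mysqlType.trim().toLowerCase(Locale.ROOT).split("[\\s(]", 2)[0];
        String doris = MYSQL_TO_DORIS.get(base);
        if (doris == null) {
            throw new IllegalArgumentException("No Doris mapping for MySQL type: " + mysqlType);
        }
        return doris;
    }
}
```

Failing loudly on unknown types (rather than defaulting to varchar) surfaces new column types at table-creation time instead of at load time.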

 

Problem:

Monitoring DDL statements with CDC:

if ("io.debezium.connector.mysql.SchemaChangeValue".equals(valueStruct.schema().name())) {
    // schema-change (DDL) record: the details are carried as JSON in the "historyRecord" field
    String historyRecord = valueStruct.getString("historyRecord");
    JSONObject schemaChangeValueJson = JSONObject.parseObject(historyRecord);
    logger.error("Metadata change: " + schemaChangeValueJson);
    dataJson = schemaChangeValueJson;
    if (targetDb != null) {
        dataJson.put("targetDb", targetDb);
        dataJson.put("targetTb", table);
        // pass the event downstream
        out.collect(dataJson);
        String content = "warn!!! DingTalk alert! Metadata changed... target db: " + targetDb + ", target table: " + table;
        DingProd.sendDingRabotProd(content);
    }
} else {
    // ... handling of regular data-change records (not shown)
}
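Alerting on every schema change can get noisy. A sketch of classifying the DDL text before deciding whether to alert, assuming the statement is read from the ddl field of the parsed historyRecord JSON (Debezium history records carry the statement there). The class, the enum, and the policy of which kinds trigger an alert are all hypothetical.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: classifies a DDL statement taken from the history record's "ddl" field. */
final class DdlClassifier {

    enum Kind { CREATE_TABLE, ALTER_TABLE, DROP_TABLE, TRUNCATE_TABLE, RENAME_TABLE, OTHER }

    // leading keyword followed by TABLE, e.g. "ALTER TABLE", "drop table"
    private static final Pattern TABLE_DDL =
            Pattern.compile("^(CREATE|ALTER|DROP|TRUNCATE|RENAME)\\s+TABLE\\b", Pattern.CASE_INSENSITIVE);

    private DdlClassifier() {}

    static Kind classify(String ddl) {
        if (ddl == null) {
            return Kind.OTHER;
        }
        // strip /* ... */ comments that may precede the statement, then surrounding whitespace
        String stmt = ddl.replaceAll("(?s)/\\*.*?\\*/", "").trim();
        Matcher m = TABLE_DDL.matcher(stmt);
        if (!m.find()) {
            return Kind.OTHER;
        }
        return Kind.valueOf(m.group(1).toUpperCase(Locale.ROOT) + "_TABLE");
    }

    /** Example policy: alert on changes that can break an existing Doris table, not on new tables. */
    static boolean needsAlert(String ddl) {
        Kind kind = classify(ddl);
        return kind != Kind.OTHER && kind != Kind.CREATE_TABLE;
    }
}
```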

 
