欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

分布式调度引擎elastic-job3源码分析核心服务I

时间:2023-04-17

分布式调度引擎 elastic-job3 源码分析 (一) 概述

分布式调度引擎 elastic-job3 源码分析 (二) 作业模型和注册

分布式调度引擎elastic-job3源码分析(三)-作业执行

分布式调度引擎elastic-job3源码分析(五)-核心服务 II(vip)

分布式调度引擎elastic-job3源码分析(六)-失效转移(vip​​​​​​)

1、背景

调度引擎是关键的基础设施,不但是定时执行任务,更是大规模分布式任务引擎,分布式并行处理平台,管理计算节点集群,提供高吞吐的可伸缩的数据处理能力。

公司日益增长的业务,对调度引擎高吞吐,高并发的要求也快速增长,需构建一个无中心,管理大集群,健壮分片容错的任务调度任务平台,支撑业务发展。

同时,分布式调度引擎也是 datax,可观测-性能指标组件(sentinel dashboard)分布式改造的核心技术

2、参考

芋道源码作业调度中间件 Elastic-Job 源码解析合集_芋道源码-CSDN博客

elastic-job - 文集 - 简书

3、术语

无中心/有中心分布式 有中心分布式设置中心节点负责集群协调和元数据保存等工作,例如 xxl-job 的 admin/executor, dolphin-scheduler master-worker 都是有中心分布式设计;真正无中心设计很少,大部分是节点平等,都可以通过选举成为主节点,也就是,任何一个节点都可以成为中心

脑裂 无中心分布式设计,当网络出现问题,节点分割成多个集群,集群间因不能通讯而不能达到状态一致,通常解决方案是集群节点数奇数,节点数少于总数的集群中一半停止工作

分片/容错 分片是调度平台很重要的特性,调度处理大规模数据,需要分片执行,分片执行带来新的问题,分片失败,平台回收分片,转移到其他节点执行

5 详细分析

服务可分为功能服务和核心服务,其中核心服务支撑功能服务的服务,功能任务有任务注册,任务执行,失效转移等,是调度平台的”业务”功能

5.6 核心服务 5.6.1 znode 结构

znode 设计是 zookeeper 分布式协调的灵魂,zk 的临时 znode,持久 znode,watch 机制

​ 

本文分析的核心,服务如何使用 znode,znode 间组合使用

5.6.2 注册中心和 znode 存储服务

注册中心属于生态一部分,但注册中心主要用在 znode 存储服务,合起来一起看

JobNodeStorage/CoordinatorRegistryCenter 两个类名字有点名不副实,CoordinatorRegistryCenter 才是 znode 节点存取;JobNodeStorage 更多是 znode”应用”:

executeInLeader 主节点选举

addDataListener 注册 znode 监听

executeInTransaction znode 操作事务 TransactionExecutionCallback

AbstracJobListener znode 事件监听器基类

LeaderExecutionCallback 主节点选举后回调

TransactionExecutionCallback znode 事务操作

5.6.3 监听服务

总体来说,监听服务分两大类,quartz 和分布式(zk)

本文只关注 zk 部分,elastic-job 自身也只实现 zk 部分,quartz 部分留接口给用户,其他的核心服务都依赖监听服务,捕获 znode 事件,执行相应服务逻辑,因此监听服务非常关键服务,是其他服务分析的入口

ListenerManager 监听管理器的集中管理

AbstractListenerManager/ AbstractJobListener 其他服务继承,实现服务的监听管理接入的监听器实现

RegistryCenterConnectionStateListener 连接状态的监听器,连接断开暂停作业,重连后,重新初始化作业,清除分片,重启作业

Ø 谁使用:几乎所有核心服务

Ø 依赖服务:

- znode 存储服务/注册中心 znode 节点存取,监听器注册

5.6.4 分片服务(sharding)

分片是分布式任务不可或缺的特性,任务分片并行执行,极大提高处理能力,分片伴随弹性计算,节点发现,容错等能力,是分布式调度的体现

elastic-job 实现了静态分片和分片容错,当作业满足分片条件时,设置需要分片标记,等到作业执行时,判断有该标记后执行作业分配

分片监听

elastic-job 的分片是先设置需要分片标记,等到需要的时候再实际分片,哪里设置分片?

ListenServersChangedJobListener 监听 server znode 和 instance znode,对于一个作业,server 可有对应多个 instance,因此 server 的上下线,同时接收到 server 和 instance 事件

server 节点是持久 znode,所以服务事件只有上线和 server enabled/disabled 变更

ShardingTotalCountChangedJobListener 监听作业配置分片数变更

另外,诊断服务也定时设置分片标记

分片

下面分析分片,分片就是分配任务到各个在线运行实例节点

分片入口方法 ShardingService.shardingIfNecessary,回顾一下作业执行,

分片在 getShardingContexgts 完成

优先处理失效分片,如果没有,调用 ShardingService.shardingIfNecessary 正常作业分片,最终分片封装到 ShardingContexts

分片完后,还有这么一段

getLocalTakeOffItems,获取分配了的失效转移的分片

防止运行实例在主节点分片后下线,然后又上线,该节点的分片很可能已转移给别的运行实例,因此需要进行移除,避免多节点运行相同的作业分片项。

*这里是不是应该直接丢弃所有分片,再看失效转移处理

循环分掉crashed实例所有的分片 

接下来分片代码

分片在主节点完成

LeaderService#isLeaderUntilBlock/blockUntilShardingCompleted 若非主节点等待分片完成

waitingOtherShardingItemCompleted 等待当前执行中的分片完成,依赖 monitor execution 配置

*分片运行依赖 monitorExecution 配置,也就是等不等待完成主要是看监控不监控,感觉主次没搞好;这里也反应了分片要严格按照配置要求,像失效转移分片改变需要另行处理

jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "") 标记分片处理中,

写入/leading/sharding/processing

*processing znode 只有在非主节点等待分片完成是用到,由于 necessary 必定存在,只为监控使用

resetShardingInfo

重写分片 znode,去掉/sharding/{itemNum}/instance 分片分配,剪掉之前可能多出的分片 znode

JobShardingStrategy spi 载入,有多种策略,属于基础(infra)包,细节就不分析

最后事务写入分片分配使用 PersistShardingInfoTransactionExecutionCallback

分片分配给那个作业运行实例

清理 znode,需要分片标记和分片处理中标记

Ø 谁使用:

- 作业执行 作业分片,获取分片信息

- 诊断服务 设置分片标记,处理分布式环境下,离线作业运行实例未处理分片

Ø 依赖服务:

- znode 存储服务/注册中心 znode 节点存取,监听器注册

- 选主服务 分片需主节点执行

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。