1、内存设置
1.1 TaskManager内存模型1.2 生产资源配置 2、CPU资源的合理使用
2.1 使用DefaultResourceCalculator策略2.2 使用DominantResourceCalculator策略2.3 使用DominantResourceCalculator策略并指定core个数总结 3、并行度设置
3.1 全局并行度计算案例3.2 Source端并行度配置3.3 Transform端并行度配置3.4 Sink端并行度配置
JVM进程内存:
一般flink作业是提交给yarn的,建议配置进程内存;包含所有内存
1.JVM特定内存配置参数
jvm元空间内存一般不管执行开销内存
框架内存和Task内存
3.网络内存
4.托管内存
1.2 生产资源配置案例分析
提交任务
从flink1.12版本,WebUI就提供了TaskManager的内存模型图
可以对比上面分析的内存占比,是一模一样的;
查看TaskManager日志
那么给一个flink job多少内存合适? 以及各个部分内存给多少
TaskManager默认给2G如下图,每块内存使用率 TaskManager的WebUI上都有;
如果每块内存占比都很高,可以调高整个JVM进程内存;并且流计算还需要考虑数据洪峰,需要保留一定余地对于管理内存,它的作用是提供k-v类型的状态来用的,说白了就是给RocksDB使用的,如果我们的状态后端没有使用RocksDB,这块内存给0对于其他的部分,哪块紧缺,提高哪一块的占比一般性建议:一个TaskManager在2-8G比较合适,可以先给2G做测试使用,然后看情况进行调整 2、CPU资源的合理使用 2.1 使用DefaultResourceCalculator策略
cpu资源主要指的就是core数量;
为什么并行度的调整,核心数没有跟着调整;
(1)容器数量
提交参数中,并行度设置的是5,每个TaskManager2个slot;
也就是说至少5个slot,任务才能跑,需要使用3个TaskManager,也就是3个容器;
JobManager自己也需要一个容器,所以一共占用4个容器;
(2) 每个容器的线程数(core数)
现象说明:
默认情况下,一个TaskManager有几个slot就会申请几个Core;我们再提交任务的时候指定了每个TaskManager2个slot,意味着每个TM申请2个Core,一共6个Core;加上JobManager 一共7个Core;为什么只有4个?
原因: Yarn调度器
对于公平调度器 不会有这种问题,而容量调度器 会有这种问题;
Hadoop除了四个 -site.xlm配置文件,还有一个capacity-scheduler.xml配置文件专门用于配置容量调度器:
其内部有一个资源计算器的配置:
因此如果我们需要调整容器的cpu core数,需要修改资源计算器;
调整如下:
使用Dominant资源计算器;
他的计算规则:内存+cpu
调整完了记得将此文件分发以下,并且重启Yarn!
验证
yarn重启完毕后,重新提交job;现象如下,占用了4个container,7个cpu,符合之前的计算!
在提交flink job的时候,强行指定yarn 容器的Vcore个数:
现象:
分析:
JobManager 强行只有一个线程;
指定的并行度5 ,每个TaskManager2个slot,因此需要3个TaskManager,也就是3个容器;
每个容器强行分配3个Vcore,所以TaskManager占9个Vcore
所以一共10个;
除了调整slot 并发度意外,还可以指定多给点线程;
slot有共享机制,一个slot可能会有多个task,也就是说一个slot可能有一个到多个线程;
默认情况下,线程数和slot个数 1:1就够了;这里只是提供如何提高cpu资源的;
还需要注意一点就是在指定yarn 线程数的时候,不要超过yarn配置的默认值(8),如果还需要往上调,就需要修改yarn配置文件
3、并行度设置 并行度和内存一样没有固定的原则,和代码逻辑有很大关系;
并行度可以全局设置,也可以针对每一个算子进行设置;
并行度设置回顾
(1)配置文件指定的默认并行度 【1】
(2)job提交参数指定 -p
(3)代码中 env 指定全局并行度
(4)代码中算子后面指定算子级别并行度
优先级别:(4)> (3) > (2) > (1)
如何压测?
(1)开发完一个flink job之后,先进行压测,任务并行度给10以下,测试单个并行度的处理上限,观察有没有产生反压,产生反压说明某个地方出现瓶颈处理不过来;
查看产生反压的节点 或者 直接看source(source数据没有经过处理),看source的每个子任务(每个并行度)每秒钟能往下游发送多少数据,能够观察到单个并行度的处理上限
(2) 获取单个并行度的处理上限后, 总QPS/单并行度处理能力 = 并行度
要处理的数据源的总QPS应该是心中有数的,也就是每秒的数据量;不要用平均值,建议使用高峰期的,
比如:一个不大不小的公司,大概百万日活左右,一天有1亿左右的数据,平均1000多条每秒,高峰就在2000到20000左右,也就是平均在2m左右,高峰在20m/s; source端在产生反压的情况下 平均5000条/s,所以4个并行度刚刚好,一般再乘以1.2
如何压测?
(1)自己造数据往kafka写,由flink消费;从生产环境拉数据
(2)如果自己造数据量不够大,就先不让flink消费,让数据积压在kafka中一段时间,再开启flink程序泄洪!这样很容易达到反压
压测的意义?
不能只从QPS去获取并行度,因为每个人写的代码不一样、数据不一样(比如有的字段少,有的字段多,一条数据的处理繁杂程度不同),所以最好先压测,用高峰期的QPS,查看并行度平均处理上限,然后 并行度*1.2 富余一些资源;
对于kafka数据源的,并行度可以和topic分区数保持一致,简单验证一下大概能抗住 就ok了
3.1 全局并行度计算案例一般压测给10以下就够了,除非是大厂,数据量能够达到PB级别,QPS及其高,并行度可能大几十甚至上百,默认最大并行度是128
全局并行度设置是个粗犷的设置方式;
UV代码案例
提交作业
在webUI上查看反压情况
flink1.13版本后,每个task都能查看反压情况(busy程度)早期版本,如下图所示:
有反压情况后,查看source
可以查看每个并行度的数据量:
可以看出来 单并行度瓶颈大概为7000条/s
生产环境高峰期QPS假设30000条/s
那么就给5个并行度往上就够了
3.2 Source端并行度配置如果全局并行度设置感觉效果不好,或者有浪费,就需要更细粒度的设置;一些繁忙的算子并行度设置大些。
3.3 Transform端并行度配置source 和 keyBy之间的算子
一般没有特别重的逻辑处理,保持和source一致即可;
keyBy之后的算子
为什么设置为2的次幂?和keyBy原理有关
keyBy的分区器用的是键组分区器,经过两次hash获取一个id值;
分区号 = id值 * 下游并行度 / 最大并行度值(128)
3.4 Sink端并行度配置