欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

【Flink调优】【第一章】【资源配置调优】

时间:2023-05-02
资源配置调优

1、内存设置

1.1 TaskManager内存模型1.2 生产资源配置 2、CPU资源的合理使用

2.1 使用DefaultResourceCalculator策略2.2 使用DominantResourceCalculator策略2.3 使用DominantResourceCalculator策略并指定core个数总结 3、并行度设置

3.1 全局并行度计算案例3.2 Source端并行度配置3.3 Transform端并行度配置3.4 Sink端并行度配置



1、内存设置 1.1 TaskManager内存模型


JVM进程内存:
一般flink作业是提交给yarn的,建议配置进程内存;包含所有内存

1.JVM特定内存配置参数

jvm元空间内存一般不管执行开销内存

框架内存和Task内存

3.网络内存

4.托管内存

案例分析

1.2 生产资源配置

提交任务

从flink1.12版本,WebUI就提供了TaskManager的内存模型图


可以对比上面分析的内存占比,是一模一样的;

查看TaskManager日志

那么给一个flink job多少内存合适? 以及各个部分内存给多少

TaskManager默认给2G如下图,每块内存使用率 TaskManager的WebUI上都有;
如果每块内存占比都很高,可以调高整个JVM进程内存;并且流计算还需要考虑数据洪峰,需要保留一定余地对于管理内存,它的作用是提供k-v类型的状态来用的,说白了就是给RocksDB使用的,如果我们的状态后端没有使用RocksDB,这块内存给0对于其他的部分,哪块紧缺,提高哪一块的占比一般性建议:一个TaskManager在2-8G比较合适,可以先给2G做测试使用,然后看情况进行调整 2、CPU资源的合理使用 2.1 使用DefaultResourceCalculator策略

cpu资源主要指的就是core数量;
为什么并行度的调整,核心数没有跟着调整;

(1)容器数量

提交参数中,并行度设置的是5,每个TaskManager2个slot;
也就是说至少5个slot,任务才能跑,需要使用3个TaskManager,也就是3个容器;
JobManager自己也需要一个容器,所以一共占用4个容器;

(2) 每个容器的线程数(core数)

现象说明:

默认情况下,一个TaskManager有几个slot就会申请几个Core;我们再提交任务的时候指定了每个TaskManager2个slot,意味着每个TM申请2个Core,一共6个Core;加上JobManager 一共7个Core;为什么只有4个?

原因: Yarn调度器

对于公平调度器 不会有这种问题,而容量调度器 会有这种问题;
Hadoop除了四个 -site.xlm配置文件,还有一个capacity-scheduler.xml配置文件专门用于配置容量调度器:

其内部有一个资源计算器的配置:

因此如果我们需要调整容器的cpu core数,需要修改资源计算器;

2.2 使用DominantResourceCalculator策略

调整如下:

使用Dominant资源计算器;
他的计算规则:内存+cpu

调整完了记得将此文件分发以下,并且重启Yarn!

验证
yarn重启完毕后,重新提交job;现象如下,占用了4个container,7个cpu,符合之前的计算!

2.3 使用DominantResourceCalculator策略并指定core个数

在提交flink job的时候,强行指定yarn 容器的Vcore个数:


现象:

分析:
JobManager 强行只有一个线程;
指定的并行度5 ,每个TaskManager2个slot,因此需要3个TaskManager,也就是3个容器;
每个容器强行分配3个Vcore,所以TaskManager占9个Vcore
所以一共10个;

总结

除了调整slot 并发度意外,还可以指定多给点线程;
slot有共享机制,一个slot可能会有多个task,也就是说一个slot可能有一个到多个线程;

默认情况下,线程数和slot个数 1:1就够了;这里只是提供如何提高cpu资源的;

还需要注意一点就是在指定yarn 线程数的时候,不要超过yarn配置的默认值(8),如果还需要往上调,就需要修改yarn配置文件

3、并行度设置

并行度和内存一样没有固定的原则,和代码逻辑有很大关系;
并行度可以全局设置,也可以针对每一个算子进行设置;

并行度设置回顾
(1)配置文件指定的默认并行度 【1】
(2)job提交参数指定 -p
(3)代码中 env 指定全局并行度
(4)代码中算子后面指定算子级别并行度
优先级别:(4)> (3) > (2) > (1)

如何压测?

(1)开发完一个flink job之后,先进行压测,任务并行度给10以下,测试单个并行度的处理上限,观察有没有产生反压,产生反压说明某个地方出现瓶颈处理不过来;

查看产生反压的节点 或者 直接看source(source数据没有经过处理),看source的每个子任务(每个并行度)每秒钟能往下游发送多少数据,能够观察到单个并行度的处理上限

(2) 获取单个并行度的处理上限后, 总QPS/单并行度处理能力 = 并行度

要处理的数据源的总QPS应该是心中有数的,也就是每秒的数据量;不要用平均值,建议使用高峰期的,
比如:一个不大不小的公司,大概百万日活左右,一天有1亿左右的数据,平均1000多条每秒,高峰就在2000到20000左右,也就是平均在2m左右,高峰在20m/s; source端在产生反压的情况下 平均5000条/s,所以4个并行度刚刚好,一般再乘以1.2

如何压测?

(1)自己造数据往kafka写,由flink消费;从生产环境拉数据
(2)如果自己造数据量不够大,就先不让flink消费,让数据积压在kafka中一段时间,再开启flink程序泄洪!这样很容易达到反压

压测的意义?

不能只从QPS去获取并行度,因为每个人写的代码不一样、数据不一样(比如有的字段少,有的字段多,一条数据的处理繁杂程度不同),所以最好先压测,用高峰期的QPS,查看并行度平均处理上限,然后 并行度*1.2 富余一些资源;

对于kafka数据源的,并行度可以和topic分区数保持一致,简单验证一下大概能抗住 就ok了

一般压测给10以下就够了,除非是大厂,数据量能够达到PB级别,QPS及其高,并行度可能大几十甚至上百,默认最大并行度是128

3.1 全局并行度计算案例

全局并行度设置是个粗犷的设置方式;

UV代码案例

提交作业

在webUI上查看反压情况

flink1.13版本后,每个task都能查看反压情况(busy程度)早期版本,如下图所示:

有反压情况后,查看source

可以查看每个并行度的数据量:


可以看出来 单并行度瓶颈大概为7000条/s

生产环境高峰期QPS假设30000条/s

那么就给5个并行度往上就够了

3.2 Source端并行度配置

如果全局并行度设置感觉效果不好,或者有浪费,就需要更细粒度的设置;一些繁忙的算子并行度设置大些。

3.3 Transform端并行度配置

source 和 keyBy之间的算子

一般没有特别重的逻辑处理,保持和source一致即可;

keyBy之后的算子

为什么设置为2的次幂?和keyBy原理有关

keyBy的分区器用的是键组分区器,经过两次hash获取一个id值;

分区号 = id值 * 下游并行度 / 最大并行度值(128)

3.4 Sink端并行度配置

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。