欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

源码学习Dubbo的线程模型

时间:2023-07-28
前言 本文章从官网介绍的Dubbo线程模型结论出发,了解dubbo是怎样暴露netty服务,接收请求后怎样提交到业务线程池了解dubbo怎样提供Dispatcher的扩展以及ThreadPool的扩展怎样去修改dispatcher和threadpool的默认配置,线程数配置的思考以及出现问题后如何排查 Dubbo线程模型介绍 如果事件处理的逻辑能迅速完成,并且不会发起新的IO请求,则直接在IO线程上处理更快但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,则必须派发到线程池
暴露服务

监听到Spring启动事件ContextRefreshedEvent后,暴露服务

遍历所有服务进行暴露

2.1 暴露服务会同时暴露本地和远程服务(ServiceConfig#exportUrl)
(1)本地服务会将url的host改为127.0.0.1,port改为0,protocol改为injvm
(2)暴露远程服务时,以url的address维度进行暴露(创建服务 DubboProtocol#openServer)

private void openServer(URL url) {// 192.168.182.1:20880String key = url.getAddress();ProtocolServer server = serverMap.get(key);if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); }else { server.reset(url); } } }}

2.2 通过初始化NettyServer,然后调用其doOpen开启Netty远程服务
(1)NettyServer会初始化一个包装handler,用来处理用户请求事件

// NettyServer构造方法 --> ChannelHandlers#wrap --> ChannelHandlers#wrapInternal(例如HeartbeatHandler包装AllChannelHandler)protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultframeworkModel().getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)));}

(2)从url中获取dispather参数,默认为all。然后根据扩展参数在org.apache.dubbo.remoting.Dispatcher扩展文件中相应的扩展,通过Dispatcher的dispatch分发handler

(3)NettyServer服务端会按照端口的维护根据url创建一个业务线程池

// DefaultExecutorRepositorypublic synchronized ExecutorService createExecutorIfAbsent(URL url) {Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();URL finalUrl = url;// 从url中获取threadpool参数,默认为fixed --> 根据扩展名在org.apache.dubbo.common.threadpool.ThreadPool文件中找相应的扩展 ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(finalUrl));}

处理用户请求 NettyServerHandler继承netty的ChannelDuplexHandler,在处理netty事件时,转交给handler属性进行处理

protected void doOpen() throws Throwable {// 将NettyServer作为NettyServerHandler的handler属性进行封装final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);bootstrap.group(bossGroup, workerGroup) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }}

direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行
2.1 配置Dispatcher为direct

2.2 DirectChannelHandler处理消息

public void received(Channel channel, Object message) throws RemotingException {// 获取线程池(一般不属于ThreadlessExecutor,所以不会交给业务线程池执行)ExecutorService executor = getPreferredExecutorService(message); if (executor instanceof ThreadlessExecutor) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } else { handler.received(channel, message); }}

扩展业务线程池的线程数以及队列大小
3.1 以FixedThreadPool为例,默认线程数为200,队列大小为0

3.2 通过下面的配置可以扩展线程数和队列大小


3.3 线程池不是配置越大越好,与系统层面以及配置JVM参数相关
(1)JVM参数:-Xms 初识堆大小 -Xmx最大堆大小 -Xss 每个线程栈大小
(2)系统最大可创建的线程
(3)公式线程数量 = (机器本身可用内存 -(JVM分配的堆内存 + JVM元数据去))/Xss的值 扩展

《记一次敖丙dubbo线程池事故排查》中提到线程池耗尽时伴随一次较久的ygc
1.1 IO高为什么会导致GC时间长
(1)JVM GC需要通过发起系统调用write(),来记录GC行为
(2)write()调用可以被后台磁盘IO所阻塞
(3)记录GC日志属于JVM停顿的一部分,因此write()调用时间会被计算在JVM STW的停顿时间内

1.2 在告警时间段发现磁盘IO呈现不间断锯齿状
(1)磁盘IO较高,导致cpu_iowait变高,cpu等IO,瞬间分配不到时间片,rt就会抖动

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。