欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Dubbo消费端线程池模型源码分析

时间:2023-07-10
前言

在Dubbo官方文档《消费端线程池模型》中提到2.7.5版本改进了消费端线程池模型,通过复用业务端被阻塞的线程,解决消费端线程数分配过多的问题

2.7.5版本引入的线程池模型

2.1 业务线程发出请求,拿到一个Future实例
2.2 在调用future.get()之前,先调用ThreadlessExecutor.wait(),wait会使业务线程在一个阻塞队列上等待,直到队列中被加入元素

下面以dubbo3.0.5的版本进行源码调试验证

Dubbo客户端远程调用

入口在InvokerInvocationHandler#invoke(只展示主要步骤)
1.1 invoker.invoke(rpcInvocation) 发起远程调用(详细见下面分析)
1.2 recreate() 处理响应(AsyncRpcResult#recreate)
(1)调用future.get()获取响应信息AppResponse
(2)返回AppResponse的result

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {return invoker.invoke(rpcInvocation).recreate();}

发起远程调用入口在AbstractInvoker#invoke

public Result invoke(Invocation inv) throws RpcException {// do invoke rpc invocation and return async result AsyncRpcResult asyncResult = doInvokeAndReturn(invocation); // wait rpc result if sync --> ThreadlessExecutor#waitAndDrain waitForResultIfSync(asyncResult, invocation);}

2.1 发起rpc请求并返回异步结果(DubboInvoker#doInvoke)
(1)通过channel发送Request请求
(2)将channel、Request请求以及ThreadlessExecutor封装为DefaultFuture进行返回(详细见获取响应分析)
(3)thenApply异步将结果将转换为AppResponse

protected Result doInvoke(final Invocation invocation) throws Throwable {// 返回new ThreadlessExecutor(sharedExecutor)ExecutorService executor = getCallbackExecutor(getUrl(), inv);// 通过netty的channel发送Request请求(HeaderExchangeChannel#request)CompletableFuture appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);}

2.2 如果是同步调用,则等待rpc返回
(1)取ThreadlessExecutor阻塞队列queue的任务,没有则阻塞(take会阻塞)
(2)执行任务(会循环将所有任务执行完,poll取任务,没有任务返回null -> ThreadlessExecutor$RunnableWrapper#run -> ChannelEventRunnable#run)
(3)通过DecodeHandler#received进行反序列
(4)DefaultFuture#received调用CompletableFuture的complete方法将反序列化的结果安全写入result属性中

// ThreadlessExecutor#waitAndDrainpublic void waitAndDrain() throws InterruptedException { Runnable runnable; try { // 当阻塞队列里没任务时,take方法会阻塞 runnable = queue.take(); }catch (InterruptedException e){ waiting = false; throw e; }synchronized (lock) { waiting = false; runnable.run(); }// 将阻塞队列里的所有任务都执行完,达到复用的作用 runnable = queue.poll(); while (runnable != null) { runnable.run(); runnable = queue.poll(); }}

获取响应 DefaultFuture#newFuture将请求ID与channel以及DefaultFuture建立绑定关系

public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {// 将请求ID与channel 以及 ID与DefaultFuture 建立绑定关系 final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); // ThreadlessExecutor needs to hold the waiting future in case of circuit return. if (executor instanceof ThreadlessExecutor) { ((ThreadlessExecutor) executor).setWaitingFuture(future); } // 启动超时定时任务TimeoutCheckTask timeoutCheck(future); return future;}

获取响应入口:NettyClientHandler#channelRead
2.1 收到响应后,最终触发AllChannelHandler#received将返回消息封装成ChannelEventRunnable任务放入阻塞队列queue中

public void received(Channel channel, Object message) throws RemotingException {// 如果DefaultFuture.getFuture(response.getId())不为空,则获取其Executor(ThreadlessExecutor)ExecutorService executor = getPreferredExecutorService(message);// ThreadlessExecutor#execute 将任务添加到阻塞队列queue中executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));}

总结 通过上述分析,Dubbo通过将响应封装ChannelEventRunnable任务放入阻塞队列,在获取响应时将执行阻塞队列里的所有任务,解决消费端线程数分配过多的问题主要涉及到知识点:netty异步通信以及CompletableFuture异步处理
2.1 通过netty的channel发送消息
2.2 将请求ID与channel以及异步结果CompletableFuture绑定
2.3 netty接收到返回时,根据响应ID找到对应CompletableFuture,将结果写入到CompletableFuture中

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。