欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Java主线程调度,ExecutorService执行任务

时间:2023-06-17
Java主线程调度,ExecutorService执行任务

在项目开发过程中,遇到了需要并发处理(例:调用百度API查询一批GPS的位置)的情况,但是http查询有QPS限制,因此只能指定一个固定的线程数执行。
但是Executors.newFixedThreadPool默认使用的是linkedBlockingQueue,因此大数据量的情况下,数据全部offer到队列很容易触发OOM。
如果配置了linkedBlockingQueue的capacity,则会触发ThreadPoolExecutor.RejectedExecutionHandler。

ThreadPoolExecutor.java

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //offer插入新任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //没有可用线程时处理 reject(command); }

通过查看RejectedExecutionHandler有以下实现:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 默认策略

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,一般是Executor.execute在主线程

看上去CallerRunsPolicy更符合要求,但是如果主线程也在执行任务,而http请求往往都有一定的响应时间。
假设某个子线程的任务已经完成了,主线程还在等待http响应就无法实时的给子程序调度任务。

因此CallerRunsPolicy也不适合。

下面试着换一下linkedBlockingQueue,使用SynchronousQueue。

ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new DefaultThreadFactory("test"), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 1000; i++) { executorService.execute(new Runnable() { @Override public void run() { try { //模拟http请求 Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }); }

触发了RejectedExecutionException

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task rejected from java.util.concurrent.ThreadPoolExecutor@5d20e46[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]

显然SynchronousQueue的offer在无人消费的情况下直接返回false。
所以只要让offer阻塞就可以了。

public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SyncQueue<>(), new DefaultThreadFactory("test"), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 1000; i++) { executorService.execute(new Runnable() { @Override public void run() { try { //模拟http请求 Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } public static class SyncQueue extends SynchronousQueue { @Override public boolean offer(T t) { try { //offer重试一次,成功返回true,失败返回false //put会一直等待 super.put(t); } catch (InterruptedException e) { throw new RuntimeException(); } return true; } }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。