0%

ExecutorService+Callable+Future实现多线程控制并发数并返回数据

一、开始

多任务并行执行阻塞等待全部执行完毕,通过线程池控制并发数。
前半部分为代码(按照执行流程罗列),后半部分为相关注释。

二、创建带有返回数据的Callable

1
2
3
4
5
6
7
8
9
10
public Callable<String> get(String key, Map<String, Object> params) {

return () -> {
System.out.println("开始查询=======" + params.toString());
Thread.sleep(500);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("get任务耗时:" + (currentTimeMillis1 - currentTimeMillis) + "ms");
return key + "哈哈哈哈哈哈";
};
}

三、循环调用创建Callable<String>

1
2
3
4
5
6
7
8
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 30; i++) {

// ... params

Callable<String> callable = mSmsStatisticsService.get(i + "key", params);
tasks.add(callable);
}

四、创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* corePoolSize:核心线程数。
* maximumPoolSize:线程池允许创建的最大线程数。
* keepAliveTime:非核心线程闲置的超时时间。超过这个时间则回收。
* TimeUnit:keepAliveTime参数的时间单位。
* workQueue:任务队列。
* ThreadFactory:线程工厂,用于创建线程。
* RejectedExecutionHandler:饱和策略。
*
* ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue<Runnable> workQueue,
* ThreadFactory threadFactory,
* RejectedExecutionHandler handler)
*/
ExecutorService executorService = new ThreadPoolExecutor(5, 32,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
r -> {
Thread thread = new Thread(r);
// thread.setName("sdwfqin");
return thread;
}, new ThreadPoolExecutor.AbortPolicy());

五、启动线程并获取返回数据

1
2
3
4
5
6
List<String> resultList = new ArrayList<>();
// 启动线程并获取返回数据
List<Future<String>> futures = executorService.invokeAll(tasks);
for (Future<String> future : futures) {
resultList.add(future.get());
}

六、关闭线程池

1
executorService.shutdown();

七、上述提及API相关解释

  1. ExecutorService

    1. 基本方法

      1. submit: 提交的是Callable方法,返回Future,说明submit是有返回值的
      2. execute: 执行的是Runnable方法,没有返回值
      3. invokeAny: 接收一个包含Callable对象的集合作为参数。调用该方法不会返回Future对象,而是返回集合中某一个Callable对象的结果,而且无法保证调用之后返回的结果是一个Callable,只知道它是这些Callable中一个执行结束的Callable对象。
      4. invokeAll: 调用存在于参数集合中的所有Callable对象,并且返回一个包含Future对象的集合,可以通过这个返回的集合来管理每个Callable的执行结果。
      5. shutdown: 平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
    2. 拒绝策略

      1. ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 
      2. ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务,但是不抛出异常。 
      3. ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务,执行后面的任务
      4. ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务 
    3. 阻塞队列

      1. ArrayBlockingQueue: 基于数组的先进先出队列,有界
      2. LinkedBlockingQueue: 基于链表的先进先出队列,无界
      3. SynchronousQueue: 无缓冲的等待队列,无界
      4. 什么是无界: 如果不指定容量,默认为Integer.MAX_VALUE
    4. 常用线程池

      1. Executors.newCachedThreadPool(): 可缓存线程池
      2. Executors.newSingleThreadExecutor(): 单线程池
      3. Executors.newFixedThreadPool(3): 固定线程数线程池
      4. Executors.newScheduledThreadPool(5): 固定线程数,支持定时和周期性任务
      5. ThreadPoolExecutor(): 手动创建
  2. Future

    1. 基本方法

      1. get: 当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
      2. get(long timeout,TimeUnit unit): 最多等待timeout的时间就会返回结果
      3. cancel(boolean mayInterruptIfRunning): 可以用来停止一个任务,如果任务可以停止(通过mayInterruptIfRunning来进行判断),则可以返回true,如果任务已经完成或者已经停止,或者这个任务无法停止,则会返回false.
      4. isDone(): 判断当前方法是否完成
      5. isCancel(): 判断当前方法是否取消