什么是线程池?
线程池内部维护了若干线程,没有人物的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
线程池常用的类和接口
在Java标准库提供了如下几个类或接口,来操作并使用线程池:
- ExecutorService接口:来进行管理操作线程池;
- Executors类:用于创建线程池的工具类;
- ThreadPoolExecutor及其子类:线程池;
基本使用方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13// 线程池基本使用方式 // 创建一个ThreadPoolExecutor类型的对象,代表固定大小的线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 该线程池拥有3个线程 // 执行线程任务 executor.execute(task1); executor.execute(task2); executor.execute(task3); executor.execute(task4); executor.execute(task5); // 使用结束后,使用shutdown关闭线程池 executor.shutdown();
线程池的常用方法:
- 执行无返回值的线程任务:void exeutor(Runnable command);
- 提交有返回值的线程任务:Future<T> submit(Callable<T>task);
- 关闭线程池:void shutdown(); 或shutdown();
- 等待线程池关闭:boolean awaitTermination(long timeout,TimeUnit unit);
execute()只能提交Runnable类型的任务,没有返回值,而submit()既能提交Runnable类型的任务也能提交Callable类型任务,可以返回Future类型的结果,用于获取线程任务执行结果。
execute()方法提交的任务异常时直接抛出的,而submit()方法是捕获异常,当调用Future的get()方法获取返回值时,才会抛出异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// 计算1-100w的之间所有数字的累加和,每10w个数字交给1个线程处理 // 创建一个固定大小的线程池: ExecutorService executorService = Executors.newFixedThreadPool(4); // 创建集合,用于保存Future执行结果 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); // 每10w个数字,封装成一个Callable线程任务,并提交给线程池 for (int i = 0; i <= 900000; i += 100000) { Future<Integer> result = executorService.submit(new CalcTask(i+1, i + 100000)); futureList.add(result); } // 处理线程任务执行结果 try { int result = 0; for (Future<Integer> f : futureList) { result += f.get(); } System.out.println("计算结果" + result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 关闭线程池 // 省略.....
线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务;
当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。
第一个参数指定的是时间,第二个参数指定的是时间单位(秒),返回值类型是boolean型。
- 如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true;
- 如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false;
- 如果等待时间没有超过指定时间,则继续等待。
该方法经常与shutdown()方法配合使用,用于检测线程池中的任务是否已经执行完毕:
1
2
3
4
5
6
7
8
9
10
11// 线程池已提交或执行若干个任务 // 关闭线程池:必须等待任务执行结束后,线程池才会关闭 executorService.shutdown(); // 每隔1秒钟,检查一次线程池的任务执行状态 while(!executorService.awaitTermination(1, TimeUnit.SECONDS)) { System.out.println("还没有关闭!"); } System.out.println("已关闭!");
线程池分类:
ExecutorService是一个线程池管理接口,Java标准库提供了几个常用线程池,创建这些线程池的方法都被封装到Executors工具类中。
- FixedThreadPool:线程数固定的线程池,使用Executors.newFixedThreadPool()创建;
- CachedThreadPool:线程数根据任务动态调整的线程池,使用Executors.newSingleThreadExecutor()创建;
- SingleThreadExecutor:仅提供一个单线程的线程池,使用Executors.newSingleThreadExecutor()创建;
- ScheduledThreadPool:能实现定时、周期性人物的线程池,使用Executors.newScheduledThreadPool()创建;
FixedThreadPool线程池
线程数固定的线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class Main { public static void main(String[] args) { // 创建一个固定大小的线程池: ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 6; i++) { executorService.execute(new Task("线程"+i)); } // 关闭线程池: executorService.shutdown(); } } class Task implements Runnable { private String taskName; public Task(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println("启动线程 ===> " + this.taskName); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("结束线程 <= " + this.taskName); } }
执行结果:
启动线程 ===> 线程2
启动线程 ===> 线程3
启动线程 ===> 线程0
启动线程 ===> 线程1
结束线程 <= 线程2
结束线程 <= 线程3
结束线程 <= 线程1
结束线程 <= 线程0
启动线程 ===> 线程5
启动线程 ===> 线程4
结束线程 <= 线程4
结束线程 <= 线程5
执行分析:
观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
CachedThreadPool线程池
线程数根据任务动态调整的线程池
1ExecutorService executorService = Executors.newCachedThreadPool();
执行结果:
启动线程 => 线程1
启动线程 => 线程5
启动线程 => 线程2
启动线程 => 线程4
启动线程 => 线程0
启动线程 => 线程3
结束线程 <= 线程4
结束线程 <= 线程1
结束线程 <= 线程5
结束线程 <= 线程0
结束线程 <= 线程3
结束线程 <= 线程2
执行分析:
观察执行结果,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可以一次性全部同时执行。
ScheduledThreadPool线程池
能实现定时、周期性任务的线程:
放入ScheduledTrheadPool的任务可以定期反复执行:
创建ScheduledThreadPool定时任务线程池:
1
2ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
延迟3秒后执行,任务只执行一次:
1executorService.schedule(new Task("线程A"), 3, TimeUnit.SECONDS);
延迟2秒后,每隔3秒执行任务1次
1
2
3
4
5// 方式1:总是以固定的时间间隔触发,不管任务会执行多长时间; executorService.scheduleAtFixedRate(new Task("线程A"), 2,3, TimeUnit.SECONDS); // 方式2:上一次任务执行完,等待固定的时间间隔,在执行下一次任务 executorService.scheduleWithFixedDelay(new Task("线程A"), 2,3, TimeUnit.SECONDS);
线程池的执行流程
- 提交一个线程任务,线程池会在线程池中分配一个空闲线程,用于执行线程任务;
- 如果线程池中不存在空闲线程,则线程池会判断当前“存活的线程数”是否小于核心线程数corePoolSize。
- 如果小于核心线程数,线程池会创建一个核心线程去处理提交的线程任务。
- 如果大于核心线程数,则线程池会判断工作队列是否已满;
- 如果工作队列未满,则将该线程任务放入工作队列,等待线程池从工作队列取出并 执行;
- 如果工作队列已满,则判断线程数是否已经达到maximumPoolSize;
- 如果当前存活线程数没有达到maximumPoolSize,则创建一个非核心线程执行提交的任务;
- 如果当前存活线程数已经达到maximumPoolSize,还有新的任务被提交,直接采用拒绝策略处理;
线程池的配置参数
- corePoolSize:线程池核心线程数量,也可以理解为线程池维护的最小线程数量,核心线程创建后不会被回收。大于核心线程数的线程,在空闲时间超过了keepAliveTime后会被回收;
- maximumPoolSize:线程池最大线程数,线程池允许创建的最大线程数量;
- keepAliveTime:非核心线程存活时间,当一个可被回收的线程空闲时间大于keepAliveTime,就会被回收;
- TimeUnit:参数keepAliveTime的时间单位;
- blockingQueue:一个阻塞队列,用来存储等待执行的任务;
- RejectedExecutionHandler:拒绝策略,当线程池内的线程耗尽。并且工作队列达到已满时,新提交的任务,将使用拒绝策略进行处理;
阻塞工作队列BlockingQueue
BlockingQueue阻塞队列接口,实现机制是使用两条线程,允许两个线程同时操作队列,一个线程用于存储(生产者),一个线程用于取出(消费者)。当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列,在保证并发安全的同时,提高了队列的存取效率。
生产者消费者模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class RedPackage { private String rpid; private BigDecimal amount; public RedPackage() { this.rpid = UUID.randomUUID().toString(); this.amount = new BigDecimal(String.valueOf(Math.random()*10)); } public String getRpid() { return rpid; } public void setRpid(String rpid) { this.rpid = rpid; } public BigDecimal getAmount() { return amount; } public void setAmount(BigDecimal amount) { this.amount = amount; } @Override public String toString() { return "红包 [rpid=" + rpid + ", amount=" + amount + "]"; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Producer implements Runnable { private BlockingQueue<RedPackage> redPackageQueue; public Producer(BlockingQueue<RedPackage> queue) { this.redPackageQueue = queue; } @Override public void run() { while (true) { if(redPackageQueue.remainingCapacity() > 0) { RedPackage rpk = new RedPackage(); redPackageQueue.offer(rpk); System.out.println("[生产者] 已生成1个红包,并放入队列!" + rpk); }else if(redPackageQueue.remainingCapacity() == 0) { System.out.println("[生产者] 红包队列已满!"); } } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Consumer implements Runnable { private BlockingQueue<RedPackage> redPackageQueue; public Consumer(BlockingQueue<RedPackage> queue) { this.redPackageQueue = queue; } @Override public void run() { while (true) { RedPackage rpk = null; try { rpk = redPackageQueue.poll(1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if(rpk != null) { System.out.println("[消费者]线程" + Thread.currentThread().getName() + "抢到红包:" + rpk); }else { System.out.println("[消费者]很遗憾,线程" + Thread.currentThread().getName() + "没有抢到红包"); } } } }
常用BlockingQueue阻塞队列实现类
ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,基于数组实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,按照FIFO的方式排序:
1
2
3
4
5
6
7
8
9
10
11
12public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ // ArrayBlockingQueue使用定长数组做为存储结构 final Object[] items; // 创建时传入数组容量(长度) public ArrayBlockingQueue(int capacity) { this(capacity, false); } }
1
2
3
4
5// 使用ArrayBlockingQueue创建自定义线程池 ExecutorService executorService = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockinQueue有界队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于线程数量达到maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
LinkedBlockingQueue
LinkedBlockingQueue是一个无界队列,基于单向链表结构,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 单向链表Node节点 static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } // 按照Integer.MAX_VALUE设置容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } }
FixedTheadPool、SingleThreadExecutor线程池使用LinkedBlockingQueue队列:
1
2
3
4
5
6
7
8
9
10
11
12
13public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意任务与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
DelayWorkQueue
DelayWorkQueue是基于堆结构的延迟队列,基于数组实现,初始容量16,leader线程用于获取堆顶元素。该队列根据指定的延迟时间从小到大排序,如果延迟时间相同,则根据插入队列的先后排序。
1
2
3
4
5
6static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private Thread leader = null; }
ScheduledThreadPool
线程池使用了这个队列:
1
2
3
4
5
6
7
8
9
10public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //..... public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
PriorityBlockingQueue(优先任务队列 )
PriorityBlockingQueue
是一个基于优先级的无界队列(优先级的判断通过构造函数传入的Compator
或元素实现Comparable
接口来决定)。
注意:PriorityBlockingQueue
并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36class PayOrder implements Runnable,Comparable<PayOrder> { private int orderNo; // 订单编号 private BigDecimal payment; // 支付金额 public PayOrder(int orderNo, BigDecimal payment) { this.orderNo = orderNo; this.payment = payment; } @Override public int compareTo(PayOrder o) { return this.payment.compareTo(o.payment); } @Override public void run() { System.out.printf("订单编号为%d,订单金额为:¥%.1f的订单已完成支付!【%s】n",orderNo,payment,Thread.currentThread().getName()); } public int getOrderNo() { return orderNo; } public void setOrderNo(int orderNo) { this.orderNo = orderNo; } public BigDecimal getPayment() { return payment; } public void setPayment(BigDecimal payment) { this.payment = payment; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class Main { public static void main(String[] args) throws InterruptedException { // 使用PriorityBlockingQueue创建线程池,核心线程数为2 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()); // 执行10笔订单的支付任务 pool.execute(new PayOrder(1,new BigDecimal("1943"))); pool.execute(new PayOrder(2,new BigDecimal("7894"))); pool.execute(new PayOrder(3,new BigDecimal("3253"))); pool.execute(new PayOrder(4,new BigDecimal("1353"))); pool.execute(new PayOrder(5,new BigDecimal("6344"))); pool.execute(new PayOrder(6,new BigDecimal("5430"))); pool.execute(new PayOrder(7,new BigDecimal("3574"))); pool.execute(new PayOrder(8,new BigDecimal("3673"))); pool.execute(new PayOrder(9,new BigDecimal("8653"))); pool.execute(new PayOrder(10,new BigDecimal("1100"))); // 关闭线程池 pool.shutdown(); } }
SynchronousQueue(同步队列)
不存储元素的阻塞队列(内部没有保存元素的数据结构容器),每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
。
CachedThreadPool
线程池使用这个队列。
1
2
3
4
5public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class Main { public static void main(String[] args) { // maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略(直接抛出异常) ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy()); // 执行的线程任务大于maximumPoolSize,执行拒绝策略 for (int i = 1; i <= 3; i++) { pool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "被执行!"); } }); } // 关闭线程池 pool.shutdown(); } }
当任务队列为SynchronousQueue
,创建的线程数大于maximumPoolSize
时,直接执行了拒绝策略抛出异常。
使用SynchronousQueue
队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize
,则尝试创建新的线程,如果达到maximumPoolSize
设置的最大值,则根据你设置的handler
执行拒绝策略。
因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize
数量,否则很容易就会执行拒绝策略;
线程池的状态
线程池的状态分为:running、shutdown、stop、tidying、terminated
- running:运行状态,线程池被一旦创建,就处于running状态,并且线程池中的任务数为0.该状态的线程池会接收新任务,并处理工作队列中的任务。
- shutdown:关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;
- stop:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- tidying:整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0;
- terminated:该状态表示线程池彻底终止;
线程池分类总结
FixedThreadPool
线程数固定的线程池
- 核心线程数和最大线程数一致
- 非核心线程数空闲存活时间,即keepAliveTime为0
- 阻塞队列为无界队列
工作机制:
a、提交线程任务
b、如果线程数少于核心线程,创建核心线程任务
c、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
d、如果线程执行完任务,去阻塞队列取任务,继续执行
使用场景: 适用于处理CPU
密集型的任务,确保CPU
在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
CachedThreadPool
可缓存线程池,线程数根据任务动态调整的线程池
- 线程池参数:
-
- 核心线程数为
0
- 最大线程数为
Integer.MAX_VALUE
- 工作队列是
SynchronousQueue
同步队列 - 非核心线程空闲存活时间为
60
秒
- 核心线程数为
- 工作机制:
-
- 提交线程任务
- 因为核心线程数为
0
,所以任务直接加到SynchronousQueue
工作队列 - 判断是否有空闲线程,如果有,就去取出任务执行
- 如果没有空闲线程,就新建一个线程执行
- 执行完任务的线程,还可以存活
60
秒,如果在这期间,接到任务,可以继续存活下去;否则,被销毁。
- 使用场景: 用于并发执行大量短期的小任务。
SingleThreadExecutor
单线程化的线程池
- 线程池参数:
-
- 核心线程数为
1
- 最大线程数也为
1
- 阻塞队列是
LinkedBlockingQueue
- 非核心线程空闲存活时间为
0
秒
- 核心线程数为
- 使用场景: 适用于串行执行任务的场景,将任务按顺序执行。
ScheduledThreadPool
能实现定时、周期性任务的线程池
- 线程池参数:
- 最大线程数为
Integer.MAX_VALUE
- 阻塞队列是
DelayedWorkQueue
keepAliveTime
为0
- 阻塞队列是
- 使用场景: 周期性执行任务,并且需要限制线程数量的需求场景。
最后
以上就是温柔黑夜最近收集整理的关于什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类 线程池的状态线程池分类总结的全部内容,更多相关什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类内容请搜索靠谱客的其他文章。
发表评论 取消回复