我是靠谱客的博主 温柔黑夜,这篇文章主要介绍什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类   线程池的状态线程池分类总结,现在分享给大家,希望可以做个参考。

什么是线程池?

       线程池内部维护了若干线程,没有人物的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

线程池常用的类和接口

         在Java标准库提供了如下几个类或接口,来操作并使用线程池:

  • ExecutorService接口:来进行管理操作线程池;
  • Executors类:用于创建线程池的工具类;
  • ThreadPoolExecutor及其子类:线程池;

基本使用方法如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
// 线程池基本使用方式 // 创建一个ThreadPoolExecutor类型的对象,代表固定大小的线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 该线程池拥有3个线程 // 执行线程任务 executor.execute(task1); executor.execute(task2); executor.execute(task3); executor.execute(task4); executor.execute(task5); // 使用结束后,使用shutdown关闭线程池 executor.shutdown();

线程池的常用方法: 

  •   执行无返回值的线程任务:void exeutor(Runnable command);
  •   提交有返回值的线程任务:Future<T> submit(Callable<T>task);
  • 关闭线程池:void shutdown(); 或shutdown();

  • 等待线程池关闭:boolean awaitTermination(long timeout,TimeUnit unit);  

        execute()只能提交Runnable类型的任务,没有返回值,而submit()既能提交Runnable类型的任务也能提交Callable类型任务,可以返回Future类型的结果,用于获取线程任务执行结果。

       execute()方法提交的任务异常时直接抛出的,而submit()方法是捕获异常,当调用Future的get()方法获取返回值时,才会抛出异常。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 计算1-100w的之间所有数字的累加和,每10w个数字交给1个线程处理 // 创建一个固定大小的线程池: ExecutorService executorService = Executors.newFixedThreadPool(4); // 创建集合,用于保存Future执行结果 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); // 每10w个数字,封装成一个Callable线程任务,并提交给线程池 for (int i = 0; i <= 900000; i += 100000) { Future<Integer> result = executorService.submit(new CalcTask(i+1, i + 100000)); futureList.add(result); } // 处理线程任务执行结果 try { int result = 0; for (Future<Integer> f : futureList) { result += f.get(); } System.out.println("计算结果" + result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 关闭线程池 // 省略.....

      线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务;

       当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。

     第一个参数指定的是时间,第二个参数指定的是时间单位(秒),返回值类型是boolean型。

  •  如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true;
  •   如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false;
  • 如果等待时间没有超过指定时间,则继续等待。

该方法经常与shutdown()方法配合使用,用于检测线程池中的任务是否已经执行完毕:
 

复制代码
1
2
3
4
5
6
7
8
9
10
11
// 线程池已提交或执行若干个任务 // 关闭线程池:必须等待任务执行结束后,线程池才会关闭 executorService.shutdown(); // 每隔1秒钟,检查一次线程池的任务执行状态 while(!executorService.awaitTermination(1, TimeUnit.SECONDS)) { System.out.println("还没有关闭!"); } System.out.println("已关闭!");

线程池分类: 

 ExecutorService是一个线程池管理接口,Java标准库提供了几个常用线程池,创建这些线程池的方法都被封装到Executors工具类中。

  • FixedThreadPool:线程数固定的线程池,使用Executors.newFixedThreadPool()创建;
  • CachedThreadPool:线程数根据任务动态调整的线程池,使用Executors.newSingleThreadExecutor()创建;
  • SingleThreadExecutor:仅提供一个单线程的线程池,使用Executors.newSingleThreadExecutor()创建;
  • ScheduledThreadPool:能实现定时、周期性人物的线程池,使用Executors.newScheduledThreadPool()创建;

FixedThreadPool线程池

   线程数固定的线程池:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Main { public static void main(String[] args) { // 创建一个固定大小的线程池: ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 6; i++) { executorService.execute(new Task("线程"+i)); } // 关闭线程池: executorService.shutdown(); } } class Task implements Runnable { private String taskName; public Task(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println("启动线程 ===> " + this.taskName); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("结束线程 <= " + this.taskName); } }

执行结果:

启动线程 ===> 线程2
启动线程 ===> 线程3
启动线程 ===> 线程0
启动线程 ===> 线程1
结束线程 <= 线程2
结束线程 <= 线程3
结束线程 <= 线程1
结束线程 <= 线程0
启动线程 ===> 线程5
启动线程 ===> 线程4
结束线程 <= 线程4
结束线程 <= 线程5 

执行分析:

   观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

CachedThreadPool线程池

  线程数根据任务动态调整的线程池

复制代码
1
ExecutorService executorService = Executors.newCachedThreadPool();

执行结果:

启动线程 => 线程1
启动线程 => 线程5
启动线程 => 线程2
启动线程 => 线程4
启动线程 => 线程0
启动线程 => 线程3
结束线程 <= 线程4
结束线程 <= 线程1
结束线程 <= 线程5
结束线程 <= 线程0
结束线程 <= 线程3
结束线程 <= 线程2 

执行分析:

观察执行结果,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可以一次性全部同时执行。

ScheduledThreadPool线程池

能实现定时、周期性任务的线程:

放入ScheduledTrheadPool的任务可以定期反复执行:

创建ScheduledThreadPool定时任务线程池:

复制代码
1
2
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

 延迟3秒后执行,任务只执行一次:
 

复制代码
1
executorService.schedule(new Task("线程A"), 3, TimeUnit.SECONDS);

延迟2秒后,每隔3秒执行任务1次

 

复制代码
1
2
3
4
5
// 方式1:总是以固定的时间间隔触发,不管任务会执行多长时间; executorService.scheduleAtFixedRate(new Task("线程A"), 2,3, TimeUnit.SECONDS); // 方式2:上一次任务执行完,等待固定的时间间隔,在执行下一次任务 executorService.scheduleWithFixedDelay(new Task("线程A"), 2,3, TimeUnit.SECONDS);

线程池的执行流程

  • 提交一个线程任务,线程池会在线程池中分配一个空闲线程,用于执行线程任务;
  • 如果线程池中不存在空闲线程,则线程池会判断当前“存活的线程数”是否小于核心线程数corePoolSize。
  •  如果小于核心线程数,线程池会创建一个核心线程去处理提交的线程任务。
  •  如果大于核心线程数,则线程池会判断工作队列是否已满;
  •  如果工作队列未满,则将该线程任务放入工作队列,等待线程池从工作队列取出并                    执行;
  • 如果工作队列已满,则判断线程数是否已经达到maximumPoolSize;
  • 如果当前存活线程数没有达到maximumPoolSize,则创建一个非核心线程执行提交的任务;
  • 如果当前存活线程数已经达到maximumPoolSize,还有新的任务被提交,直接采用拒绝策略处理;

线程池的配置参数

  • corePoolSize:线程池核心线程数量,也可以理解为线程池维护的最小线程数量,核心线程创建后不会被回收。大于核心线程数的线程,在空闲时间超过了keepAliveTime后会被回收;
  • maximumPoolSize:线程池最大线程数,线程池允许创建的最大线程数量;
  • keepAliveTime:非核心线程存活时间,当一个可被回收的线程空闲时间大于keepAliveTime,就会被回收;
  • TimeUnit:参数keepAliveTime的时间单位;
  • blockingQueue:一个阻塞队列,用来存储等待执行的任务;
  • RejectedExecutionHandler:拒绝策略,当线程池内的线程耗尽。并且工作队列达到已满时,新提交的任务,将使用拒绝策略进行处理;

阻塞工作队列BlockingQueue

       BlockingQueue阻塞队列接口,实现机制是使用两条线程,允许两个线程同时操作队列,一个线程用于存储(生产者),一个线程用于取出(消费者)。当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列,在保证并发安全的同时,提高了队列的存取效率。

生产者消费者模型:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RedPackage { private String rpid; private BigDecimal amount; public RedPackage() { this.rpid = UUID.randomUUID().toString(); this.amount = new BigDecimal(String.valueOf(Math.random()*10)); } public String getRpid() { return rpid; } public void setRpid(String rpid) { this.rpid = rpid; } public BigDecimal getAmount() { return amount; } public void setAmount(BigDecimal amount) { this.amount = amount; } @Override public String toString() { return "红包 [rpid=" + rpid + ", amount=" + amount + "]"; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer implements Runnable { private BlockingQueue<RedPackage> redPackageQueue; public Producer(BlockingQueue<RedPackage> queue) { this.redPackageQueue = queue; } @Override public void run() { while (true) { if(redPackageQueue.remainingCapacity() > 0) { RedPackage rpk = new RedPackage(); redPackageQueue.offer(rpk); System.out.println("[生产者] 已生成1个红包,并放入队列!" + rpk); }else if(redPackageQueue.remainingCapacity() == 0) { System.out.println("[生产者] 红包队列已满!"); } } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer implements Runnable { private BlockingQueue<RedPackage> redPackageQueue; public Consumer(BlockingQueue<RedPackage> queue) { this.redPackageQueue = queue; } @Override public void run() { while (true) { RedPackage rpk = null; try { rpk = redPackageQueue.poll(1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if(rpk != null) { System.out.println("[消费者]线程" + Thread.currentThread().getName() + "抢到红包:" + rpk); }else { System.out.println("[消费者]很遗憾,线程" + Thread.currentThread().getName() + "没有抢到红包"); } } } }

常用BlockingQueue阻塞队列实现类
  

ArrayBlockingQueue

      ArrayBlockingQueue是一个有界队列,基于数组实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,按照FIFO的方式排序:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ // ArrayBlockingQueue使用定长数组做为存储结构 final Object[] items; // 创建时传入数组容量(长度) public ArrayBlockingQueue(int capacity) { this(capacity, false); } }
复制代码
1
2
3
4
5
// 使用ArrayBlockingQueue创建自定义线程池 ExecutorService executorService = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        使用ArrayBlockinQueue有界队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于线程数量达到maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

LinkedBlockingQueue

        LinkedBlockingQueue是一个无界队列,基于单向链表结构,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 单向链表Node节点 static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } // 按照Integer.MAX_VALUE设置容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } }

      FixedTheadPool、SingleThreadExecutor线程池使用LinkedBlockingQueue队列:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

      使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意任务与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

DelayWorkQueue

      DelayWorkQueue是基于堆结构的延迟队列,基于数组实现,初始容量16,leader线程用于获取堆顶元素。该队列根据指定的延迟时间从小到大排序,如果延迟时间相同,则根据插入队列的先后排序。

复制代码
1
2
3
4
5
6
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private Thread leader = null; }

ScheduledThreadPool线程池使用了这个队列:

复制代码
1
2
3
4
5
6
7
8
9
10
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //..... public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

PriorityBlockingQueue(优先任务队列 )

PriorityBlockingQueue是一个基于优先级的无界队列(优先级的判断通过构造函数传入的Compator或元素实现Comparable接口来决定)。

注意:PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PayOrder implements Runnable,Comparable<PayOrder> { private int orderNo; // 订单编号 private BigDecimal payment; // 支付金额 public PayOrder(int orderNo, BigDecimal payment) { this.orderNo = orderNo; this.payment = payment; } @Override public int compareTo(PayOrder o) { return this.payment.compareTo(o.payment); } @Override public void run() { System.out.printf("订单编号为%d,订单金额为:¥%.1f的订单已完成支付!【%s】n",orderNo,payment,Thread.currentThread().getName()); } public int getOrderNo() { return orderNo; } public void setOrderNo(int orderNo) { this.orderNo = orderNo; } public BigDecimal getPayment() { return payment; } public void setPayment(BigDecimal payment) { this.payment = payment; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Main { public static void main(String[] args) throws InterruptedException { // 使用PriorityBlockingQueue创建线程池,核心线程数为2 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()); // 执行10笔订单的支付任务 pool.execute(new PayOrder(1,new BigDecimal("1943"))); pool.execute(new PayOrder(2,new BigDecimal("7894"))); pool.execute(new PayOrder(3,new BigDecimal("3253"))); pool.execute(new PayOrder(4,new BigDecimal("1353"))); pool.execute(new PayOrder(5,new BigDecimal("6344"))); pool.execute(new PayOrder(6,new BigDecimal("5430"))); pool.execute(new PayOrder(7,new BigDecimal("3574"))); pool.execute(new PayOrder(8,new BigDecimal("3673"))); pool.execute(new PayOrder(9,new BigDecimal("8653"))); pool.execute(new PayOrder(10,new BigDecimal("1100"))); // 关闭线程池 pool.shutdown(); } }

 

SynchronousQueue(同步队列)

不存储元素的阻塞队列(内部没有保存元素的数据结构容器),每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue

CachedThreadPool线程池使用这个队列。

复制代码
1
2
3
4
5
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Main { public static void main(String[] args) { // maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略(直接抛出异常) ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy()); // 执行的线程任务大于maximumPoolSize,执行拒绝策略 for (int i = 1; i <= 3; i++) { pool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "被执行!"); } }); } // 关闭线程池 pool.shutdown(); } }

 

当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。

使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的线程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。

因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

线程池的状态

       线程池的状态分为:running、shutdown、stop、tidying、terminated

  • running:运行状态,线程池被一旦创建,就处于running状态,并且线程池中的任务数为0.该状态的线程池会接收新任务,并处理工作队列中的任务。
  • shutdown:关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;
  • stop:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • tidying:整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0;
  • terminated:该状态表示线程池彻底终止;

线程池分类总结

FixedThreadPool

线程数固定的线程池

  • 核心线程数和最大线程数一致
  • 非核心线程数空闲存活时间,即keepAliveTime为0
  • 阻塞队列为无界队列

工作机制:

a、提交线程任务

b、如果线程数少于核心线程,创建核心线程任务

c、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列

d、如果线程执行完任务,去阻塞队列取任务,继续执行

使用场景: 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

CachedThreadPool

      可缓存线程池,线程数根据任务动态调整的线程池

  • 线程池参数:
    • 核心线程数为0
    • 最大线程数为Integer.MAX_VALUE
    • 工作队列是SynchronousQueue同步队列
    • 非核心线程空闲存活时间为60
  • 工作机制
    1. 提交线程任务
    2. 因为核心线程数为0,所以任务直接加到SynchronousQueue工作队列
    3. 判断是否有空闲线程,如果有,就去取出任务执行
    4. 如果没有空闲线程,就新建一个线程执行
    5. 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续存活下去;否则,被销毁。
  • 使用场景: 用于并发执行大量短期的小任务。

SingleThreadExecutor

单线程化的线程池

  • 线程池参数
    • 核心线程数为1
    • 最大线程数也为1
    • 阻塞队列是LinkedBlockingQueue
    • 非核心线程空闲存活时间为0
  • 使用场景: 适用于串行执行任务的场景,将任务按顺序执行。

ScheduledThreadPool

能实现定时、周期性任务的线程池

  • 线程池参数:
  • 最大线程数为Integer.MAX_VALUE
    • 阻塞队列是DelayedWorkQueue
    • keepAliveTime0
  • 使用场景: 周期性执行任务,并且需要限制线程数量的需求场景。

最后

以上就是温柔黑夜最近收集整理的关于什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类   线程池的状态线程池分类总结的全部内容,更多相关什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(90)

评论列表共有 0 条评论

立即
投稿
返回
顶部