我是靠谱客的博主 搞怪手机,这篇文章主要介绍Java并发学习笔记20 线程池 ForkJoinPool,现在分享给大家,希望可以做个参考。

bilibili-Java并发学习笔记20 线程池 ForkJoinPool

基于 java 1.8.0

P64_ForkJoinPool原理与构造方式详解

  1. 分而治之
  2. 工作窃取

适合 CPU 密集型计算任务,不适合 IO 密集型任务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** * @param parallelism 并行度级别 默认值 = java.lang.Runtime#availableProcessors * @param factory 创建新线程的工厂 默认值 = defaultForkJoinWorkerThreadFactory * @param handler 由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序 默认值为 null * @param asyncMode * true 为从未连接的分叉任务建立本地先进先出调度模式。 在工作线程只处理事件式异步任务的应用程序中,此模式可能比默认的基于本地堆栈的模式更合适。 * 默认值 = false * @throws IllegalArgumentException 如果并行度小于或等于零,或大于实现限制 * @throws NullPointerException if the factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }

P65_ForkJoinTask详解与示例分析

复制代码
1
2
3
4
5
6
7
8
9
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { V result; protected abstract V compute(); } public abstract class RecursiveAction extends ForkJoinTask<Void> { protected abstract void compute(); }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** * Submits a ForkJoinTask for execution. * * @param task the task to submit * @param <T> the type of the task's result * @return the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }

ForkJoinPool 示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package new_package.thread.p64; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class ForkJoinTest { public static void main(String[] args) { //ForkJoinPool forkJoinPool = new ForkJoinPool(2); ForkJoinPool forkJoinPool = new ForkJoinPool(); SumTask sumTask = new SumTask(1, 100, 10); System.out.println(forkJoinPool.invoke(sumTask)); forkJoinPool.shutdown(); } } class SumTask extends RecursiveTask<Integer> { int limit; int start; int end; public SumTask(int start, int end, int limit) { this.start = start; this.end = end; this.limit = limit; } @Override protected Integer compute() { int sum = 0; if ((end - start) <= limit) { System.out.println(Thread.currentThread().getName()); for (int i = start; i <= end; i++) { sum += i; } return sum; } // Fork 步骤 int mid = (end + start) / 2; SumTask leftTask = new SumTask(start, mid, limit); SumTask rightTask = new SumTask(mid + 1, end, limit); // Join 步骤 invokeAll(leftTask, rightTask); Integer leftResult = leftTask.join(); Integer rightResult = rightTask.join(); return leftResult + rightResult; } }

P66_CompletionService源码详解与示例剖析

复制代码
1
2
3
4
5
6
7
8
// jdk 中 CompletionService 的唯一实现 public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; // ... }

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package new_package.thread.p66; import java.util.concurrent.*; import java.util.stream.IntStream; /** * 以任务完成顺序获取到最后的结果集合 */ public class CompletionServiceTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = new ThreadPoolExecutor(10, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), (r, executor) -> { }); CompletionService<Integer> completionService = new ExecutorCompletionService(executorService); IntStream.range(0, 10).forEach(r -> { completionService.submit(() -> { try { Thread.sleep((long) (Math.random() * 3000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); return r + 1; }); }); for (int i = 0; i < 10; i++) { System.out.println(completionService.take().get()); } executorService.shutdown(); } }

P67_ThreadLocalRandom在多线程竞争环境下的实现策略

  1. Random
复制代码
1
2
3
4
5
6
7
8
9
10
11
import java.util.Random; public class RandomTest { public static void main(String[] args) { Random random = new Random(); System.out.println(random.nextInt(10)); } }
  • 多线程性能问题
    • 多线程同时操作种子更新,产生竞争(自旋锁)
  • 正确性没有问题,只是在多线程高并发情况下效率低下
  1. ThreadLocalRandom
复制代码
1
2
3
4
5
6
7
8
9
10
import java.util.concurrent.ThreadLocalRandom; public class ThreadLocalRandomTest { public static void main(String[] args) { ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); System.out.println(threadLocalRandom.nextInt(10)); } }
  • 解决随机数生成在多线程高并发情况下效率低的问题
  • 在并发代码中,随机数生成使用 ThreadLocalRandom 较好

  • 随机数生成器
    • 随机数生成器种子
    • 随机数生成算法
  • 对于 ThreadLocalRandom 来说,其随机器生成器的种子存放在每个的线程的 ThreadLocal 中
    • Random 是共享同一个种子

ThreadLocalRandom.java

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** * Returns the {@link #current() current} thread's {@code ThreadLocalRandom}. * @return the {@link #current() current} thread's {@code ThreadLocalRandom} */ private Object readResolve() { return current(); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long SEED; private static final long PROBE; private static final long SECONDARY; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed")); PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception e) { throw new Error(e); } }

Thread.java

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// The following three initially uninitialized fields are exclusively // managed by class java.util.concurrent.ThreadLocalRandom. These // fields are used to build the high-performance PRNGs in the // concurrent code, and we can not risk accidental false sharing. // Hence, the fields are isolated with @Contended. /** The current seed for a ThreadLocalRandom */ @sun.misc.Contended("tlr") long threadLocalRandomSeed; /** Probe hash value; nonzero if threadLocalRandomSeed initialized */ @sun.misc.Contended("tlr") int threadLocalRandomProbe; /** Secondary seed isolated from public ThreadLocalRandom sequence */ @sun.misc.Contended("tlr") int threadLocalRandomSecondarySeed;

最后

以上就是搞怪手机最近收集整理的关于Java并发学习笔记20 线程池 ForkJoinPool的全部内容,更多相关Java并发学习笔记20内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(94)

评论列表共有 0 条评论

立即
投稿
返回
顶部