我是靠谱客的博主 雪白导师,这篇文章主要介绍Java大数据量(多线程)分段分批处理,现在分享给大家,希望可以做个参考。

#分段处理主类
github地址:
https://github.com/zuojingang/common-tools-intergrated/blob/master/src/main/java/pers/zuo/design/pattern/piecewise/PiecewiseHandler.java

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package pers.zuo.component.piecewise; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import pers.zuo.component.piecewise.bean.PiecewiseKey; import pers.zuo.component.piecewise.bean.PiecewiseResult; import pers.zuo.component.piecewise.bean.PiecewiseTask; /** * @author zuojingang * * @param <T> * the type of part process return */ public abstract class PiecewiseHandler<V> { public void nThreads( final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult, final int totalNum) throws Exception { nThreads(nThreadResult, totalNum, D_THREAD_SIZE, D_PART_SIZE); } /** * @param totalNum * @param threadSize * @return nThreads process result. */ public void nThreads( final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult, final int totalNum, final int threadSize, final int partSize) throws Exception { if (null == nThreadResult || 0 >= totalNum || 0 >= threadSize) { return; } ExecutorService fixThreadPool = Executors.newFixedThreadPool(D_N_THREAD); List<PiecewiseTask> fTaskList = new ArrayList<>(); int fromIndex = 0; try { while (totalNum > fromIndex) { final int thisFromIndex = fromIndex; final int threadProcessNum = Math.min(totalNum - fromIndex, threadSize); final int thisToIndex = thisFromIndex + threadProcessNum; if (0 < threadProcessNum) { PiecewiseTask futureTask = PiecewiseBuilder.buildTask(new Callable<Boolean>() { @Override public Boolean call() throws Exception { final Map<PiecewiseKey, PiecewiseResult<V>> threadResult = PiecewiseBuilder .initializeThreadResult(); nThreadResult.put(PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex), PiecewiseBuilder.buildResult(threadResult)); singleThread(threadResult, thisFromIndex, threadProcessNum, partSize); return true; } }, PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex)); fixThreadPool.submit(futureTask); fTaskList.add(futureTask); } fromIndex += threadProcessNum; } boolean finished = true; for (PiecewiseTask futureTask : fTaskList) { try { finished = finished && futureTask.get(); } catch (InterruptedException | ExecutionException e) { nThreadResult.get(futureTask.getTaskKey()).setException(e); } } } catch (Exception e) { throw e; } finally { // the threadPool must manual-lock after use fixThreadPool.shutdown(); } } public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int totalNum) { singleThread(threadResult, 0, totalNum); } public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset, final int totalNum) { singleThread(threadResult, offset, totalNum, D_PART_SIZE); } /** * @param offset * @param toIndex * @param partSize * @return process subList values and include first index(offset) and exclude * latest index(offset + totalNum) */ public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset, final int totalNum, final int partSize) { if (0 >= totalNum || 0 >= partSize) { return; } final int toIndex = offset + totalNum; int fromIndex = offset; while (toIndex > fromIndex) { int thisToIndex = Math.min(fromIndex + partSize, toIndex); V partResult = null; Exception pe = null; try { partResult = partProcess(fromIndex, thisToIndex); } catch (Exception e) { pe = e; } threadResult.put(PiecewiseBuilder.buildKey(fromIndex, thisToIndex), PiecewiseBuilder.buildResult(partResult, pe)); fromIndex = thisToIndex; } } /** * @param offset * @param partSize * @return part process result */ protected abstract V partProcess(final int fromIndex, final int toIndex) throws Exception; public static final int D_N_THREAD = 10; public static final int D_THREAD_SIZE = 10000; public static final int D_PART_SIZE = 1000; }

#分段任务定制类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package pers.zuo.component.piecewise.bean; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; /** * @author zuojingang * * @param <K * extends Number> the type of part process return */ public class PiecewiseTask extends FutureTask<Boolean> { private final PiecewiseKey taskKey; public PiecewiseTask(Callable<Boolean> callable, PiecewiseKey taskKey) { super(callable); this.taskKey = taskKey; } public PiecewiseKey getTaskKey() { return taskKey; } }

#分段任务Key值类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package pers.zuo.component.piecewise.bean; public class PiecewiseKey { private final Integer from; private final Integer to; public PiecewiseKey(Integer from, Integer to) { super(); this.from = from; this.to = to; } public Integer getFrom() { return from; } public Integer getTo() { return to; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((from == null) ? 0 : from.hashCode()); result = prime * result + ((to == null) ? 0 : to.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; PiecewiseKey other = (PiecewiseKey) obj; if (from == null) { if (other.from != null) return false; } else if (!from.equals(other.from)) return false; if (to == null) { if (other.to != null) return false; } else if (!to.equals(other.to)) return false; return true; } }

#分段任务返回值类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pers.zuo.component.piecewise.bean; public class PiecewiseResult<V> { private final V val; private Exception exception; public PiecewiseResult(V val) { super(); this.val = val; } public PiecewiseResult(V val, Exception exception) { super(); this.val = val; this.exception = exception; } public Exception getException() { return exception; } public void setException(Exception exception) { this.exception = exception; } public V getVal() { return val; } }

##获取实例工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package pers.zuo.component.piecewise.manager; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import pers.zuo.component.piecewise.bean.PiecewiseKey; import pers.zuo.component.piecewise.bean.PiecewiseResult; import pers.zuo.component.piecewise.bean.PiecewiseTask; public class PiecewiseBuilder { public static PiecewiseKey buildKey(Integer from, Integer to) { return new PiecewiseKey(from, to); } public static <V> PiecewiseResult<V> buildResult(V val) { return new PiecewiseResult<V>(val); } public static <V> PiecewiseResult<V> buildResult(V val, Exception exception) { return new PiecewiseResult<V>(val, exception); } public static PiecewiseTask buildTask(Callable<Boolean> callable, PiecewiseKey taskKey) { return new PiecewiseTask(callable, taskKey); } /** * this method aimed for simple when define the nThreadResult * * @return */ public static <V> Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> initializeNThreadResult() { return new HashMap<>(); } /** * this method aimed for simple when define the threadResult * * @return */ public static <V> Map<PiecewiseKey, PiecewiseResult<V>> initializeThreadResult() { return new HashMap<>(); } }

最后

以上就是雪白导师最近收集整理的关于Java大数据量(多线程)分段分批处理的全部内容,更多相关Java大数据量(多线程)分段分批处理内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(99)

评论列表共有 0 条评论

立即
投稿
返回
顶部