concurrent包下的并发工具类

前言

JDK1.5,Sun大神(Doug Lea)为我们带来了java.util.concurrent工具包,以简化并发编程。
在concurrent包之前,我们要借助join(),wait()/notify(),synchronized等来实现并发编程,难度较大,也容易出现问题,而concurrent包的到来,我们可以使用线程池(ThreadPoolExecutor)、灵活的可重入锁(ReentrantLock)、原子类(AtomicInteger…)、阻塞队列(ArrayBlockingQueue…)、线程安全Map(ConcurrentMap…)、以及本节会介绍的并发工具(CountDownLatch、CyclicBarrier…),使得并发编程变得容易了,也增加了很多灵活性。

总结

并发工具:CountDownLatch、CyclicBarrier、Semaphore、Exchanger

CountDownLatch

CountDownLatch允许一个或者多个线程等待其他线程完成操作。

  1. 通过调用int参数的构造方法,获得CountDownLatch对象cdl,此时对象cdl中计数器为N;

    CountDownLatch cdl = new CountDownLatch(N);

  2. 一个或者多个线程中,调用cdl.await()进入阻塞,直到cdl中的N消耗为0,才进行后续代码;

  3. “其它N个线程完成操作”,调用cdl.countDown(),消耗对象cdl中计数器的N,直至为0。这里,既可以是N个线程
    每个调用一下countDown(),也可以是一个线程完成N个步骤,每个步骤调用一次countDown(),总之把计数器中的N消耗完即可。
  4. 如果调用的cdl.await()的线程是同步的,可能需要返回结果,不能一直让客户等着,这样就需要有一个超时时间,这里可以使用:

    cdl.await(5, TimeUnit.SECONDS);//5秒后,如果还是没有被(计数器为0)唤醒,则自己终止阻塞


应用场景一:用户上传多个excel表格,每个表格开一个线程处理,当全部工作线程处理完后,主线程通知用户,已经全部处理完成。(通常做法,也可以使用join方法)[这是CountDownLatch经典的应用场景,同时可以加上超时处理,比如一定时间后,告诉用户:处理时间较长,稍后通过查询接口获取处理状态]

应用场景二:跑步比赛,多个运动员做好准备,等待裁判觉得时机成熟了,发布“预备跑”,各个运动员开始比赛。

如果碰到以上类似的场景,就可以使用这个好用的工具类,后续有测试的源码

CyclicBarrier

CyclicBarrier可循环使用的屏障,让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,让所有被拦截的线程继续运行,如果在屏障的构造函数处设置barrierAction(一个线程),则优先执行该线程。

  1. 通过构造函数,获得一个屏障对象,

    CyclicBarrier cyc = new CyclicBarrier(2);//会拦截2个线程
    CyclicBarrier cyc = new CyclicBarrier(2, new Thread(){//override run()});//2个线程到达同步点,优先执行第二个参数的线程任务,再执行各线程后续代码

  2. 每个工作线程,调用cyc.await(),等待2个(构造函数设置的个数)线程到达同步点。(如果超过构造函数设置的个数,超过的工作线程会一直等待)

  3. 满足以下任何条件都可以让等待 CyclicBarrier 的线程释放:

    • 最后一个线程也到达 CyclicBarrier(调用 await())
    • 当前线程被其他线程打断(其他线程调用了这个线程的 interrupt() 方法)
    • 其他等待栅栏的线程被打断
    • 其他等待栅栏的线程因超时而被释放
    • 外部线程调用了栅栏的 CyclicBarrier.reset() 方法

应用场景 :类似CountDownLatch的应用场景,同样可以使用CyclicBarrier实现。

那CountDownLatch和CyclicBarrier的异同呢?

  1. 从字面意思,CountDownLatch更加适用于倒计时类似的场景,而CyclicBarrier(栅栏)更加适合大家速度不同,但我们要在同一状态(同一位置)集合后,再出发的场景。
  2. CountDownLatch的计数器只能使用一次,countDown()消耗完了,就不能使用了,而CyclicBarrier通过reset()方法,可以重复使用,这样可以处理更加复杂的业务,比如:计算发生错误,可以重置计数器,再重新跑一遍计算逻辑。

当然,能实现相应的功能,用哪个都可以的。

Semaphore

Semaphore(信号量)是用来控制同事访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

  1. 通过构造方法,获取信号量对象s,10表示:同一时间,只能有10个名额可以使用
    Semaphore s = new Semaphore(10);

  2. 多个线程,比如100个线程,他们通过信号量限制去访问公共资源,s.acquire()获取信号量的许可证,一共就10个,没有名额,就阻塞等待,直到有线程使用完,调用s.release()释放一个许可证,再去竞争、


应用场景:100个人排队过桥,桥太窄,桥面只能有10个人。这时,大家就竞争这10个名额,最初上去10个人,其余人就等待,当有人下桥时,则等待的竞争上去一个…

Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。

应用场景一 : 用于遗传算法,选出两个人作为交配对象,交换两人的数据,使用交叉规则得出结果。
应用场景二 :用于校对工作,两个线程分别计算,通过交换结果并比较,来确定结果是否正确

测试源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package com.kevinlsui.multithread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 测试使用CountDownLatch
* 示例:打印1-100,数字顺序随机,最后打印ok表示结束。
* 开10个线程来执行
*/
public class CountDownLatchTest {
//使用join实现,
//此写法错误
public static void joinTest() throws InterruptedException{
for(int i = 0 ;i < 10;i++){
final int a[] = {0};//匿名内部类使用
a[0] = i;
Thread t = new Thread(){
@Override
public void run() {
System.out.println("线程号:"+a[0]);
int j = a[0]*10+1;
int len = j+10;
for(;j<len;j++){
System.out.println(j);
}
};
};
t.start();
t.join();//此写法有问题,变成同步的了,由于t1必须执行完了,才去做i=1的下一轮循环
}
System.out.println("ok");
}
//使用join实现,
public static void joinTest2() throws InterruptedException{
Thread t1 = new Thread(){
@Override
public void run() {
try {
sleep(2000);//睡2s,此时其它4个线程执行完了,主线程再join他们,会不会出错?答案:不会
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
}
};
Thread t2 = new Thread(){
@Override
public void run() {
System.out.println(2);
}
};
Thread t3 = new Thread(){
@Override
public void run() {
System.out.println(3);
}
};
Thread t4 = new Thread(){
@Override
public void run() {
System.out.println(4);
}
};
Thread t5 = new Thread(){
@Override
public void run() {
System.out.println(5);
}
};
//启动
t2.start();
t1.start();
t3.start();
t4.start();
t5.start();
//join
t1.join();//阻塞的是main线程
System.out.println(t2.getState());//TERMINATED,再join不报错
t2.join();
t3.join();
t4.join();
t5.join();
System.out.println("ok");
}
//使用CountDownLatch
public static void countDownLatchTest() throws InterruptedException{
final CountDownLatch startflag = new CountDownLatch(1);
final CountDownLatch flag = new CountDownLatch(5);
Thread t1 = new Thread(){
@Override
public void run() {
try {
startflag.await(5, TimeUnit.SECONDS);//开始标志,5s后不在阻塞,执行下面的代码
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
flag.countDown();
}
};
Thread t2 = new Thread(){
@Override
public void run() {
try {
startflag.await();//开始标志
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2);
flag.countDown();
}
};
Thread t3 = new Thread(){
@Override
public void run() {
try {
startflag.await();//开始标志
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(3);
flag.countDown();
}
};
Thread t4 = new Thread(){
@Override
public void run() {
try {
startflag.await();//开始标志
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(4);
flag.countDown();
}
};
Thread t5 = new Thread(){
@Override
public void run() {
try {
startflag.await();//开始标志
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(5);
flag.countDown();
}
};
//启动
t2.start();
t1.start();
t3.start();
t4.start();
t5.start();
Thread.sleep(10000);
System.out.println("开始...");
startflag.countDown();
flag.await();
System.out.println("结束。");
}
public static void main(String[] args) throws InterruptedException {
//joinTest2();
countDownLatchTest();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package com.kevinlsui.multithread;
import java.util.Map.Entry;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 测试使用CyclicBarrier
* 能实现CountDownLatch的功能,但是有更加高级的用法
*
* @author Administrator
*
*/
public class CyclicBarrierTest {
//常规用法
public static void test1() throws InterruptedException, BrokenBarrierException{
final CyclicBarrier cyc = new CyclicBarrier(2);
new Thread(){
@Override
public void run() {
try {
System.out.println("我执行的比较慢,需要2s");
sleep(2000);
cyc.await();
System.out.println("1");
} catch (Exception e) {
e.printStackTrace();
}
}}.start();
System.out.println("我执行的比较快,在这等你");
cyc.await();
System.out.println(2);
}
//等设定的线程都完成后,去优先执行给定的线程
public static void test2(){
final CyclicBarrier cyc = new CyclicBarrier(2, new Thread(){
@Override
public void run() {
System.out.println("等3个线程都到达指定点,执行我这个线程。我要3秒钟");
try {
sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
new Thread(){
@Override
public void run() {
System.out.println("1");
try {
cyc.await();
System.out.println("after 1...");
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
System.out.println("2");
try {
cyc.await();
System.out.println("after 2...");
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
System.out.println("3");
try {
cyc.await();//如果屏障数为2,则此处一直获取不了通过
System.out.println("after 3...");
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
//示例,开三个线程去计算,然后结果合并
public static void test3() throws InterruptedException{
final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String,Integer>();
final CyclicBarrier cyc = new CyclicBarrier(4, new Thread(){
@Override
public void run() {
int result = 0;
for(Entry<String, Integer> entry : map.entrySet()){
result += entry.getValue();
}
System.out.println("结果为:"+result);
}
});
Executor executor = Executors.newFixedThreadPool(3);
for(int i = 0 ; i < 3 ; i++){
executor.execute(new Runnable(){
@Override
public void run() {
System.out.println("计算:"+Thread.currentThread().getName());
map.put(Thread.currentThread().getName(), 2);
try {
cyc.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(2000);//此时,设置需要4个线程到达指定点,永远实现不了:new CyclicBarrier(4, new Thread(){
System.out.println("阻塞的个数:"+cyc.getNumberWaiting());//3
}
/**
* 满足以下任何条件都可以让等待 CyclicBarrier 的线程释放:
* 最后一个线程也到达 CyclicBarrier(调用 await())
* 当前线程被其他线程打断(其他线程调用了这个线程的 interrupt() 方法)
* 其他等待栅栏的线程被打断
* 其他等待栅栏的线程因超时而被释放
* 外部线程调用了栅栏的 CyclicBarrier.reset() 方法
* @param args
* @throws InterruptedException
* @throws BrokenBarrierException
*/
public static void test4(){
final CyclicBarrier cyc = new CyclicBarrier(3, new Thread(){
@Override
public void run() {
System.out.println("等3个线程都到达指定点,执行我这个线程。我要3秒钟");
try {
sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
new Thread(){
@Override
public void run() {
System.out.println("1");
try {
cyc.await();
System.out.println("after 1...");
} catch (Exception e) {
System.out.println("1 exception...");
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
System.out.println("2");
try {
//cyc.await(1, TimeUnit.SECONDS);
cyc.await();
System.out.println("after 2...");
} catch (Exception e) {
System.out.println("2 exception...");
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
try {
sleep(3000);
cyc.reset();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("3");
try {
cyc.await();
System.out.println("after 3...");
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
//test1();
//test2();
// test3();
test4();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.kevinlsui.multithread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.swing.plaf.SliderUI;
/**
* 测试Semaphore
* 开40个线程去数据库存数据,但是只有10个数据连接可用,需要限流
* @author Administrator
*
*/
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(100);
final Semaphore s = new Semaphore(10);
System.out.println("初始的个数:"+s.availablePermits());//可用的信号量个数
for(int i = 0 ; i<100;i++){
pool.execute(new Runnable() {
public void run() {
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println("存储数据。。。");
s.release();//
}
});
}
//Thread.sleep(100);
System.out.println("可用的信号量:"+s.availablePermits());//可用的信号量个数
System.out.println("是否有等待的线程:"+s.hasQueuedThreads());
System.out.println("等待的线程个数:"+s.getQueueLength());
pool.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.kevinlsui.multithread;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ExchangerTest {
public static void main(String[] args) {
/*final Exchanger<String> ex = new Exchanger<String>();
new Thread(){
@Override
public void run() {
String a = "我是 a 的数据";
try {
String c = ex.exchange(a);
System.out.println("a交换后:"+c);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
String b = "我是 b 的数据";
try {
sleep(3000);
String d = ex.exchange(b);
System.out.println("b交换后:"+d);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();*/
final Exchanger<String> ex = new Exchanger<String>();
new Thread(){
@Override
public void run() {
String a = "我是 a 的数据";
try {
String c = ex.exchange(a,2,TimeUnit.SECONDS);
System.out.println("a交换后:"+c);
} catch (Exception e) {
System.out.println("a 最多等2s,进异常...");
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
String b = "我是 b 的数据";
try {
sleep(3000);
String d = ex.exchange(b);
System.out.println("b交换后:"+d);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
文章目录
  1. 1. 前言
  2. 2. 总结
    1. 2.1. CountDownLatch
    2. 2.2. CyclicBarrier
    3. 2.3. 那CountDownLatch和CyclicBarrier的异同呢?
    4. 2.4. Semaphore
    5. 2.5. Exchanger
  3. 3. 测试源码
|