本篇讲述新类库中的工具类。
参考博客资料:
http://developer.51cto.com/art/201403/432095.htm
http://blog.csdn.net/flyingdon/article/details/5110582
http://blog.csdn.net/shihuacai/article/details/8856526
《JAVA思想编程》
一、CountDownLatch CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。
例子:
a,b,c,d四个盘,分别统计出大小,最后要求得四个盘总的大小的值。
首先想到的是CountDownLatch这个类,在所有单个盘的大小统计完成之后,再进行总和的计算。之后,我们需要一个DiskMemory类,抽象盘的大小的统计。有个size和totalSize,大概就这些。启动线程时,启动4次,每次分别计算单个的size,最后汇总。
代码如下:
1 import java.util.Random; 2 3 public class DiskMemory { 4 5 private static int totalSize; 6 7 public int getSize() { 8 return new Random().nextInt(5) + 1; 9 }10 11 public void setSize(int size) {12 // synchronized method is must13 synchronized (this) {14 System.out.println("-------first totalSize: " + totalSize);15 totalSize = totalSize + size;16 System.out.println("-------end totalSize: " + totalSize);17 }18 19 }20 21 public int getTotalSize() {22 return totalSize;23 }24 }
1 import java.util.concurrent.CountDownLatch; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.TimeUnit; 5 6 public class CountSizeDemo { 7 8 public static void main(String[] args) { 9 10 CountDownLatch countDownLatch = new CountDownLatch(4);11 DiskMemory diskMemory = new DiskMemory();12 ExecutorService executorService = Executors.newCachedThreadPool();13 for (int i = 0; i < 4; i++) {14 15 executorService.execute(() -> {16 try {17 TimeUnit.MICROSECONDS.sleep(1000);18 } catch (InterruptedException ex) {19 ex.printStackTrace();20 }21 int size = diskMemory.getSize();22 System.out.println("get size: " + size);23 diskMemory.setSize(size);24 countDownLatch.countDown();25 });26 }27 try {28 countDownLatch.await();29 } catch (InterruptedException e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }33 System.out.println(diskMemory.getTotalSize());34 executorService.shutdownNow();35 }36 37 }
执行结果:
1 get size: 3 2 get size: 4 3 get size: 5 4 get size: 5 5 -------first totalSize: 0 6 -------end totalSize: 3 7 -------first totalSize: 3 8 -------end totalSize: 8 9 -------first totalSize: 810 -------end totalSize: 1211 -------first totalSize: 1212 -------end totalSize: 1713 17
二、CyclicBarrier
CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的(还是有点别扭的。所以还是别翻译了,只可意会不可言传啊)。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。
给个情景:
公司带领大家出外郊游,到达地方后,大家去A地游览,肯定是路上有快有满,都到了A之后,大家再集体去B地。这个工具类CyclicBarrier用于这个类情景。
它与CountDownLatch不同之处在于,它可以重复计数,当计数减一减一最后到零时,之后重新计数。这个与CountDownLatch的一个区别。
下面给出个例子:
1 import java.util.Random; 2 import java.util.concurrent.BrokenBarrierException; 3 import java.util.concurrent.CyclicBarrier; 4 5 public class Horse implements Runnable { 6 7 private static int counter = 0; 8 private final int id = counter++; 9 private int strides = 0;10 private static Random rand = new Random(47);11 private static CyclicBarrier barrier;12 public Horse(CyclicBarrier b) {13 barrier = b;14 }15 public synchronized int getStrides() {16 return strides;17 }18 @Override19 public void run() {20 21 try {22 while (!Thread.interrupted()) {23 synchronized (this) {24 strides += rand.nextInt(3);25 }26 barrier.await();27 }28 } catch (InterruptedException e) {29 e.printStackTrace();30 } catch (BrokenBarrierException e) {31 e.printStackTrace();32 }33 }34 35 public String toString() {36 return "Horse " + id + " ";37 }38 39 public String tracks () {40 StringBuilder sBuilder = new StringBuilder();41 for (int i = 0; i < getStrides(); i++) {42 sBuilder.append("*");43 }44 sBuilder.append(id);45 return sBuilder.toString();46 }47 48 }
1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 public class HorseRace { 9 10 static final int FINIASH_LINE = 39;11 private Listhorses = new ArrayList ();12 private ExecutorService executorService = Executors.newCachedThreadPool();13 private CyclicBarrier barrier;14 15 public HorseRace (int nHorse, final int pause) {16 barrier = new CyclicBarrier(nHorse, () -> {17 StringBuilder s = new StringBuilder();18 System.out.println("============================");19 for (Horse horse: horses) {20 System.out.println(horse.getStrides());21 }22 for (Horse horse: horses) {23 if (horse.getStrides() >= FINIASH_LINE) {24 System.out.println(horse + "won!");25 executorService.shutdownNow();26 return;27 }28 }29 30 try {31 TimeUnit.MICROSECONDS.sleep(pause);32 } catch (InterruptedException e) {33 System.out.println(s);34 }35 });36 37 for (int i=0; i
执行结果:
1 ============================ 2 2 3 2 4 1 5 ============================ 6 3 7 4 8 3 9 ============================ 10 3 11 5 12 5 13 ============================ 14 4 15 5 16 5 17 ============================ 18 5 19 7 20 5 21 ============================ 22 6 23 8 24 6 25 ============================ 26 8 27 9 28 7 29 ============================ 30 9 31 9 32 7 33 ============================ 34 11 35 11 36 7 37 ============================ 38 12 39 12 40 9 41 ============================ 42 13 43 13 44 9 45 ============================ 46 13 47 13 48 10 49 ============================ 50 14 51 13 52 11 53 ============================ 54 14 55 15 56 11 57 ============================ 58 14 59 15 60 12 61 ============================ 62 15 63 16 64 14 65 ============================ 66 16 67 16 68 15 69 ============================ 70 18 71 17 72 16 73 ============================ 74 19 75 19 76 16 77 ============================ 78 19 79 20 80 16 81 ============================ 82 19 83 20 84 18 85 ============================ 86 21 87 20 88 20 89 ============================ 90 22 91 21 92 21 93 ============================ 94 24 95 21 96 21 97 ============================ 98 24 99 21100 21101 ============================102 25103 23104 23105 ============================106 27107 24108 25109 ============================110 29111 24112 26113 ============================114 31115 25116 26117 ============================118 31119 25120 28121 ============================122 32123 25124 28125 ============================126 34127 26128 28129 ============================130 35131 27132 30133 ============================134 35135 27136 32137 ============================138 35139 27140 34141 ============================142 36143 29144 36145 ============================146 36147 29148 38149 ============================150 38151 31152 39153 Horse 2 won!
三、Semaphore
Semaphore 作用是只允许一定数量的线程同时执行一段任务。
以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这是如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。
代码例子如下:
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4 5 public class SemaphoreTest { 6 public static void main(String[] args) { 7 // 停车场 8 ExecutorService exec = Executors.newCachedThreadPool(); 9 // 只有五个车位10 final Semaphore semp = new Semaphore(5); 11 // 模拟20辆车进入 12 for (int index = 0; index < 20; index++) {13 final int NO = index; 14 exec.execute(() -> {15 try { 16 // 获取进入许可 17 semp.acquire(); 18 System.out.println("Car No: " + NO); 19 Thread.sleep((long) (Math.random() * 10000)); 20 // 出去后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞21 semp.release(); 22 } catch (InterruptedException e) { 23 } 24 }); 25 } 26 // 退出线程池 27 exec.shutdown(); 28 } 29 }
1 Car No: 1 2 Car No: 2 3 Car No: 3 4 Car No: 0 5 Car No: 4 6 Car No: 5 7 Car No: 6 8 Car No: 7 9 Car No: 810 Car No: 911 Car No: 1012 Car No: 1113 Car No: 1214 Car No: 1315 Car No: 1416 Car No: 1517 Car No: 1618 Car No: 1719 Car No: 1820 Car No: 19
四、PriorityBlockingQueue
名字起的非常的简洁,同时又把所有的重点描述清楚了。首先是Priority,优先级的意思,表明他可以排序,之后是Blocking,因为在没有数据被取出的时候,会发生阻塞,最后,这个一个队列,我们不需要显式的控制它的并发,一切都在它的内部自己完成。
上例子:
1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.PriorityBlockingQueue; 6 import java.util.concurrent.TimeUnit; 7 8 public class PriorityBlockingQueueDemo { 9 10 public static void main(String[] args) throws InterruptedException {11 12 PriorityBlockingQueuequeue = new PriorityBlockingQueue<>();13 ExecutorService service = Executors.newCachedThreadPool();14 service.execute(new Thread(() -> {15 System.out.println("Polling...");16 while (true) {17 Integer poll;18 try {19 poll = queue.take();20 System.out.println("Pooled: " + poll);21 } catch (Exception e) {22 e.printStackTrace();23 }24 }25 }));26 27 TimeUnit.MICROSECONDS.sleep(5);28 System.out.println("Adding to queue");29 List list = new ArrayList<>();30 list.add(1);31 list.add(5);32 list.add(1);33 list.add(4);34 list.add(3);35 list.add(1);36 queue.addAll(list);37 TimeUnit.MICROSECONDS.sleep(1);38 }39 }
1 Polling...2 Adding to queue3 Pooled: 14 Pooled: 15 Pooled: 16 Pooled: 37 Pooled: 48 Pooled: 5
PriorityBlockingQueue里面存储的对象一定要实现Comparable接口。队列通过这个接口的compareTo()方法来实现优先级排序的。如果我们自己不重写compareTo()方法,那么会按照类中默认的排序方法,进行排序。