博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发(三)
阅读量:6651 次
发布时间:2019-06-25

本文共 11195 字,大约阅读时间需要 37 分钟。

本篇讲述新类库中的工具类。

参考博客资料:

http://developer.51cto.com/art/201403/432095.htm

http://blog.csdn.net/flyingdon/article/details/5110582

http://blog.csdn.net/shihuacai/article/details/8856526

《JAVA思想编程》

 

一、CountDownLatch

 CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。

例子:

a,b,c,d四个盘,分别统计出大小,最后要求得四个盘总的大小的值。

首先想到的是CountDownLatch这个类,在所有单个盘的大小统计完成之后,再进行总和的计算。之后,我们需要一个DiskMemory类,抽象盘的大小的统计。有个size和totalSize,大概就这些。启动线程时,启动4次,每次分别计算单个的size,最后汇总。

代码如下:

1 import java.util.Random; 2  3 public class DiskMemory { 4  5     private static int totalSize; 6      7     public int getSize() { 8         return new Random().nextInt(5) + 1; 9     }10     11     public void setSize(int size) {12         // synchronized method is must13         synchronized (this) {14             System.out.println("-------first totalSize: " + totalSize);15             totalSize = totalSize + size;16             System.out.println("-------end totalSize: " + totalSize);17         }18         19     }20     21     public int getTotalSize() {22         return totalSize;23     }24 }
View Code
1 import java.util.concurrent.CountDownLatch; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.TimeUnit; 5  6 public class CountSizeDemo { 7  8     public static void main(String[] args) { 9 10         CountDownLatch countDownLatch = new CountDownLatch(4);11         DiskMemory diskMemory = new DiskMemory();12         ExecutorService executorService = Executors.newCachedThreadPool();13         for (int i = 0; i < 4; i++) {14 15             executorService.execute(() -> {16                 try {17                     TimeUnit.MICROSECONDS.sleep(1000);18                 } catch (InterruptedException ex) {19                     ex.printStackTrace();20                 }21                 int size = diskMemory.getSize();22                 System.out.println("get size: " + size);23                 diskMemory.setSize(size);24                 countDownLatch.countDown();25             });26         }27         try {28             countDownLatch.await();29         } catch (InterruptedException e) {30             // TODO Auto-generated catch block31             e.printStackTrace();32         }33         System.out.println(diskMemory.getTotalSize());34         executorService.shutdownNow();35     }36 37 }
View Code

执行结果:

1 get size: 3 2 get size: 4 3 get size: 5 4 get size: 5 5 -------first totalSize: 0 6 -------end totalSize: 3 7 -------first totalSize: 3 8 -------end totalSize: 8 9 -------first totalSize: 810 -------end totalSize: 1211 -------first totalSize: 1212 -------end totalSize: 1713 17
View Code

 

二、CyclicBarrier

CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的(还是有点别扭的。所以还是别翻译了,只可意会不可言传啊)。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。

给个情景:

公司带领大家出外郊游,到达地方后,大家去A地游览,肯定是路上有快有满,都到了A之后,大家再集体去B地。这个工具类CyclicBarrier用于这个类情景。

它与CountDownLatch不同之处在于,它可以重复计数,当计数减一减一最后到零时,之后重新计数。这个与CountDownLatch的一个区别。

下面给出个例子:

1 import java.util.Random; 2 import java.util.concurrent.BrokenBarrierException; 3 import java.util.concurrent.CyclicBarrier; 4  5 public class Horse implements Runnable { 6  7     private static int counter = 0; 8     private final int id = counter++; 9     private int strides = 0;10     private static Random rand = new Random(47);11     private static CyclicBarrier barrier;12     public Horse(CyclicBarrier b) {13         barrier = b;14     }15     public synchronized int getStrides() {16         return strides;17     }18     @Override19     public void run() {20 21         try {22             while (!Thread.interrupted()) {23                 synchronized (this) {24                     strides += rand.nextInt(3);25                 }26                 barrier.await();27             }28         } catch (InterruptedException e) {29             e.printStackTrace();30         } catch (BrokenBarrierException e) {31             e.printStackTrace();32         }33     }34     35     public String toString() {36         return "Horse " + id + " ";37     }38     39     public String tracks () {40         StringBuilder sBuilder = new StringBuilder();41         for (int i = 0; i < getStrides(); i++) {42             sBuilder.append("*");43         }44         sBuilder.append(id);45         return sBuilder.toString();46     }47 48 }
View Code
1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7  8 public class HorseRace { 9 10     static final int FINIASH_LINE = 39;11     private List
horses = new ArrayList
();12 private ExecutorService executorService = Executors.newCachedThreadPool();13 private CyclicBarrier barrier;14 15 public HorseRace (int nHorse, final int pause) {16 barrier = new CyclicBarrier(nHorse, () -> {17 StringBuilder s = new StringBuilder();18 System.out.println("============================");19 for (Horse horse: horses) {20 System.out.println(horse.getStrides());21 }22 for (Horse horse: horses) {23 if (horse.getStrides() >= FINIASH_LINE) {24 System.out.println(horse + "won!");25 executorService.shutdownNow();26 return;27 }28 }29 30 try {31 TimeUnit.MICROSECONDS.sleep(pause);32 } catch (InterruptedException e) {33 System.out.println(s);34 }35 });36 37 for (int i=0; i
View Code

执行结果:

1 ============================  2 2  3 2  4 1  5 ============================  6 3  7 4  8 3  9 ============================ 10 3 11 5 12 5 13 ============================ 14 4 15 5 16 5 17 ============================ 18 5 19 7 20 5 21 ============================ 22 6 23 8 24 6 25 ============================ 26 8 27 9 28 7 29 ============================ 30 9 31 9 32 7 33 ============================ 34 11 35 11 36 7 37 ============================ 38 12 39 12 40 9 41 ============================ 42 13 43 13 44 9 45 ============================ 46 13 47 13 48 10 49 ============================ 50 14 51 13 52 11 53 ============================ 54 14 55 15 56 11 57 ============================ 58 14 59 15 60 12 61 ============================ 62 15 63 16 64 14 65 ============================ 66 16 67 16 68 15 69 ============================ 70 18 71 17 72 16 73 ============================ 74 19 75 19 76 16 77 ============================ 78 19 79 20 80 16 81 ============================ 82 19 83 20 84 18 85 ============================ 86 21 87 20 88 20 89 ============================ 90 22 91 21 92 21 93 ============================ 94 24 95 21 96 21 97 ============================ 98 24 99 21100 21101 ============================102 25103 23104 23105 ============================106 27107 24108 25109 ============================110 29111 24112 26113 ============================114 31115 25116 26117 ============================118 31119 25120 28121 ============================122 32123 25124 28125 ============================126 34127 26128 28129 ============================130 35131 27132 30133 ============================134 35135 27136 32137 ============================138 35139 27140 34141 ============================142 36143 29144 36145 ============================146 36147 29148 38149 ============================150 38151 31152 39153 Horse 2 won!
View Code

 

三、Semaphore

Semaphore 作用是只允许一定数量的线程同时执行一段任务。

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这是如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

代码例子如下:

1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4  5 public class SemaphoreTest { 6     public static void main(String[] args) {   7         // 停车场 8         ExecutorService exec = Executors.newCachedThreadPool();   9         // 只有五个车位10         final Semaphore semp = new Semaphore(5);  11         // 模拟20辆车进入 12         for (int index = 0; index < 20; index++) {13             final int NO = index; 14             exec.execute(() -> {15                 try {  16                     // 获取进入许可 17                     semp.acquire();  18                     System.out.println("Car No: " + NO);  19                     Thread.sleep((long) (Math.random() * 10000));  20                     // 出去后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞21                     semp.release();  22                 } catch (InterruptedException e) {  23                 }  24             });  25         }  26         // 退出线程池 27         exec.shutdown();  28     }  29 }
View Code
1 Car No: 1 2 Car No: 2 3 Car No: 3 4 Car No: 0 5 Car No: 4 6 Car No: 5 7 Car No: 6 8 Car No: 7 9 Car No: 810 Car No: 911 Car No: 1012 Car No: 1113 Car No: 1214 Car No: 1315 Car No: 1416 Car No: 1517 Car No: 1618 Car No: 1719 Car No: 1820 Car No: 19
View Code

 

四、PriorityBlockingQueue

名字起的非常的简洁,同时又把所有的重点描述清楚了。首先是Priority,优先级的意思,表明他可以排序,之后是Blocking,因为在没有数据被取出的时候,会发生阻塞,最后,这个一个队列,我们不需要显式的控制它的并发,一切都在它的内部自己完成。

上例子:

1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.PriorityBlockingQueue; 6 import java.util.concurrent.TimeUnit; 7  8 public class PriorityBlockingQueueDemo { 9 10     public static void main(String[] args) throws InterruptedException {11 12         PriorityBlockingQueue
queue = new PriorityBlockingQueue<>();13 ExecutorService service = Executors.newCachedThreadPool();14 service.execute(new Thread(() -> {15 System.out.println("Polling...");16 while (true) {17 Integer poll;18 try {19 poll = queue.take();20 System.out.println("Pooled: " + poll);21 } catch (Exception e) {22 e.printStackTrace();23 }24 }25 }));26 27 TimeUnit.MICROSECONDS.sleep(5);28 System.out.println("Adding to queue");29 List
list = new ArrayList<>();30 list.add(1);31 list.add(5);32 list.add(1);33 list.add(4);34 list.add(3);35 list.add(1);36 queue.addAll(list);37 TimeUnit.MICROSECONDS.sleep(1);38 }39 }
View Code
1 Polling...2 Adding to queue3 Pooled: 14 Pooled: 15 Pooled: 16 Pooled: 37 Pooled: 48 Pooled: 5
View Code

PriorityBlockingQueue里面存储的对象一定要实现Comparable接口。队列通过这个接口的compareTo()方法来实现优先级排序的。如果我们自己不重写compareTo()方法,那么会按照类中默认的排序方法,进行排序。

 

转载于:https://www.cnblogs.com/lihao007/p/7630090.html

你可能感兴趣的文章
Linux IO模式及 select、poll、epoll详解(转)
查看>>
Django用普通user对象登录的必须准备步骤
查看>>
Mac Launchpad出现两个相同快捷方式的解决办法
查看>>
AMP Physical Link Creation And Disconnect
查看>>
虚拟化技术基础原理
查看>>
spring-boot项目在外部tomcat环境下部署
查看>>
ld: framework not found FileProvider for architecture arm64
查看>>
Buildroot Qt 5
查看>>
Mysql中的条件语句if、case
查看>>
转 Linux调优方案,sysctl.conf的设置
查看>>
SparkSQL – 从0到1认识Catalyst(转载)
查看>>
[转]深入理解MFC中程序框架
查看>>
Delphi图像处理 -- RGB与HSL转换
查看>>
e297: write error in swap file
查看>>
XamarinAndroid组件教程RecylerView自定义适配器动画
查看>>
Node.js中环境变量process.env详解
查看>>
管理 python logging 日志使用
查看>>
vue - 实例事件
查看>>
Hibernate征途(五)之继承映射和组件映射
查看>>
桥接让XE2 DATASNAP中间层支持DELPHI低版本开发客户端
查看>>