AND型信号量可能大家都听说过并可能都有一定的理解,但是你有使用过么?今天就使用Java来模拟实现!
本文是对上篇文章(进程同步机制)的一次实践,通过JUC提供的一些机制来模拟一些OS中的AND型信号量,因为记录型型信号量可以等价于JUC中提供的Semaphore(信号量),但是对于AND型信号量因为一些原因(主要是过时了),JUC没有提供,今天就手动的来写一个AND型信号量对应的Swait操作和Ssignal操作(这里不明白的可以看前面的理论篇)。通过本篇博文让你对进程同步机制有个更好的理解。
1.一个错误示例
在这里,首先解释一下,为了满足线程申请信号量不成功后将进程阻塞,并插入到对应的队列中,所以使用了ReentrantLock+Condition来实现Swait方法。废话不多说,直接上代码:
//数据定义 static Lock lock = new ReentrantLock(); static Condition condition1 = lock.newCondition(); static Condition condition2 = lock.newCondition(); public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException { lock.tryLock(1, TimeUnit.SECONDS); log.info("当前的两个信号量的状态:【{},{}】", s1.availablePermits(), s2.availablePermits()); //availablePermits可获取到信号量中还剩余的值 if(s1.availablePermits() < 1 || s2.availablePermits() < 1){ if (s1.availablePermits() < 1) { log.info("线程【{}】被挂起到信号量【{}】中", id, s1); //阻塞,并插入到condition1的阻塞队列中 condition1.await(); } else { log.info("线程【{}】被挂起到信号量【{}】中", id, s2); //阻塞,并插入到condition2的阻塞队列中 condition2.await(); } log.info("被挂起的线程【{}】被唤醒执行。", id); } else { log.info("为线程【{}】分配资源!", id); s1.acquire(); s2.acquire(); } lock.unlock(); } public static void Ssignal(Semaphore s1, Semaphore s2) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); s1.release(); s2.release(); //唤醒等待队列中的线程 condition.signal(); lock.unlock(); }
大家仔细看上面的代码,这个也是我刚开始写的代码,第一眼看似乎是没什么问题,但是里面隐藏着一个坑,在Swait方法中,调用condition1.await(),此时线程被阻塞在这一行中,但是当被别的线程(调用Ssignal)唤醒时,在被阻塞的下一行开始继续执行,但是在后续的代码里,是没有去申请信号量的,而是直接就Swait成功了,这样在执行Ssignal时就会导致信号量凭空的增加了,也就无法正确的表征系统中的资源数量了。
2.一个简单的示例
下面我们就对代码进行优化,大家可以回顾一下AND型信号量,当其因为资源不足时,需要将线程插入到第一个无法满足条件(即Si<1)的信号量对应的等待队列中,并且将程序计数器放置到Swait操作的开始处,所以我们对Swait代码进行修改如下:
public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException { lock.tryLock(1, TimeUnit.SECONDS); log.info("当前的两个信号量的状态:【{},{}】", s1.availablePermits(), s2.availablePermits()); //如果申请不到,就挂起线程,并将线程插入到condition的队列中 while (s1.availablePermits() < 1 || s2.availablePermits() < 1) { if (s1.availablePermits() < 1) { log.info("线程【{}】被挂起到信号量【{}】中", id, s1); condition1.await(); } else { log.info("线程【{}】被挂起到信号量【{}】中", id, s2); condition2.await(); } log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); s1.acquire(); s2.acquire(); lock.unlock(); }
在上面的代码中,我们将请求的资源放到一个循环条件中,以满足将程序计数器放置到Swait操作的开始处,在每次被唤醒后都要重新判断资源是否足够,如果足够才跳出循环,否则就再次自我阻塞。
3.一个可以同时申请N个的Swait操作
如果你知道了信号量的种类数(系统中的资源类型),其实上面的代码已经可以满足一定的需要了,只需要我们将所有的信号量写入到参数列表中即可。但是对于致力于代码的复用,这里就有些差强人意了,因此我们再次对代码进行改进,代码如下所示:
public static void Swait(String id, Semaphore... list) throws InterruptedException { lock.lock(); //如果资源不足,就挂起线程,并将线程插入到condition的队列中 while (true) { int count=0; //循环判断参数列表中信号量的可用值 for (Semaphore semaphore:list){ if(semaphore.availablePermits()>0){ count++; } } //如果资源都满足,则跳出循环,进行资源分配 if(count == list.length){ break; } log.info("线程【{}】被挂起-----", id); //将当前线程阻塞 condition1.await(); log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); //分配资源 for (Semaphore semaphore:list){ semaphore.acquire(); } lock.unlock(); } public static void Ssignal(String id, Semaphore... list) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); //循环释放信号量 for (Semaphore semaphore:list){ semaphore.release(); } //唤醒等待队列中的线程 condition.signal(); lock.unlock(); }
为此,我们将方法中的信号量列表改为可变的参数列表,这样在传参的时候就可以方便的进行了,但是也会存才一些问题,比如无法约束“借出”与“归还”的信号量的数量是否一致。并且因为信号量的数量不定,所以无法为每个信号量新建一个条件变量(Condition),因此在上面的代码中所有的信号量公用一个条件变量,所有阻塞的线程都插入在其阻塞队列中。
4.一个完整的例子
这里我们使用一个经典的进程同步问题来演示我们使用Java模拟的AND型信号量,在这里,我们采用生产者–消费者问题来演示,完整的代码如下:
//用来保证互斥的访问临界区(缓存区) static final Semaphore mutex = new Semaphore(1); //缓冲区,最大容量为50 static List<Integer> buffer = new ArrayList<>(); //缓冲区中还可放入的消息数量 static final Semaphore empty = new Semaphore(50); //缓冲区中的消息数量 static final Semaphore full = new Semaphore(0); //可重入锁和条件变量 static Lock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); //用与辅助的简单的生成消息 static Integer count = 0; //生产者 static class Producer extends Thread { Producer(String name) { super.setName(name); } @Override public void run() { do { try { Swait(this.getName(), mutex, empty); log.info("生产了一条消息:【{}】", count); buffer.add(count++); Thread.sleep(1000); Ssignal(this.getName(), mutex, full); } catch (InterruptedException e) { log.error("生产消息时产生异常!"); } } while (true); } } //消费者 static class Consumer extends Thread { Consumer(String name) { super.setName(name); } @Override public void run() { do { try { Swait(this.getName(), mutex, full); log.info("消费了一条消息:【{}】", buffer.remove(0)); Thread.sleep(1000); Ssignal(this.getName(), mutex, empty); } catch (InterruptedException e) { log.error("消费消息时产生异常!"); } } while (true); } } public static void Swait(String id, Semaphore... list) throws InterruptedException { lock.lock(); //如果资源不足,就挂起线程,并将线程插入到condition的队列中 while (true) { int count=0; for (Semaphore semaphore:list){ if(semaphore.availablePermits()>0){ count++; } } if(count == list.length){ break; } log.info("线程【{}】被挂起", id); condition.await(); log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); for (Semaphore semaphore:list){ semaphore.acquire(); } lock.unlock(); } public static void Ssignal(String id, Semaphore... list) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); for (Semaphore semaphore:list){ semaphore.release(); } //唤醒等待队列中的一个线程 condition.signal(); lock.unlock(); } public static void main(String[] args) { Producer p1 = new Producer("p1"); Consumer c1 = new Consumer("c1"); p1.start(); c1.start(); }
上面代码都是可以直接执行的,如果不需要使用参数列表,可以将上面的Swait方法进行替换即可(记得创建对应的条件变量)。
下图是部分的执行结果:
本文的所有java代码都已通过测试,对其中有什么疑惑的,可以评论区留言,欢迎你的留言与讨论;另外原创不易,如果本文对你有所帮助,还请留下个赞,以表支持。
希望本文可以帮助你理解加深理解进程同步,也可以帮助你理解Java并发编程。
极牛网精选文章《Java并发编程(JUC)模拟AND型信号量》文中所述为作者独立观点,不代表极牛网立场。如有侵权请联系删除。如若转载请注明出处:https://geeknb.com/6048.html