4.condition接口的await与signal等待/通知机制
wait,notify,必须在同步代码块和同步方法中使用
4.1 Object的wait,notify方法是与内建锁(对象监视器monitor)搭配使用完成线程的等待与通知机制,属于jvm底层实现
4.2而Condition的await与signal是与Lock体系配合实现线程的等待与通知,属于java语言级别,具有更好的控制和扩展性
4.3 Condition三个独有特性:
1. Condition await 能够支持不响应中断,而通过使用Object方式不支持;
2. Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
3. Condition支持设置截止时间,而Object wait只支持设置超时时间
Lock和内建锁的区别:
Lock支持中断,内建锁不支持中断
4.4等待方法await()
- void await() throws InterruptedException(同wait());
无参,死等
- void awaitUninterruptibly();特性1,等待过程中不响应中断
- boolean await(long time, TimeUnit unit)throws InterruptedException:在1的基础上增加了超时等待功能,支持自定义时间单位
- boolean awaitUntil(Date deadline) throws InterruptedException:特性3,支持设置截止时间(前线程进入等待状态直到被通知,中断或者到了 某个时间)
4.5唤醒方法:signal 和signalAll()
4.6 Condition等待队列
Lock lock=new ReentrantLock ();
//获取与Lock绑定的Condition对象
Condition condition=lock.newCondition ();
newCondition ():new一个Condition对象即可
Condition队列与AQS(双向链表)中的同步队列共享节点(Node类)数据结构(带有头尾指针的单向队列)
同时还有一点需要注意的是:我们可以多次调用lock.newCondition()方法创建多个condition对象,也就是一个lock可 以持有多个等待队列。而在之前利用Object的方式实际上是指在对象Object对象监视器上只能拥有一个同步队列和 一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。示意图如下:
应用场景:使用condition实现有界队列(限制他的个数)
4.7 await 实现原理
public final void await() throws InterruptedException {
//判断中断,如果有异常,直接抛出中断异常,然后退出,(await()默认响应中断
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装为Node节点后尾插入等待队列
Node node = addConditionWaiter();
//释放拿到的同步状态(也就是释放锁)
int savedState = fullyRelease(node);
int interruptMode = 0;
//(OnSyncQueue 同步)当线程不在同步队列就将其阻塞,置为wait状态
while (!isOnSyncQueue(node)) {
//在等待队列中期待被唤醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//独占式锁,在同步队列中排队获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//处理中断
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
问题:
1.如何将当前线程插入等待队列? addConditionWaiter()
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled(取消), clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//清空所有等待队列中状态不为condition的节点
unlinkCancelledWaiters();
//将最新的尾节点赋值
t = lastWaiter;
}
//将当前线程包装为Node节点且状态为condition
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾插入等待队列过程
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
2.释放锁的过程:
将线程包装为Node节点尾插入等待队列后,线程释放锁过程fullyRelease()
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取当前同步状态
int savedState = getState();
//调用release方法释放同步状态
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//若在释放过程中出现异常,把当前节点置为取消
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
- 线程如何能从await()方法中退出?
while (!isOnSyncQueue(node)) {
//在等待队列中期待被唤醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
3. 退出条件:
1.被唤醒后置入同步队列,退出循环
2.在等待时被中断了,通过break退出循环
4.8 signal实现原理
public final void signal() {
//判断当前线程是否拿到锁,如果没有拿到锁,就抛出异常,然后退出
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//拿到当前等待队列的头节点
Node first = firstWaiter;
if (first != null)
//唤醒头节点
doSignal(first);
}
private void doSignal(Node first) {
do {
//当头节点为空时,把头节点从等待队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//transferForSignal方法对头节点做真正的处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//首先将头节点状态由condition更新为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点使用enq方法尾插到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
结论:调用condition的signal的前提条件是当前线程已经获取了lock,该 方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等 待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成 功退出。
signal执行示意图如下图:
用condition写生产消费者模型:
package www.wl.java.link;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Goods{
private int maxCount;
private String name;
//当前商品数量
private int count;
private Lock lock=new ReentrantLock ();
//生产者等待队列
private Condition producerCondition=lock.newCondition ();
//消费者等待队列
private Condition consumerCondition=lock.newCondition ();
public Goods(int maxCount) {
this.maxCount = maxCount;
}
//生产者方法
public void setName(String name) {
lock.lock ();
try{
while (count==maxCount){
//当前商品已达到最大值,生产者线程需要等待
System.out.println (Thread.currentThread ().getName ()+"商品已达到最大值,歇会");
try{
producerCondition.await ();
}catch (InterruptedException e) {
e.printStackTrace ();
}
}
this.name=name;
count++;
System.out.println (Thread.currentThread ().getName ()+"生产"+toString ());
//唤醒消费者线程
consumerCondition.signalAll ();
} finally{
lock.unlock ();
}
}
//消费者方法
public void getName(String name) {
lock.lock ();
try{
while (count==0){
//当前商品为0,消费者线程需要等待
System.out.println (Thread.currentThread ().getName ()+"商品为0,歇会");
try{
consumerCondition.await ();
}catch (InterruptedException e) {
e.printStackTrace ();
}
}
count--;
System.out.println (Thread.currentThread ().getName ()+"消费"+toString ());
//唤醒生产者线程
producerCondition.signalAll ();
} finally{
lock.unlock ();
}
}
@Override
public String toString() {
return "Goods{" +
", name='" + name + '\'' +
", count=" + count +
'}';
}
}
//生产者线程
class Producer implements Runnable{
private Goods goods;
public Producer(Goods goods) {
this.goods = goods;
}
@Override//一直生产
public void run() {
while(true){
this.goods.setName ("生产华为");
}
}
}
//消费者线程
class Consumer implements Runnable{
private Goods goods;
public Consumer(Goods goods) {
this.goods = goods;
}
@Override
public void run() {
while(true){
this.goods.getName ("消费华为");
}
}
}
public class WL {
public static void main(String[] args) {
Goods goods=new Goods (100);
Producer producer=new Producer (goods);
Consumer consumer=new Consumer (goods);
List<Thread> list=new ArrayList<> ();
//5个生产者
for(int i=0;i<5;i++){
Thread thread=new Thread (producer,"生产者"+i);
list.add (thread);
}
//10个生产者
for(int i=0;i<5;i++){
Thread thread=new Thread (consumer,"消费者"+i);
list.add (thread);
}
for(Thread thread:list){
thread.start ();
}
}
}
使用Lock锁优点:不响应中断,多个等待队列,可以设置截止时间