Java并发之Condition

在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait(),wait(long timeout),notify(),notifyAll(),这些方法配合synchronized关键字一起使用可以实现等待/通知模式。

wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。

调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁)

调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程;

调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程;

notify()和notifyAll()方法只是唤醒等待该对象的monitor的线程,并不决定哪个线程能够获取到monitor。

同样,Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。

为了更好的了解Condition的特性,我们来对比一下两者的使用方式以及功能特性:

对比项 Object监视器 Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象
调用方式 直接调用,比如object.notify() 直接调用,比如condition.await()
等待队列的个数 一个 多个
当前线程释放锁进入等待状态 支持 支持
当前线程释放锁进入等待状态,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态直到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition的概念

synchronized关键字,它配合Object的wait()、notify()系列方法可以实现等待/通知模式。

对于Lock,通过Condition也可以实现等待/通知模式。

AQS同步器维护了一个同步队列,其实还维护了多个等待队列,两种队列均为FIFO队列

Condition是一个接口,Condition接口的实现类是Lock(AQS)中的ConditionObject。

Lock接口中有个 newCondition()方法,通过这个方法可以获得Condition对象(其实就是ConditionObject)。
因此,通过Lock对象可以获得Condition对象。

1
2
3
Lock lock  = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();

Condition的实现分析

实现

ConditionObject类是AQS的内部类,实现了Condition接口。

1
2
3
4
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
...

可以看到,等待队列和同步队列一样,使用的都是同步器AQS中的节点类Node。
同样拥有首节点和尾节点,
每个Condition对象都包含着一个FIFO队列。
结构图:

等待

调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
//释放同步状态(锁)
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断节点是否放入同步对列
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

分析上述方法的大概过程:

  1. 将当前线程创建为节点,加入等待队列。
  2. 释放锁,唤醒同步队列中的后继节点。
  3. while 循环判断节点是否放入同步队列;如果没有放入则阻塞继续 while 循环(如果已经中断则退出);如果放入则退出 while 循环执行后面的判断。
  4. 退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。
  5. 竞争到锁后从 await() 方法返回,即退出该方法。

addConditionWaiter() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//将该线程创建节点,放入等待队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。

通知

调用 Condition 的 signal() 方法可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。

1
2
3
4
5
6
7
8
public final void signal() {
//判断是否获取了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

过程:

  1. 先判断当前线程是否获取了锁。
  2. 然后对首节点调用 doSignal() 方法。
1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

过程:

  1. 修改首节点。
  2. 调用 transferForSignal() 方法将节点移动到同步队列。
1
2
3
4
5
6
7
8
9
10
11
12
final boolean transferForSignal(Node node) {
//将节点状态变为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将该节点加入同步队列
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

调用同步器的 enq 方法,将节点移动到同步队列,满足条件后使用 LockSupport 唤醒该线程。

当 Condition 调用 signalAll() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

可以看到 doSignalAll() 方法使用了 do-while 循环来唤醒每一个等待队列中的节点,直到 first 为 null 时停止循环。

一句话总结 signalAll() 的作用:将等待队列中的全部节点移动到同步队列中,并唤醒每个节点的线程。

Conditon中的await()对应Object的wait();

Condition中的signal()对应Object的notify();

Condition中的signalAll()对应Object的notifyAll()。

总结

整个过程可以分为三步:

第一步:一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。然后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,否则一直阻塞。

第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。

第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法继续执行。

参考

https://juejin.im/post/5b69a5a151882563522b7e42

https://www.cnblogs.com/csuwater/p/5411693.html

http://www.importnew.com/30150.html

https://www.jb51.net/article/134496.htm

看官可在此打赏