正文
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node =
new
Node(Thread.currentThread(), Node.CONDITION);
if
(t ==
null
){
firstWaiter = node;
}
else
{
t.nextWaiter = node;
}
lastWaiter = node;
return
node;
}
4.2 删除Cancelled节点的方法 unlinkCancelledWaiters
当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是被中断或超时。在调用 addConditionWaiter 将线程放入 Condition Queue 里面时或 awiat 方法获取结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点。这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点。
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter;
if(t.waitStatus != Node.CONDITION){
t.nextWaiter = null;
if(trail == null){
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
if(next == null){
lastWaiter = trail;
}
}else{
trail = t;
}
t = next;
}
}
4.3 转移节点的方法 transferForSignal
transferForSignal只有在节点被正常唤醒才调用的正常转移的方法。
将Node 从Condition Queue 转移到 Sync Queue 里面在调用transferForSignal之前, 会 first.nextWaiter = null;而我们发现若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0
final boolean transferForSignal(Node node){
if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
return false;
}
Node p = enq(node);
int ws = p.waitStatus;
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(node.thread);
}
return true;
}
4.4 转移节点的方法 transferAfterCancelledWait
transferAfterCancelledWait 在节点获取lock时被中断或获取超时才调用的转移方法。将 Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移
final boolean transferAfterCancelledWait(Node node){
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
while(!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
5. Sync Queue
AQS内部维护着一个FIFO的CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他具备进出队列快、无所、畅通无阻、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,ASQ采用的是一种变种的CLH队列锁:
-
原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。
-
为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
-
head结点使用的是傀儡结点。
Sync Queue
这个图代表有个线程获取lock, 而 Node1, Node2, Node3 则在Sync Queue 里面进行等待获取lock(PS: 注意到 dummy Node 的SINGNAL 这是叫获取 lock 的线程在释放lock时通知后继节点的标示)
5.1 Sync Queue 节点入Queue方法
这里有个地方需要注意, 就是初始化 head, tail 的节点, 不一定是 head.next, 因为期间可能被其他的线程进行抢占了。将当前的线程封装成 Node 加入到 Sync Queue 里面。
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred, node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){
if(compareAndSetHead(new Node())){
tail = head;
}
}else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
5.2 Sync Queue 节点出Queue方法
这里的出Queue的方法其实有两个:
新节点获取lock, 调用setHead抢占head, 并且剔除原head;节点因被中断或获取超时而进行 cancelled, 最后被剔除。
private void setHead(Node node){
head = node;
node.thread = null