专栏名称: aoho
Senior工程师
目录
相关文章推荐
清晨朗读会  ·  渊源直播 ·  昨天  
英文悦读  ·  推荐几个很不错的泛听材料 ·  3 天前  
清晨朗读会  ·  渊源直播 ·  2 天前  
清晨朗读会  ·  清晨朗读3307:How to Have ... ·  3 天前  
51好读  ›  专栏  ›  aoho

并发Lock之AQS(AbstractQueuedSynchronizer)详解

aoho  · 掘金  ·  · 2018-01-01 14:27

正文

请到「今天看啥」查看全文


unlinkCancelledWaiters(); t = lastWaiter; } //将线程封装成 node 准备放入 Condition Queue 里面 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ){ //Condition Queue 是空的 firstWaiter = node; } else { // 追加到 queue 尾部 t.nextWaiter = node; } lastWaiter = node; return node; }

4.2 删除Cancelled节点的方法 unlinkCancelledWaiters

当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是被中断或超时。在调用 addConditionWaiter 将线程放入 Condition Queue 里面时或 awiat 方法获取结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点。这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点。

private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
        Node next = t.nextWaiter;               // 1. 先初始化 next 节点
        if(t.waitStatus != Node.CONDITION){   // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
            t.nextWaiter = null;               // 3. Node.nextWaiter 置空
            if(trail == null){                  // 4. 一次都没有遇到有效的节点
                firstWaiter = next;            // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
            } else {
                trail.nextWaiter = next;       // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
            }
            if(next == null){                  // 7. next == null 说明 已经 traverse 完了 Condition Queue
                lastWaiter = trail;
            }
        }else{
            trail = t;                         // 8. 将有效节点赋值给 trail
        }
        t = next;
    }
}

4.3 转移节点的方法 transferForSignal

transferForSignal只有在节点被正常唤醒才调用的正常转移的方法。
将Node 从Condition Queue 转移到 Sync Queue 里面在调用transferForSignal之前, 会 first.nextWaiter = null;而我们发现若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0

final boolean transferForSignal(Node node){
    /**
     * If cannot change waitStatus, the node has been cancelled
     */
    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已经 cancelled 则失败
        return false;
    }

    Node p = enq(node);                                 // 2. 加入 Sync Queue
    int ws = p.waitStatus;
    if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL 或 cancelled, 所以也要 唤醒
        LockSupport.unpark(node.thread);
    }
    return true;
}

4.4 转移节点的方法 transferAfterCancelledWait

transferAfterCancelledWait 在节点获取lock时被中断或获取超时才调用的转移方法。将 Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移

final boolean transferAfterCancelledWait(Node node){
    if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 没有 node 没有 cancelled , 直接进行转移 (转移后, Sync Queue , Condition Queue 都会存在 node)
        enq(node);
        return true;
    }
    
    while(!isOnSyncQueue(node)){                // 2.这时是其他的线程发送signal,将本线程转移到 Sync Queue 里面的工程中(转移的过程中 waitStatus = 0了, 所以上面的 CAS 操作失败)
        Thread.yield();                         // 这里调用 isOnSyncQueue判断是否已经 入Sync Queue 了
    }
    return false;
}

5. Sync Queue

AQS内部维护着一个FIFO的CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他具备进出队列快、无所、畅通无阻、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,ASQ采用的是一种变种的CLH队列锁:

  1. 原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。
  2. 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
  3. head结点使用的是傀儡结点。

SyncQueue
Sync Queue

这个图代表有个线程获取lock, 而 Node1, Node2, Node3 则在Sync Queue 里面进行等待获取lock(PS: 注意到 dummy Node 的SINGNAL 这是叫获取 lock 的线程在释放lock时通知后继节点的标示)

5.1 Sync Queue 节点入Queue方法

这里有个地方需要注意, 就是初始化 head, tail 的节点, 不一定是 head.next, 因为期间可能被其他的线程进行抢占了。将当前的线程封装成 Node 加入到 Sync Queue 里面。

private Node addWaiter(Node mode){
    Node node = new Node(Thread.currentThread(), mode);      // 1. 封装 Node
    Node pred = tail;
    if(pred != null){                           // 2. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点
        node.prev = pred;                       // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时  node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
        if(compareAndSetTail(pred, node)){      // 4. CAS node 到 tail
            pred.next = node;                  // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
            return node;
        }
    }
    enq(node);                                 // 6. 队列为空, 调用 enq 入队列
    return node;
}


/**
 * 这个插入会检测head tail 的初始化, 必要的话会初始化一个 dummy 节点, 这个和 ConcurrentLinkedQueue 一样的
 * 将节点 node 加入队列
 * 这里有个注意点
 * 情况:
 *      1. 首先 queue是空的
 *      2. 初始化一个 dummy 节点
 *      3. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占)
 *  这里为什么要加入一个 dummy 节点呢?
 *      这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点
 *      而且这里在当前节点想获取lock时通常给前继节点 打上 signal 的标识(表示前继节点释放lock需要通知我来获取lock)
 *      若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础)
 */
private Node enq(final Node node){
    for(;;){
        Node t = tail;
        if(t == null){ // Must initialize       // 1. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样
            if(compareAndSetHead(new Node())){  // 2. 初始化 head 与 tail (这个CAS成功后, head 就有值了, 详情将 Unsafe 操作)
                tail = head;
            }
        }else{
            node.prev = t;                      // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时  node.prev 一定 != null, 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
            if(compareAndSetTail(t, node)){     // 4. CAS node 到 tail
                t.next = node;                  // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
                return t;
            }
        }
    }
}

5.2 Sync Queue 节点出Queue方法

这里的出Queue的方法其实有两个: 新节点获取lock, 调用setHead抢占head, 并且剔除原head;节点因被中断或获取超时而进行 cancelled, 最后被剔除。

/**
 * 设置 head 节点(在独占模式没有并发的可能, 当共享的模式有可能)
 */
private void setHead(Node node){
    head = node;
    node.thread = null






请到「今天看啥」查看全文