GCD Internals: Threads

This installment of the GCD series focuses on internals: we will dissect how GCD is implemented by walking through libdispatch, libpthread, and xnu.

Background

This article dives straight into the source code; readers may want to keep the previous, concept-oriented article at hand for reference. All sources can be downloaded from Apple's open source site.
Source | Version
-|-
libdispatch | 1008.250.7
libpthread | 330.250.2
xnu | 6153.11.26

Queue Creation

A queue is created with dispatch_queue_create or dispatch_queue_create_with_target. Both share the same implementation; the only difference is that the first function's target defaults to a root queue. Let's go straight to the implementation.

dispatch_queue_create

dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq)
{
return _dispatch_lane_create_with_target(label, dqa, tq, false);
}

It simply calls _dispatch_lane_create_with_target. That method is fairly long (the comments in the source are already quite clear), so it is abridged here:

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);

//
// Step 1: Normalize arguments (qos, overcommit, tq)
//
// tq defaults to _dispatch_get_root_queue

//
// Step 2: Initialize the queue
//

if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
legacy = false;
}
}

const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqai.dqai_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}

dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

dq->dq_label = label;
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
dqai.dqai_relpri);
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
if (!dqai.dqai_inactive) {
_dispatch_queue_priority_inherit_from_target(dq, tq);
_dispatch_lane_inherit_wlh_from_target(dq, tq);
}
_dispatch_retain(tq);
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq).rdxz_dq;
}

Judging from the comments, this method is fairly straightforward:

  1. Normalize the queue's parameters: qos, overcommit, tq.
  2. Initialize the queue, assign the parameters from step 1 to it, and return it.
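Before moving on, here is a minimal usage sketch of the two creation entry points (demo code, not from libdispatch; DISPATCH_TARGET_QUEUE_DEFAULT is just NULL, meaning a default root queue target):

#include <dispatch/dispatch.h>

// Demo: with the default target these two calls behave the same.
dispatch_queue_t q1 = dispatch_queue_create("com.example.q1",
        DISPATCH_QUEUE_SERIAL);
dispatch_queue_t q2 = dispatch_queue_create_with_target("com.example.q2",
        DISPATCH_QUEUE_SERIAL, DISPATCH_TARGET_QUEUE_DEFAULT);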

While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).

The point to note is that blocks submitted to a custom queue will ultimately be executed on the global (root) queues.
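A quick way to observe this (demo code; the exact thread name printed depends on the OS version):

#import <Foundation/Foundation.h>
#include <pthread.h>

int main(void) {
    dispatch_queue_t q = dispatch_queue_create("com.example.custom",
            DISPATCH_QUEUE_SERIAL);
    dispatch_async(q, ^{
        char name[64] = {0};
        pthread_getname_np(pthread_self(), name, sizeof(name));
        // Typically prints the name of a root-queue worker thread,
        // e.g. one serving the default-QoS global queue.
        NSLog(@"block ran on: %s", name);
    });
    dispatch_main();
}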

sync

Let's start from the declaration of dispatch_sync() in dispatch/queue.h, from which we learn the following:

  • It submits a work item, and dispatch_sync() does not return until that work item has finished executing.
  • Calling dispatch_sync() against the queue the current thread is already serving results in deadlock.
  • dispatch_sync() performs some optimizations for the main queue and for work items targeting the main queue.

Next let's look at the concrete implementation in the libdispatch source. dispatch_sync() goes through the following chain of wrappers:

dispatch_sync()
_dispatch_sync_f()
_dispatch_sync_f_inline()

_dispatch_sync_f_inline

Here is the demo snippet (snippet 1) used throughout this section:

dispatch_queue_t queue = dispatch_queue_create("deadLockTest", DISPATCH_QUEUE_SERIAL);
// dispatch_async(dispatch_queue_create("test2", DISPATCH_QUEUE_CONCURRENT), ^{
//     sleep(5);
//     dispatch_sync(queue, ^{
//         NSLog(@"\n 2222");
//     });
// });
for (int i = 0; i < 10; i++) {
    NSLog(@"\n before");
    dispatch_sync(queue, ^{
        // sleep(5);
        NSLog(@"\n i = %d", i);
    });
    dispatch_sync(queue, ^{
        // sleep(5);
        NSLog(@"\n i = %d", i);
    });
    NSLog(@"\n end");
}

Now into the main flow, _dispatch_sync_f_inline:

// dq: the queue; ctxt: the GCD block; func: _dispatch_Block_invoke, the block's invoke function
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
//if the queue is serial (width == 1), take the barrier path
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
//_DISPATCH_LANE_TYPE covers plain dispatch queues (lanes)
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}

dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}

if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);

// dispatch_object_t obj = _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags);
// _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(obj));


_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

Let's first look at the serial-queue path: the real implementation of _dispatch_barrier_sync_f is _dispatch_barrier_sync_f_inline.

_dispatch_barrier_sync_f_inline

static inline void
// dq: the queue; ctxt: the GCD block; func: _dispatch_Block_invoke, the block's invoke function
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
//get the current thread's tid
dispatch_tid tid = _dispatch_tid_self();

//_DISPATCH_LANE_TYPE covers plain dispatch queues (lanes)
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}

dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE

//try to lock the queue; failure means the queue is currently being drained, so take the slow synchronous path
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
//if there is multi-level target-queue forwarding, handle it recursively
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
//reaching here means the task can execute immediately; dq_state must be reset after it completes
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

The logic of this function is as follows:
It attempts to take the lock on the serial queue, judging the scheduling state via the dq_state it maintains. If locking fails:
an item is already executing on this serial queue and this one must wait for it, so we enter the slow path.
Conversely, if locking succeeds:
the current item can execute right away, so it does, and dq_state is reset once it completes.

Next, let's look at the path that does not need to wait:

_dispatch_lane_barrier_sync_invoke_and_complete

/*
* For queues we can cheat and inline the unlock code, which is invalid
* for objects with a more complex state machine (sources or mach channels)
*/
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
//call _dispatch_client_callout to run the GCD block
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
//if items remain at the queue's tail, or the queue is not serial
if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
//the next item still has to run, so this call triggers its scheduling
//note: the function's name carries no "sync"
return _dispatch_lane_barrier_complete(dq, 0, 0);
}

// Presence of any of these bits requires more work that only
// _dispatch_*_barrier_complete() handles properly
//
// Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
// checking the role is sloppy, but is a super fast check, and neither of
// these bits should be set if the lock was never contended/discovered.
const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;

// 0xff80000000000000ull|0x00000001|0x0000008000000000ull|0x0000000800000000ull|0x00000002|0x0000000800000000ull
uint64_t old_state, new_state;
//restore the queue's dq_state to its initial value
// similar to _dispatch_queue_drain_try_unlock
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
if (unlikely(old_state & fail_unlock_mask)) {
os_atomic_rmw_loop_give_up({
//unlocking failed: another thread modified this queue's state again, i.e. more work was appended
return _dispatch_lane_barrier_complete(dq, 0, 0);
});
}
});
if (_dq_state_is_base_wlh(old_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
}

The logic of this function:

  1. Run the item's callout, i.e. execute the code in our GCD block.
  2. If there are still other tasks in the queue, call _dispatch_lane_barrier_complete.
  3. Try to restore the queue's dq_state to its initial value.
    1. On success, we are done.
    2. On failure (another thread modified dq_state, i.e. scheduled this queue), call _dispatch_lane_barrier_complete.

Below is the dq_state transition for the plain sync + serial queue case:
(figure: dq_state transition diagram)
Now let's uncomment the test2 queue in the demo snippet, to simulate the scenarios that invoke _dispatch_lane_barrier_complete and _dispatch_sync_f_slow.
With the comments removed, the expected behavior is:

  1. async spawns a thread that runs and sleeps for 5 seconds.
  2. The original deadLockTest items execute one after another on the main thread.
  3. At some moment during deadLockTest's execution, the deadLockTest queue is also scheduled, concurrently, from the thread async spawned earlier.

Next, combining the earlier conclusions, let's inspect the call stacks:
(figure: lldb call stacks)
From the figure above, let's sort out the current state of the threads and queues:

  1. On the main thread, after one deadLockTest item finishes, the attempt to restore the queue's initial state fails because dq_state has been modified from within test2, so _dispatch_lane_barrier_complete is taken.
  2. The worker thread driving test2 is trying to dispatch_sync onto the deadLockTest queue; its call to _dispatch_queue_try_acquire_barrier_sync fails, triggering the slow path, which internally enters the wait.

Let's first look at how a serial queue decides that the next task must wait.

_dispatch_queue_try_acquire_barrier_sync

Internally, _dispatch_queue_try_acquire_barrier_sync directly calls _dispatch_queue_try_acquire_barrier_sync_and_suspend:

static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width); //QUEUE_FULL and IN_BARRIER
//compose the locked state (the next item will have to wait): full queue width, the barrier bit, the owning tid, plus the suspend count
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;

// bool _result = false;
// typeof(&(dq)->f) _p = (&(dq)->f);
// ov = os_atomic_load(_p, relaxed)//atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed)
// do {
// uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
// if (old_state != (init | role)) {
// os_atomic_rmw_loop_give_up(break);
// }
// new_state = value | role;
// _result = os_atomic_cmpxchgvw(_p, old_state, new_state, &old_state, acquire);
// }while(os_unlikely(!_result))
// return _result;
//this function tries to update dq_state:
//if old_state != init (the queue's dq_state has already been modified), give up and return false
//otherwise install the new dq_state and return true
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}

This function tries to lock a queue; if locking fails it gives up immediately, otherwise it updates dq_state. The flow (the CAS loop itself is sketched below):

  1. Compute the locked state the queue must adopt while others wait: value.
  2. Read the queue's current state: old_state.
    1. If it is not the initial state, give up and return false: locking failed, the queue is already locked.
    2. On success, store value into dq_state, which includes the locking thread's tid.

The wait condition value is composed of:

  1. The queue width saturated (full).
  2. The in-barrier bit.
  3. The locking thread's tid.
  4. The suspend count.
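Written as portable C11, the CAS loop boils down to the following (an illustrative sketch; libdispatch's os_atomic_rmw_loop2o macro also handles field plumbing and its own memory-order spellings):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

// Sketch of the loop in _dispatch_queue_try_acquire_barrier_sync_and_suspend:
// install `locked_value | role` only while the state is still pristine.
static bool
try_acquire_barrier_sketch(_Atomic(uint64_t) *dq_state, uint64_t init,
        uint64_t locked_value, uint64_t role_mask)
{
    uint64_t old_state = atomic_load_explicit(dq_state, memory_order_relaxed);
    do {
        uint64_t role = old_state & role_mask;
        if (old_state != (init | role)) {
            return false; // dq_state was already modified: lock not taken
        }
    } while (!atomic_compare_exchange_weak_explicit(dq_state, &old_state,
            locked_value | role, memory_order_acquire, memory_order_relaxed));
    return true;
}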

Next comes the handling of the wait itself.

_dispatch_sync_f_slow

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
//save the current execution context
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};

_dispatch_trace_item_push(top_dq, &dsc);
//the core wait: once the wait state is entered, execution blocks here
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

//after the wait, run the item
if (dsc.dsc_func == NULL) {
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}

_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}

This method is the slow path; it breaks down into three steps:

  1. Build the queue execution context and save it.
  2. Enter the wait.
  3. When the wait ends (woken by a semaphore or a kevent), resume execution.

Once __DISPATCH_WAIT_FOR_QUEUE__ is entered, the code after it is blocked until the wait ends. Let's look at how the wait is implemented.

__DISPATCH_WAIT_FOR_QUEUE__

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
//deadlock detection
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}

// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);

//choose a wait strategy based on dq_state
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
//if neither of the above applies, recompute the wait strategy
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}

//initialization
if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor();
_dispatch_thread_event_init(&dsc->dsc_event);
}
// enqueue the work to be executed
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
_dispatch_trace_runtime_event(sync_wait, dq, 0);
if (dsc->dc_data == DISPATCH_WLH_ANON) {
//semaphore-based wait
_dispatch_thread_event_wait(&dsc->dsc_event); // acquire
} else {
//kevent-based wait: _dispatch_kq_poll
_dispatch_event_loop_wait_for_ownership(dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}

First, the deadlock detection. As analyzed above for _dispatch_queue_try_acquire_barrier_sync_and_suspend, when the queue's dq_state is modified the tid of the scheduling thread is recorded; likewise, when the context is built, the thread's tid is written into dsc->dsc_waiter. So _dq_state_drain_locked_by really asks:
does the tid of the thread about to wait already hold the lock? (A thread that already holds the lock and then waits on it has deadlocked.)
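A minimal sketch of that ownership check (illustrative: assume the owner tid sits in the low bits of dq_state, and OWNER_MASK strips the non-owner flag bits; the real _dq_state_drain_locked_by works the same way via a masked XOR):

#include <stdbool.h>
#include <stdint.h>

#define OWNER_MASK 0xfffffffcu // illustrative mask, not libdispatch's

static bool
drain_locked_by_sketch(uint64_t dq_state, uint32_t waiter_tid)
{
    uint32_t lock_value = (uint32_t)dq_state;
    // Zero after the masked XOR means the waiter already owns the drain
    // lock; waiting now would deadlock, hence the familiar crash message
    // "dispatch_sync called on queue already owned by current thread".
    return ((lock_value ^ waiter_tid) & OWNER_MASK) == 0;
}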

Then comes the wait strategy:

  1. Compute the wait type from dq_state and store it into dsc->dc_data.
    1. Assign dsc->dc_data according to the suspended / wlh state.
    2. If neither case applies, recompute it from the context and dq.
  2. Enqueue this task onto the corresponding queue.
    1. Call the queue's dq_push, here _dispatch_lane_push.
  3. Execute the wait strategy:
    1. Semaphore-based wait (sketched below).
    2. kevent-based wait.
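A conceptual sketch of the semaphore-based branch (demo code built on the public dispatch semaphore API; the real _dispatch_thread_event is a lower-level one-shot primitive):

#include <dispatch/dispatch.h>

// Waiter side: runs in _dispatch_sync_f_slow after dx_push has enqueued
// the wait context that references `event`.
static void
sync_waiter_sketch(dispatch_semaphore_t event)
{
    dispatch_semaphore_wait(event, DISPATCH_TIME_FOREVER); // block here
    // ...woken once the previous barrier item finishes draining
}

// Waker side: roughly what _dispatch_sync_waiter_wake amounts to.
static void
sync_waker_sketch(dispatch_semaphore_t event)
{
    dispatch_semaphore_signal(event);
}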

As for the queue's _dispatch_lane_push: since the current task is waiting to execute, it goes on to call _dispatch_lane_push_waiter, whose relevant steps are:

  1. Update dq_state: set the DIRTY bit, merge qos, and so on.
  2. Based on the new dq_state, run the matching action: override, complete, etc.

    The lldb sessions are shown below:
    (figures: lldb call stacks)

The kevent-based wait (for kevent and kqueue, see the GCD concepts article) ultimately calls _dispatch_kq_poll to poll for kernel events. So when does the wait end? Again, a set of call stacks tells the story:
(figure: lldb call stack of the waiting thread)

The call stack of the other, currently executing thread looks like this:
(figure: lldb call stack of the executing thread)

The wait ends exactly when the previous task finishes. Recall the completion function discussed above, _dispatch_lane_barrier_sync_invoke_and_complete: two of its branches call _dispatch_lane_barrier_complete, namely when the queue's tail is non-empty or when dq_state was modified by another thread. That is precisely the wait-side flow we just analyzed.

_dispatch_lane_barrier_complete

static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
dispatch_lane_t dq = dqu._dl;
//more items pending && the queue is not suspended
if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
//serial queue, or the head item is a barrier
if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
if (_dispatch_object_is_waiter(dc)) {
//only need to check whether it is waiting; if not, execution falls through to _dispatch_lane_class_barrier_complete at the bottom
return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
}
} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
//concurrent, and the head item is not a barrier
return _dispatch_lane_drain_non_barriers(dq, dc, flags);
}

if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}

A useful way to think about the barrier-related methods:

When a task inside a barrier finishes (barrier_complete), it should notify the next task to start. Synchronous execution shares this property.

The three branches here follow broadly the same logic: based on dq_state and the queue's execution state, either wake the waiter directly or redirect to the targetQueue. Let's go straight to _dispatch_lane_drain_barrier_waiter.

_dispatch_lane_drain_barrier_waiter

static void
_dispatch_lane_drain_barrier_waiter(dispatch_lane_t dq,
struct dispatch_object_s *dc, dispatch_wakeup_flags_t flags,
uint64_t enqueued_bits)
{
dispatch_sync_context_t dsc = (dispatch_sync_context_t)dc;
struct dispatch_object_s *next_dc;
uint64_t next_owner = 0, old_state, new_state;

next_owner = _dispatch_lock_value_from_tid(dsc->dsc_waiter);
next_dc = _dispatch_queue_pop_head(dq, dc);
//while the next item executes, the enqueued bit in dq_state stays valid, keeping the queue draining normally (the lock state is handed over)
transfer_lock_again:
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state;
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
new_state &= ~DISPATCH_QUEUE_DIRTY;
new_state |= next_owner;

if (_dq_state_is_base_wlh(old_state)) {
new_state |= DISPATCH_QUEUE_SYNC_TRANSFER;
if (next_dc) {
// we know there's a next item, keep the enqueued bit if any
} else if (unlikely(_dq_state_is_dirty(old_state))) {
os_atomic_rmw_loop_give_up({
os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
next_dc = os_atomic_load2o(dq, dq_items_head, relaxed);
goto transfer_lock_again;
});
} else {
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
new_state &= ~DISPATCH_QUEUE_ENQUEUED;
}
} else {
new_state -= enqueued_bits;
}
});

//wake up or redirect
return _dispatch_barrier_waiter_redirect_or_wake(dq, dc, flags,
old_state, new_state);
}

Summary of the synchronous path

The diagram below summarizes the dispatch_sync flow.
(figure: sync flow diagram)

async

dispatch_async

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
//first wrap the GCD block into a continuation
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;

qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

This function mainly wraps the work block into a continuation.
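Roughly speaking, a continuation carries the following (a trimmed sketch; the field names follow struct dispatch_continuation_s in the source, but many members and the vtable union are omitted):

#include <stdint.h>

typedef void (*dispatch_function_sketch_t)(void *);

struct dispatch_continuation_sketch {
    uintptr_t dc_flags;                 // e.g. DC_FLAG_CONSUME, DC_FLAG_BLOCK
    struct dispatch_continuation_sketch *do_next; // intrusive queue linkage
    dispatch_function_sketch_t dc_func; // the block's invoke function
    void *dc_ctxt;                      // the captured block itself
    void *dc_voucher;                   // voucher snapshot at submit time
    uintptr_t dc_priority;              // priority snapshot at submit time
};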

_dispatch_continuation_async

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}

dx_push is defined as follows:

#define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)

This invokes dq's do_push method. The do_push assignments can be found in init.c; here I'll give the conclusion directly: for our (concurrent) queue it is _dispatch_lane_concurrent_push.

_dispatch_lane_concurrent_push

void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}

_dispatch_lane_push(dq, dou, qos);
}

At first read this puzzled me for quite a while. Debugging with lldb shows the if condition is hit every time, i.e. dq->dq_items_tail == NULL.

First instinct says it should not be NULL: we passed a block to dispatch_async, so surely it got appended to the queue?

That is only half right (the second half): the block (continuation) is indeed appended to a queue, but not the one we created; it goes to a root queue. That explains why dq->dq_items_tail == NULL here. What follows is the process of pushing onto the root queue.

_dispatch_continuation_redirect_push

static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
// here dou is a dispatch_continuation_t
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}

dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos);
}

Once again one has to admire the art of GCD naming: this method ultimately submits the task to the targetq, normally a root queue, as explained in the queue-creation section. So _dispatch_root_queue_push ends up being called.

_dispatch_root_queue_push

static void 
_dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
...//DISPATCH_USE_KEVENT_WORKQUEUE

// in general, both custom queues and non-custom ones (e.g. dispatch_get_global_queue) enter this condition
// it mainly compares the item's qos against the root queue's qos
if (_dispatch_root_queue_push_needs_override(rq, qos)) {
return _dispatch_root_queue_push_override(rq, dou, qos);
}
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

This method is fairly simple: it decides whether the task is enqueued directly onto this root queue, or whether an override is performed first. Queues reached via async usually satisfy the override condition.
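The idea of the check, simplified (the real _dispatch_root_queue_push_needs_override also special-cases fallback/default root queues):

#include <stdbool.h>
#include <stdint.h>

// Sketch: an override is needed when the item's QoS does not match the QoS
// the root queue was created for, so the item is re-routed to a root queue
// of the requested QoS (0 meaning "unspecified" here).
static bool
root_push_needs_override_sketch(uint32_t root_qos, uint32_t item_qos)
{
    return item_qos != 0 && item_qos != root_qos;
}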

_dispatch_root_queue_push_override

A queue marked as needing an override goes back to the root-queue pool and fetches a root queue matching the requested QoS.

static void 
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
dispatch_object_t dou, dispatch_qos_t qos)
{
bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
//fetch a fresh root queue from GCD's root-queue pool
dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
dispatch_continuation_t dc = dou._dc;
// _dispatch_object_is_redirection is effectively: return _dispatch_object_has_type(dou, DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
// so a custom queue takes this if branch, while dispatch_get_global_queue does not
if (_dispatch_object_is_redirection(dc)) {
dc->dc_func = (void *)orig_rq;
} else {
// dispatch_get_global_queue ends up here
dc = _dispatch_continuation_alloc();
// equivalent to the vtable entry below: the invoke function is set to _dispatch_queue_override_invoke, which differs from a custom queue's invoke
// DC_VTABLE_ENTRY(OVERRIDE_OWNING,
// .do_kind = "dc-override-owning",
// .do_invoke = _dispatch_queue_override_invoke),
dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
_dispatch_trace_continuation_push(orig_rq, dou);
dc->dc_ctxt = dc;
dc->dc_other = orig_rq;
dc->dc_data = dou._do;
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
}
_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

The main job of this function is to fetch a queue anew from the GCD root queues, then call _dispatch_root_queue_push_inline with the corresponding continuation and the newly obtained queue:


static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
//when the queue head was empty before this push
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
// the queue was empty before the push: poke to request a worker
return _dispatch_root_queue_poke(dq, n, 0);
}
}

os_mpsc_push_list is defined in inline_internal.h; it returns true if the queue's head was empty before the push.

This is the function that actually enqueues the task. Because it is reached via different code paths, it must first determine whether the queue head was empty before the push (on the override path it is); if so, it must perform _dispatch_root_queue_poke. (The push itself is sketched below.)
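A minimal sketch of that MPSC (multi-producer/single-consumer) push (illustrative names and types, not libdispatch's):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct node { _Atomic(struct node *) next; };
struct mpsc { _Atomic(struct node *) tail, head; };

// Returns true when the queue was empty before the push: exactly the case
// in which _dispatch_root_queue_push_inline must poke the root queue so
// that a worker thread is requested to drain it.
static bool
mpsc_push(struct mpsc *q, struct node *head, struct node *tail)
{
    atomic_store(&tail->next, NULL);
    struct node *prev = atomic_exchange(&q->tail, tail); // publish new tail
    if (prev == NULL) {
        atomic_store(&q->head, head); // queue was empty: publish the head
        return true;
    }
    atomic_store(&prev->next, head);  // link behind the previous tail
    return false;
}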

_dispatch_root_queue_poke

This function in turn calls _dispatch_root_queue_poke_slow:

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;

_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
if (likely(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}

bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}

int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));

pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}

First the root queues are initialized. _dispatch_root_queues_init is called from many places; the point is to guarantee the root queues are set up before GCD is used. Its implementation:

static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
//use the internal pthread thread pool
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
_dispatch_root_queues[i].dq_priority);
}
#else
//use _pthread_workqueue (XNU workq)
int wq_supported = _pthread_workqueue_supported();
int r = ENOTSUP;

if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(wq_supported,
"QoS Maintenance support required");
}
//the two init calls below are crucial: the lower layer binds _dispatch_worker_thread2 as the thread's entry callback
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_KEVENT_WORKQUEUE
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}

if (r != 0) {
DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
"Root queue initialization failed");
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}

During initialization it decides which kind of thread backing to use: the internal pthread pool or XNU workqueues.
The two _pthread_workqueue_init* calls below are equally important: they bind _dispatch_worker_thread2 as the thread entry callback.

Back in _dispatch_root_queue_poke_slow, we can see two ways of obtaining threads:

  1. _pthread_workqueue_addthreads
  2. pthread_create

Let's use lldb to determine which one GCD takes:
(figure: lldb session)

This confirms that GCD requests threads via _pthread_workqueue_addthreads. Let's first follow task execution, and return to the thread-request machinery below.

Continuing with lldb, here is the call stack of a GCD block:
(figure: lldb call stack)
_pthread_wqthread calls into _dispatch_worker_thread2. _pthread_workqueue_addthreads and _pthread_wqthread themselves are covered in the libpthread section.

_dispatch_worker_thread2

static void
_dispatch_worker_thread2(pthread_priority_t pp)
{
bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
dispatch_queue_global_t dq;

pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);

_dispatch_introspection_thread_add();
_dispatch_trace_runtime_event(worker_unpark, dq, 0);

int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
dispatch_assert(pending >= 0);
//get ready to start draining
_dispatch_root_queue_drain(dq, dq->dq_priority,
DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
_dispatch_voucher_debug("root queue clear", NULL);
_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
_dispatch_trace_runtime_event(worker_park, NULL, 0);
}

_dispatch_root_queue_drain

static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{
#if DISPATCH_DEBUG
dispatch_queue_t cq;
if (unlikely(cq = _dispatch_queue_get_current())) {
DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
}
#endif
_dispatch_queue_set_current(dq);
_dispatch_init_basepri(pri);
_dispatch_adopt_wlh_anon();

struct dispatch_object_s *item;
bool reset = false;
dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
_dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
_dispatch_perfmon_start();
//pop items one by one
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
if (reset) _dispatch_wqthread_override_reset();
_dispatch_continuation_pop_inline(item, &dic, flags, dq);
reset = _dispatch_reset_basepri_override();
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}

// overcommit or not. worker thread
if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
_dispatch_perfmon_end(perfmon_thread_worker_oc);
} else {
_dispatch_perfmon_end(perfmon_thread_worker_non_oc);
}

#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
_dispatch_reset_wlh();
_dispatch_clear_basepri();
_dispatch_queue_set_current(NULL);
}

Here we can see that once draining starts, items are fetched one at a time and _dispatch_continuation_pop_inline is called. This function matters: internally it distinguishes the dispatch_object_t type.

_dispatch_continuation_pop_inline

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_class_t dqu)
{
dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
_dispatch_get_pthread_root_queue_observer_hooks();
if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
//here the dispatch_object_t type is distinguished: a plain continuation is invoked directly; if it still has a vtable, its own dx_invoke runs first
if (_dispatch_object_has_vtable(dou)) {
dx_invoke(dou._dq, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, flags, dqu);
}
if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

The key here is the branch condition:

  1. If the dispatch_object_t has a vtable, invoke through the vtable's invoke first (dx_invoke).
  2. If not, call _dispatch_continuation_invoke_inline directly, which internally calls _dispatch_client_callout.

Note that _dispatch_continuation_invoke_inline distinguishes dispatch_group from a plain callout; groups are covered in the next chapter.

Back to _dispatch_client_callout, which is familiar ground: it already appeared in the synchronous path.
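Conceptually it is tiny (a stripped sketch; the real _dispatch_client_callout additionally saves and restores unwind/TSD state around the call):

typedef void (*dispatch_function_sketch_t)(void *);

static inline void
client_callout_sketch(void *ctxt, dispatch_function_sketch_t f)
{
    // Queued work ultimately reduces to a C function pointer applied to
    // its saved context: for a block, f is the block's invoke function
    // and ctxt is the block pointer itself.
    f(ctxt);
}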

libpthread

When a thread starts working, the kernel enters the _pthread_wqthread function via an assembly trampoline; the flags then determine which dispatch worker function to run.

_pthread_wqthread

// workqueue entry point from kernel
void
_pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr,
void *keventlist, int flags, int nkevents)
{
if ((flags & WQ_FLAG_THREAD_REUSE) == 0) {
_pthread_wqthread_setup(self, kport, stacklowaddr, flags);
}

pthread_priority_t pp;
if (flags & WQ_FLAG_THREAD_OUTSIDEQOS) {
self->wqoutsideqos = 1;
pp = _pthread_priority_make_from_thread_qos(THREAD_QOS_LEGACY, 0,
_PTHREAD_PRIORITY_FALLBACK_FLAG);
} else {
self->wqoutsideqos = 0;
pp = _pthread_wqthread_priority(flags);
}

self->tsd[_PTHREAD_TSD_SLOT_PTHREAD_QOS_CLASS] = (void *)pp;

// avoid spills on the stack hard to keep used stack space minimal
if (nkevents == WORKQ_EXIT_THREAD_NKEVENT) {
goto exit;
} else if (flags & WQ_FLAG_THREAD_WORKLOOP) {
self->fun = (void *(*)(void*))__libdispatch_workloopfunction;
self->wq_retop = WQOPS_THREAD_WORKLOOP_RETURN;
self->wq_kqid_ptr = ((kqueue_id_t *)keventlist - 1);
self->arg = keventlist;
self->wq_nevents = nkevents;
} else if (flags & WQ_FLAG_THREAD_KEVENT) {
self->fun = (void *(*)(void*))__libdispatch_keventfunction;
self->wq_retop = WQOPS_THREAD_KEVENT_RETURN;
self->wq_kqid_ptr = NULL;
self->arg = keventlist;
self->wq_nevents = nkevents;
} else {
self->fun = (void *(*)(void*))__libdispatch_workerfunction;
self->wq_retop = WQOPS_THREAD_RETURN;
self->wq_kqid_ptr = NULL;
self->arg = (void *)(uintptr_t)pp;
self->wq_nevents = 0;
if (os_likely(__workq_newapi)) {
(*__libdispatch_workerfunction)(pp);
} else {
_pthread_wqthread_legacy_worker_wrap(pp);
}
goto just_return;
}

if (nkevents > 0) {
kevent_errors_retry:
if (self->wq_retop == WQOPS_THREAD_WORKLOOP_RETURN) {
((pthread_workqueue_function_workloop_t)self->fun)
(self->wq_kqid_ptr, &self->arg, &self->wq_nevents);
} else {
((pthread_workqueue_function_kevent_t)self->fun)
(&self->arg, &self->wq_nevents);
}
int rc = __workq_kernreturn(self->wq_retop, self->arg, self->wq_nevents, 0);
if (os_unlikely(rc > 0)) {
self->wq_nevents = rc;
goto kevent_errors_retry;
}
if (os_unlikely(rc < 0)) {
PTHREAD_INTERNAL_CRASH(self->err_no, "kevent (workloop) failed");
}
} else {
just_return:
__workq_kernreturn(self->wq_retop, NULL, 0, 0);
}

exit:
_pthread_wqthread_exit(self);
}

__libdispatch_workerfunction is called here. That function pointer is assigned in the following places:

_pthread_workqueue_init_with_kevent
_pthread_workqueue_init_with_workloop
pthread_workqueue_setdispatch_with_workloop_np

static int
pthread_workqueue_setdispatch_with_workloop_np(pthread_workqueue_function2_t queue_func,
pthread_workqueue_function_kevent_t kevent_func,
pthread_workqueue_function_workloop_t workloop_func)
{
int res = EBUSY;
if (__libdispatch_workerfunction == NULL) {
// Check whether the kernel supports new SPIs
res = __workq_kernreturn(WQOPS_QUEUE_NEWSPISUPP, NULL, __libdispatch_offset, kevent_func != NULL ? 0x01 : 0x00);
if (res == -1){
res = ENOTSUP;
} else {
//assign the GCD callback functions
__libdispatch_workerfunction = queue_func;
__libdispatch_keventfunction = kevent_func;
__libdispatch_workloopfunction = workloop_func;

// Prepare the kernel for workq action
(void)__workq_open();
if (__is_threaded == 0) {
__is_threaded = 1;
}
}
}
return res;
}

libdispatch的根队列初始化函数中,对把GCD的回调行数传递给下次的libpthread</code。
下面是libdispatch的申请线程流程。

_pthread_workqueue_addthreads

_pthread_workqueue_addthreads is defined in libpthread:

int
_pthread_workqueue_addthreads(int numthreads, pthread_priority_t priority)
{
int res = 0;

if (__libdispatch_workerfunction == NULL) {
return EPERM;
}

#if TARGET_OS_OSX
// <rdar://problem/37687655> Legacy simulators fail to boot
//
// Older sims set the deprecated _PTHREAD_PRIORITY_ROOTQUEUE_FLAG wrongly,
// which is aliased to _PTHREAD_PRIORITY_SCHED_PRI_FLAG and that XNU
// validates and rejects.
//
// As a workaround, forcefully unset this bit that cannot be set here
// anyway.
priority &= ~_PTHREAD_PRIORITY_SCHED_PRI_FLAG;
#endif

res = __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads, (int)priority);
if (res == -1) {
res = errno;
}
return res;
}

__workq_kernreturn is defined in xnu's pthread_workqueue.c.
Breakpoints cannot follow any further from here, so let's disassemble the dynamic library instead; image list gives all the loaded library paths.
(figure: lldb image list)
The disassembly:
(figure: disassembly of __workq_kernreturn)

XNU

What actually happens here is a system call with syscall code SYS_workq_kernreturn, which enters the kernel-side implementation, workq_kernreturn.
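In userspace that stub is little more than a raw system call. A rough equivalent (illustrative; the real __workq_kernreturn stub is generated assembly, and syscall() is deprecated on modern macOS):

#include <sys/syscall.h>
#include <unistd.h>

static int
workq_kernreturn_sketch(int options, void *item, int affinity, int prio)
{
    // SYS_workq_kernreturn selects the kernel-side handler implemented in
    // xnu's pthread_workqueue.c.
    return (int)syscall(SYS_workq_kernreturn, options, item, affinity, prio);
}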

workq_kernreturn

int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
// res = __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads, (int)priority);
int options = uap->options;
int arg2 = uap->affinity;
int arg3 = uap->prio;
struct workqueue *wq = proc_get_wqptr(p);
int error = 0;

if ((p->p_lflag & P_LREGISTER) == 0) {
return EINVAL;
}

switch (options) {
case WQOPS_QUEUE_NEWSPISUPP: {
/*
* arg2 = offset of serialno into dispatch queue
* arg3 = kevent support
*/
int offset = arg2;
if (arg3 & 0x01) {
// If we get here, then userspace has indicated support for kevent delivery.
}

p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
break;
}
case WQOPS_QUEUE_REQTHREADS: {
/*
* arg2 = number of threads to start
* arg3 = priority
*/
error = workq_reqthreads(p, arg2, arg3);
break;
}
case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
/*
* arg2 = priority for the manager thread
*
* if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
* the low bits of the value contains a scheduling priority
* instead of a QOS value
*/
pthread_priority_t pri = arg2;

if (wq == NULL) {
error = EINVAL;
break;
}

/*
* Normalize the incoming priority so that it is ordered numerically.
*/
if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
_PTHREAD_PRIORITY_SCHED_PRI_FLAG);
} else {
thread_qos_t qos = _pthread_priority_thread_qos(pri);
int relpri = _pthread_priority_relpri(pri);
if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
qos == THREAD_QOS_UNSPECIFIED) {
error = EINVAL;
break;
}
pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
}

/*
* If userspace passes a scheduling priority, that wins over any QoS.
* Userspace should takes care not to lower the priority this way.
*/
workq_lock_spin(wq);
if (wq->wq_event_manager_priority < (uint32_t)pri) {
wq->wq_event_manager_priority = (uint32_t)pri;
}
workq_unlock(wq);
break;
}
case WQOPS_THREAD_KEVENT_RETURN:
case WQOPS_THREAD_WORKLOOP_RETURN:
case WQOPS_THREAD_RETURN: {
error = workq_thread_return(p, uap, wq);
break;
}

case WQOPS_SHOULD_NARROW: {
/*
* arg2 = priority to test
* arg3 = unused
*/
thread_t th = current_thread();
struct uthread *uth = get_bsdthread_info(th);
if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
(uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
error = EINVAL;
break;
}

thread_qos_t qos = _pthread_priority_thread_qos(arg2);
if (qos == THREAD_QOS_UNSPECIFIED) {
error = EINVAL;
break;
}
workq_lock_spin(wq);
bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
workq_unlock(wq);

*retval = should_narrow;
break;
}
case WQOPS_SETUP_DISPATCH: {
/*
* item = pointer to workq_dispatch_config structure
* arg2 = sizeof(item)
*/
struct workq_dispatch_config cfg;
bzero(&cfg, sizeof(cfg));

error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
if (error) {
break;
}

if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
error = ENOTSUP;
break;
}

/* Load fields from version 1 */
p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

/* Load fields from version 2 */
if (cfg.wdc_version >= 2) {
p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
}

break;
}
default:
error = EINVAL;
break;
}

return error;
}

Because libpthread passed WQOPS_QUEUE_REQTHREADS, the WQOPS_QUEUE_REQTHREADS case is taken, which calls workq_reqthreads:

/**
* Entry point for libdispatch to ask for threads
*/

/**
typedef struct workq_threadreq_s {
union {
struct priority_queue_entry tr_entry;
thread_t tr_thread;
};
uint16_t tr_count;
workq_tr_flags_t tr_flags;
workq_tr_state_t tr_state;
thread_qos_t tr_qos; qos for the thread request

kqueue states, modified under the kqlock
kq_index_t tr_kq_override_index; highest wakeup override index
kq_index_t tr_kq_qos_index; QoS for the servicer
bool tr_kq_wakeup; an event has fired
} workq_threadreq_s, *workq_threadreq_t;
*/
/*
proc: proc_t process
*/
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
thread_qos_t qos = _pthread_priority_thread_qos(pp);
//look up the workqueue for this process
struct workqueue *wq = proc_get_wqptr(p);
uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
qos == THREAD_QOS_UNSPECIFIED) {
return EINVAL;
}

WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
wq, reqcount, pp, 0, 0);

workq_threadreq_t req = zalloc(workq_zone_threadreq);
priority_queue_entry_init(&req->tr_entry);
req->tr_state = WORKQ_TR_STATE_NEW;
req->tr_flags = 0;
req->tr_qos = qos;

if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
}

WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

workq_lock_spin(wq);
do {
if (_wq_exiting(wq)) {
goto exiting;
}

/*
* When userspace is asking for parallelism, wakeup up to (reqcount - 1)
* threads without pacing, to inform the scheduler of that workload.
*
* The last requests, or the ones that failed the admission checks are
* enqueued and go through the regular creator codepath.
*
* If there aren't enough threads, add one, but re-evaluate everything
* as conditions may now have changed.
*/
if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
unpaced = workq_constrained_allowance(wq, qos, NULL, false);
if (unpaced >= reqcount - 1) {
unpaced = reqcount - 1;
}
} else {
unpaced = reqcount - 1;
}

/*
* This path does not currently handle custom workloop parameters
* when creating threads for parallelism.
*/
assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

/*
* This is a trimmed down version of workq_threadreq_bind_and_unlock()
*/
while (unpaced > 0 && wq->wq_thidlecount) {
struct uthread *uth;
bool needs_wakeup;
uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
uu_flags |= UT_WORKQ_OVERCOMMIT;
}

uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

_wq_thactive_inc(wq, qos);
wq->wq_thscheduled_count[_wq_bucket(qos)]++;
workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
wq->wq_fulfilled++;

uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
uth->uu_save.uus_workq_park_data.thread_request = req;
if (needs_wakeup) {
workq_thread_wakeup(uth);
}
unpaced--;
reqcount--;
}
} while (unpaced && wq->wq_nthreads < wq_max_threads &&
workq_add_new_idle_thread(p, wq));

if (_wq_exiting(wq)) {
goto exiting;
}

req->tr_count = reqcount;
if (workq_threadreq_enqueue(wq, req)) {
/* This can drop the workqueue lock, and take it again */
workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
}
workq_unlock(wq);
return 0;

exiting:
workq_unlock(wq);
zfree(workq_zone_threadreq, req);
return ECANCELED;
}

As the comments explain, this method decides, based on kernel-side state (idle thread count, pool limits, admission checks), whether new threads need to be created for the request.

async + serial queue

The async + serial queue scenario was in fact already exercised while discussing sync: in that case _dispatch_lane_push is called, and the subsequent flow has been analyzed above, so it is not repeated here.

Summary of the asynchronous path

(figure: async flow diagram)

The main queue and the runloop

The main queue executes only on the main thread, but the main thread does not necessarily run only the main queue.
Execution of main-queue tasks is special: it depends on the runloop being run. (A demo follows below.)
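A common gotcha that follows from this (demo code): in a plain command-line tool, blocks dispatched to the main queue never run unless the main thread services the main queue, for example via dispatch_main() or a running runloop (CFRunLoopRun()):

#import <Foundation/Foundation.h>

int main(void) {
    dispatch_async(dispatch_get_main_queue(), ^{
        NSLog(@"runs only once the main thread parks in dispatch_main()");
    });
    dispatch_main(); // never returns; allows main-queue blocks to execute
}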

References

Concurrency Programming Guide
Threading Programming Guide
GCD Internals