GCD系列 文章之原理向,本文将通过libdispatch
libpthread
xnu
详细的剖析GCD实现原理。
背景
本文将直接从源码讲起,读者可以对照前一篇概念向的文章来阅读本文,所有源码均在苹果开源官网下可下载
源码 | 版本
-|-
libdispatch | 1008.250.7
libpthread | 330.250.2
xnu | 6153.11.26
队列的创建
通过dispatch_queue_create
或者通过dispatch_queue_create_with_target
来创建一个队列,二者的实现都一致,只不过第一个函数的target
为root queue
。下面直接看实现
dispatch_queue_create
1 | dispatch_queue_t |
直接调用_dispatch_lane_create_with_target
,该方法流程较长(其实源码的注释已经很清晰了),这里缩减如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
//
// Step 1: Normalize arguments (qos, overcommit, tq)
//
// tq 默认值 = _dispatch_get_root_queue
//
// Step 2: Initialize the queue
//
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
legacy = false;
}
}
const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqai.dqai_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
dqai.dqai_relpri);
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
if (!dqai.dqai_inactive) {
_dispatch_queue_priority_inherit_from_target(dq, tq);
_dispatch_lane_inherit_wlh_from_target(dq, tq);
}
_dispatch_retain(tq);
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq).rdxz_dq;
}
这个方法冲注释来看来还是比较直观:
- 初始化队列的参数:qos, overcommit, tq。
- 初始化队列。把1中的参数赋值给队列。返回。
While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).
有点需要注意的就是,自定义的队列block
,都将会在全局队列中被执行。
sync
先从dispatch_sync()
的方法定义,在dispatch/queue.h
中,我们可以得到如下信息:
- 提交一个工作项(workitem),但是直到这个
workitem
执行完成前,dispatch_sync()
都不会返回 - 在当前线程中调用
dispatch_sync()
,会造成死锁 dispatch_sync()
对于主队列和target为主队列的workitem
做了一些优化。
接下来我们查看下具体dispatch_sync()
实现,来到libdispatch
的源码,dispatch_sync()
会经过下面两个函数的包装之后,
dispatch_sync()
_dispatch_sync_f()
_dispatch_sync_f_inline()
_dispatch_sync_f_inline
这里放出伪代码11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 dispatch_queue_t queue = dispatch_queue_create("deadLockTest", DISPATCH_QUEUE_SERIAL);
// dispatch_async(dispatch_queue_create("test2", DISPATCH_QUEUE_CONCURRENT), ^{
// sleep(5);
// dispatch_sync(queue, ^{
//
// NSLog(@"\n 2222");
// });
// });
for (int i = 0; i<10; i++) {
NSLog(@"\n before");
dispatch_sync(queue, ^{
// sleep(5);
NSLog(@"\n i = %d",i);
});
dispatch_sync(queue, ^{
// sleep(5);
NSLog(@"\n i = %d",i);
});
NSLog(@"\n end");
}
进入主要的流程: _dispatch_sync_f_inline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// dq:queue, ctxt:gcd block,_dispatch_Block_invoke: block的实现
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
//如果线程是穿行队列:并发数是1
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
//DISPATCH_LANE_TYPE 为并行队列类型。
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
// dispatch_object_t obj = _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags);
// _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(obj));
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
先来看下对于穿行队列的实现,_dispatch_barrier_sync_f
的真正实现为_dispatch_barrier_sync_f_inline
_dispatch_barrier_sync_f_inline
1 | static inline void |
此函数的逻辑如下:
对串行队列进行尝试加锁处理。通过对dq_state
的维护来判断调度状态,如果加锁失败则:
说明当前串行队里有item
正在被执行,需要等待前一个item
执行,因此进入slow环节。
反之如果加锁成功:
则说明当前item
可以直接被执行,则直接执行,并在完成后重置dq_state
的状态。
接下来看一下不需要等待的流程
_dispatch_lane_barrier_sync_invoke_and_complete
1 | /* |
此函数的逻辑如下:
- 首先进行当前队列中的
item
回调,也就是执行我们写的GCD block
中的代码。 - 如果队列中还有其他任务的调用
_dispatch_lane_barrier_complete
- 尝试将队列的状态:
dq_state
恢复到初始状态。- 恢复成功则执行结束。
- 恢复失败(其他线程修改
dq_state
,调度了该队列),则调用_dispatch_lane_barrier_complete
。
下面是单纯的sync+串行队列的state 变化过程
下面我们把伪代码中的test2
队列的注释打开,来模拟调用_dispatch_lane_barrier_complete
和_dispatch_sync_f_slow
的场景。
首先打开注释的执行结果应该是:
async
创建一个线程来执行并sleep(5)。- 原来的
deadLockTest
会在主线程依次被执行。 - 在
deadLockTest
执行的某一个时刻,deadLockTest
又通过async
之前开辟的线程同时被调度。
加下来我们结合之前的结论来观察调用栈:
通过上图我们来梳理下线程和队列当前的状态:
- 主线程中在
deadLockTest
的一个任务执行完毕后,尝试恢复队列的初始状态,由于在test2
中被 - 调度test2的子线程在尝试同步执行deadlock队列。调用
_dispatch_queue_try_acquire_barrier_sync
时,触发了slow,内部
我们先来看串行队列判断下一个任务需要等待执行的
_dispatch_queue_try_acquire_barrier_sync
_dispatch_queue_try_acquire_barrier_sync
内部直接调用了_dispatch_queue_try_acquire_barrier_sync_and_suspend
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width); //QUEUE_FULL and IN_BARRIER
//获得加锁(下一个item的执行需要被等待)的的状态,队列并发数上限,barrier,已经被加锁过的tid,和暂停状态。
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
// bool _result = false;
// typeof(&(dq)->f) _p = (&(dq)->f);
// ov = os_atomic_load(_p, relaxed)//atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed)
// do {
// uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
// if (old_state != (init | role)) {
// os_atomic_rmw_loop_give_up(break);
// }
// new_state = value | role;
// _result = os_atomic_cmpxchgvw(_p, old_state, new_state, &old_state, acquire);
// }while(os_unlikely(!_result))
// return _result;
//这个函数的作用就是尝试更新dq_state:
//如果old_state != init(说明当前队列的dq_state已经被修改过),那么放弃修改直接返回false
//否则更新dq_state 返回true
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
这个函数的作用就是对一个队列尝试加锁的操作,如果加锁失败则直接放弃,否则修改dq_state
。流程如下:
- 获取队列执行需要等待的条件:
value
- 获取队列的当前状态:old_state
- 如果不是初始状态则放弃并返回false:加锁失败,说明当前队列已经处在加锁状态。
- 加锁成功,把
value
的值赋值给dq_state
。其中包含被加锁的线程id:tid。
等待的条件value
如下:
- 队列并发数上限。
- 处在barrier中。
- 已经被加锁过的tid。
- 暂停状态。
接下来是等待函数的处理
_dispatch_sync_f_slow
1 | DISPATCH_NOINLINE |
这个方法是慢速执行的流程,可以分为3个步骤:
- 构造队列执行上下文,并保存。
- 进入等待。
- 等待结束,被信号量或者kevent唤醒,开始继续执行。
当进入到 __DISPATCH_WAIT_FOR_QUEUE__
就会阻塞下面代码的执行,直到等待结束。下面我们来看一下等待的实现
__DISPATCH_WAIT_FOR_QUEUE__
1 | static void |
先来看一下死锁检测,在上面分析过的_dispatch_queue_try_acquire_barrier_sync_and_suspend
,也就是修改队列的dq_state
时,会记录下正在调度的线程的tid
,而在构建上下文的过程中,同样会对dsc->dsc_waiter
写入线程的 tid,所以_dq_state_drain_locked_by
这个其实就是:
即将进入等待的线程的tid,是否已经持有了锁。(线程已经处在持有锁的的状态,如果在进入等待,就是死锁)
接下来的就是等待的策略了。如下:
- 根据
dq_state
计算等待类型并更新给dsc->dc_data
。- 根据暂停,wlh的状态分别给
dsc->dc_data
。 - 如果都不满足就根据上下文和dq重新计算。
- 根据暂停,wlh的状态分别给
- 将本次任务加入对应的队列。
- 调用对应队列的
dq_push
,这里是_dispatch_lane_push
。
- 调用对应队列的
- 执行等待策略。
- 基于信号量的等待。
- 基于kevent的等待。
关于调用队列的_dispatch_lane_push
方法,由于此时当前任务在等待执行,因此会进一步调用_dispatch_lane_push_waiter
,该方法的主要相关流程如下:
- 更新
dq_state
的状态:包括设置为DIRTY
,qos的合并等。 根据新的
dq_state
,执行对应方法:override,complete等。lldb调试如下:
基于kevent的等待事件(关于kevent,kqueue请查看GCD概念篇),最终会调用_dispatch_kq_poll
来轮询等待内核事件,那么等待结束的时机是什么呢?同样通过一组调用栈来查看:
而另外正在执行的线程的调用栈如下:
其实就是前一个任务执行完毕时。回到我们上面说的item执行完毕的函数_dispatch_lane_barrier_sync_invoke_and_complete
。在该方法内,会有两个判断分支调用_dispatch_lane_barrier_complete
。分别为:队列的尾部不为空或者dq_state被其他线程修改了。也就是我们分析过的 wait内部的流程。
_dispatch_lane_barrier_complete
1 | static void |
这里有个小技巧,当分析 barrier相关方法时,可以这样想:
当栅栏中的一个任务执行完毕后(barrier_complete),应该通知下一个任务开始执行。同步函数也有这样的特性。
这里的三个分支,大体上逻辑相同:通过对dq_state
的和队列的执行状态判断直接唤起还是redirect
到targetQueue
。这里我们直接看_dispatch_lane_drain_barrier_waiter
_dispatch_lane_drain_barrier_waiter
1 | static void |
同步函数总结
下面通过一张图来总结同步函数的流程。
async
dispatch_async
1 | void |
这个函数主要就是对work的包装。
_dispatch_continuation_async
1 | static inline void |
dx_push
的定义如下1
2#define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
会调用dq
的
do_push
方法,可以在
init.c
中查看到
do_push
的定义,这里直接给出结论
_dispatch_lane_concurrent_push
_dispatch_lane_concurrent_push
1 | void |
这里第一次看也想了很久,通过lldb调试,发现每次都命中了if中的条件,也就是dq->dq_items_tail == NULL
。
第一感觉是这里不应该为
NULL
的啊?因为在调用dispatch_async
时,传入了block,那就应该会被追加到队里。
这句话只对了一半(后半句),确实block (continuation)会被追加到队列中,但并不是我们创建的队列,而是root queue
。因此当前code>dq->dq_items_tail == NULL也就解释的通了。接下来就是追加root queue
的过程。
_dispatch_continuation_redirect_push
1 | static void |
再次感叹GCD命名的艺术,该方法的最终会把任务提交到targetq
,一般为root queue。在创建队列的章节里已经说明过。因此会调用到_dispatch_root_queue_push
_dispatch_root_queue_push
1 | static void |
这个方法比较简单,就是看该任务是直接入队到根队列。还是先执行override
。通常通过async
获得的队列,都满足条件。
_dispatch_root_queue_push_needs_override
被标记为需要override
的队列会重新在根队列池中,重新获取一个对应QOS 的队列。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
dispatch_object_t dou, dispatch_qos_t qos)
{
bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
//从GCD 的根队列池中重新获取一个
dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
dispatch_continuation_t dc = dou._dc;
// 这个_dispatch_object_is_redirection函数其实就是return _dispatch_object_has_type(dou,DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
// 所以自定义队列会走这个if语句,如果是dispatch_get_global_queue不会走if语句
if (_dispatch_object_is_redirection(dc)) {
dc->dc_func = (void *)orig_rq;
} else {
// dispatch_get_global_queue来到这里
dc = _dispatch_continuation_alloc();
// 相当于是下面的,也就是指定了执行函数为_dispatch_queue_override_invoke,所以有别于自定义队列的invoke函数。
// DC_VTABLE_ENTRY(OVERRIDE_OWNING,
// .do_kind = "dc-override-owning",
// .do_invoke = _dispatch_queue_override_invoke),
dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
_dispatch_trace_continuation_push(orig_rq, dou);
dc->dc_ctxt = dc;
dc->dc_other = orig_rq;
dc->dc_data = dou._do;
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
}
_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}
该函数的主要作用就是GCD root queues 中重新回去一个队列,并且把对应的continuation
和新申请的队列,执行
_dispatch_root_queue_push_inline
。1
2
3
4
5
6
7
8
9
10
11
12
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
//当队列为的头为空时
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
// 保存队列头
return _dispatch_root_queue_poke(dq, n, 0);
}
}
os_mpsc_push_list
被定义在inline_internal.h
中,含义为:如果队列的head 为空,则返回true。
该函数为真正的操作任务入队的函数,以为被调用的路径不同,需要先判断入队之前是否队列的头部为空(override的路径到这里为空),如果为空需要进行_dispatch_global_queue_poke
。
_dispatch_global_queue_poke
还函数会进一步调用_dispatch_root_queue_poke_slow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
if (likely(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}
bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
首先对root queues进行了初始化,_dispatch_root_queues_init
函数会在很多地方被调用,目的是要保证在使用GCD之前,先初始化好根队列。其实现如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
//使用pthread 线程池
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
_dispatch_root_queues[i].dq_priority);
}
#else
//使用_pthread_workqueue(XNU workq)
int wq_supported = _pthread_workqueue_supported();
int r = ENOTSUP;
if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(wq_supported,
"QoS Maintenance support required");
}
//下面这两个init 函数非常重要。下层会将_dispatch_worker_thread2 函数绑定到线程的回调函数上。
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_KEVENT_WORKQUEUE
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}
if (r != 0) {
DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
"Root queue initialization failed");
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}
在初始化过程中,会判断使用哪种线程。分为pthread pool
或者xnu workqueue
。
同时下面的两个_pthread_workqueue_init
函数也十分重要。将_dispatch_worker_thread2 函数绑定到线程的回调函数上。
继续回到_dispatch_root_queue_poke_slow
函数中。
这里有我们看到了有两种创建线程的方式:
- _pthread_workqueue_addthreads
- pthread_create。
下面通过lldb来确定GCD使用哪种方式
因此可以确定GCD使用_pthread_workqueue_addthreads
来申请线程。我们先来看任务的执行,然后在回到申请线程的下层。
继续通过lldb获得GCD block 的调用堆栈。如图:_pthread_wqthread
会调用到_dispatch_worker_thread2
,_pthread_workqueue_addthreads
和_pthread_wqthread
我们会在libpthread
章节来说明。
_dispatch_worker_thread2
1 | static void |
_dispatch_root_queue_drain
1 | static void |
这里可以看到,当任务开始出队列的时候,依次获取一个item
,并调用_dispatch_continuation_pop_inline
,这个函数比较重要,内部会区分dispatch_object_t
的类型。
_dispatch_continuation_pop_inline
1 | static inline void |
这里重要的就是其中的判断条件:
- 如果dispatch_object_t 是有
vtable
,如果有,优先执行操作表中的invoke
。 - 如果没有,就直接调用
_dispatch_continuation_invoke_inline
,内部会调用_dispatch_client_callout
。
值得注意的是_dispatch_continuation_invoke_inline
内部会区分dispatch_group
和普通的callout。关于group我们将在下一章节说明。
回到_dispatch_client_callout
,就比较熟悉了。在之前的同步中已经说明过了。
libpthread
当一个线程开始工作,会有内核通过汇编调用_pthread_wqthread
函数。然后通过 flags
来确定执行哪一个dispatch的工作函数。
_pthread_wqthread
1 | // workqueue entry point from kernel |
这里会调用__libdispatch_workerfunction方法。而该函数的赋值如下:
_pthread_workqueue_init_with_kevent
_pthread_workqueue_init_with_workloop
pthread_workqueue_setdispatch_with_workloop_np
1 | static int |
在libdispatch
的根队列初始化函数中,对把GCD的回调行数传递给下次的libpthread</code。
下面是libdispatch
的申请线程流程。
_pthread_workqueue_addthreads
_pthread_workqueue_addthreads
被定义在libpthread
中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27int
_pthread_workqueue_addthreads(int numthreads, pthread_priority_t priority)
{
int res = 0;
if (__libdispatch_workerfunction == NULL) {
return EPERM;
}
#if TARGET_OS_OSX
// <rdar://problem/37687655> Legacy simulators fail to boot
//
// Older sims set the deprecated _PTHREAD_PRIORITY_ROOTQUEUE_FLAG wrongly,
// which is aliased to _PTHREAD_PRIORITY_SCHED_PRI_FLAG and that XNU
// validates and rejects.
//
// As a workaround, forcefully unset this bit that cannot be set here
// anyway.
priority &= ~_PTHREAD_PRIORITY_SCHED_PRI_FLAG;
#endif
res = __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads, (int)priority);
if (res == -1) {
res = errno;
}
return res;
}
__workq_kernreturn
被定义在xnu
的pthread_workqueue.c
中
到这里断点就跟不下去了。因此我们来反汇编这个动态库,通过image list
获取所以的库目录。
反汇编如图
XNU
实际上这里要使用syscall_code
为
SYS_workq_kernreturn
的系统调用函数来调用内核态的代码也就是
workq_kernreturn
workq_kernreturn
1 | int |
因为libpthread
传入的参数为WQOPS_QUEUE_REQTHREADS
因此会调用到WQOPS_QUEUE_REQTHREADS
1 | /** |
通过注释我们可以知道,该方法会基于内核状态判断当前是否需要生成新的线程。
async+串行队列
async+串行队列
的场景,其实在讲sync
的时候,我们已经操作过了,在该场景下,会调用_dispatch_lane_push
。之后的流程也分析过了,这里不在赘述。
异步函数总结
主队列与runloop
主队列只能在主线程下执行,但是主线程不一定只运行主队列。
主队列的任务执行比较特殊,要依赖与runloop的运行。
参考文献
Concurrency Programming Guide
Threading Programming Guide
GCD Internals