这篇应当能结,简图以下。

  上一篇讲到了uv__work_submit要领,接着写了。

void uv__work_submit(uv_loop_t*loop,struct uv__work*w,enumuv__work_kind kind,void (*work)(struct uv__work*w),void (*done)(struct uv__work* w, intstatus)) {//上篇主要讲的这里 初始化线程池等
  uv_once(&once, init_once);
w
->loop =loop;
w
->work =work;
w
->done =done;
post(
&w->wq, kind);
}

  从post最先。

static void post(QUEUE* q, enumuv__work_kind kind) {//由于存在行列插进去操纵 须要加锁
  uv_mutex_lock(&mutex);if (kind ==UV__WORK_SLOW_IO) {//跳...
}

QUEUE_INSERT_TAIL(
&wq, q);//如果有余暇线程 叫醒 if (idle_threads > 0)
uv_cond_signal(
&cond);
uv_mutex_unlock(
&mutex);
}

  wq就是上一篇讲的线程都邑用到的谁人行列,这里卖力插进去义务,worker中掏出义务。

  没想到post到这里没了,这点器械并到上一篇就好了。今后写这类系列博客照样先计划一下,不克不及边看源码边写……

  函数到这里就断了,看似没有线索,现实上在上一节的worker要领中,还漏了一个处所。

static void worker(void*arg) {//...
  for(;;) {//这里挪用内部fs要领处置惩罚义务
    w = QUEUE_DATA(q, structuv__work, wq);
w
->work(w);

uv_mutex_lock(
&w->loop->wq_mutex);
w
->work =NULL;
QUEUE_INSERT_TAIL(
&w->loop->wq, &w->wq);//这个是漏了的症结 uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(
&w->loop->wq_mutex);//... }
}

  每一条线程在每次处置惩罚完一条事宜并将其插进去事情行列wq后,都邑挪用一下这个uv_async_send要领,上一篇没讲这个。

  这里的wq_async是一个在loop上面的变量,在轮询初始化的时刻涌现过,这里先不看。

  uv_async_send这个要领又涉及到别的一个大模块,以下。

int uv_async_send(uv_async_t*handle) {//错误处置惩罚...
  if (!uv__atomic_exchange_set(&handle->async_sent)) {
POST_COMPLETION_FOR_REQ(loop,
&handle->async_req);
}
return 0;
}
//将操纵效果推到iocp上面 #define POST_COMPLETION_FOR_REQ(loop, req) \ if (!PostQueuedCompletionStatus((loop)->iocp, \0, \0, \&((req)->u.io.overlapped))) { \
uv_fatal_error(GetLastError(),
"PostQueuedCompletionStatus"); \
}

  这个处所说实话我并非邃晓windows底层API的操纵道理,IOCP这局部我没有去研讨,只能从字面上去明白。

  关于PostXXX要领官网诠释以下:

Posts an I/O completion packet to an I/O completion port.

  将一个I/O完成的数据打包到I/O完成的端口,翻译过去就是如许,小我明白上的话大概是把一个async_req丢到IOCP那边保存起来。

 

  接下来终究能够回到事宜轮询局部,点题了。

int uv_run(uv_loop_t *loop, uv_run_mode mode) {//...

  while (r != 0 && loop->stop_flag == 0) {//...//call pending callbacks
    ran_pending =uv_process_reqs(loop);//...//poll for I/O
    if(pGetQueuedCompletionStatusEx)
uv__poll(loop, timeout);
elseuv__poll_wine(loop, timeout);//... }//... }

  截取了剩下的poll for I/O、call pending callback,也就是剩下的两局部了。if推断不消管,只是一个要领兼容,终究的目标是一样的。

  以是只看uv__poll局部。

static void uv__poll(uv_loop_t*loop, DWORD timeout) {//...//设定壅塞时候
uint64_t timeout_time;
timeout_time
= loop->time +timeout;for (repeat = 0; ; repeat++) {
success
= GetQueuedCompletionStatusEx(loop->iocp,
overlappeds,
ARRAY_SIZE(overlappeds),
&count,
timeout,
FALSE);
if(success) {for (i = 0; i < count; i++) {if(overlappeds[i].lpOverlapped) {
req
=uv_overlapped_to_req(overlappeds[i].lpOverlapped);
uv_insert_pending_req(loop, req);
}
}
uv_update_time(loop);
}
else if (GetLastError() !=WAIT_TIMEOUT) {//... } else if (timeout > 0) {//超时处置惩罚... }break;
}
}

  这里的GetQueueXXX要领与之前的PostQueueXXX正好是一对要领,都是基于IOCP,一个是存储,一个是掏出。

  遍历操纵就很轻易懂了,掏出数据后,一个个的塞到pending callback的行列中。

  把uv_insert_pending_req、uv_process_reqs两个要领结合起来看。

INLINE static void uv_insert_pending_req(uv_loop_t* loop, uv_req_t*req) {
req
->next_req =NULL;//插进去到pending_reqs_tail上 if (loop->pending_reqs_tail) {//DEBUG... req->next_req = loop->pending_reqs_tail->next_req;
loop
->pending_reqs_tail->next_req =req;
loop
->pending_reqs_tail =req;
}
else{
req
->next_req =req;
loop
->pending_reqs_tail =req;
}
}

INLINE
static int uv_process_reqs(uv_loop_t*loop) {//...//处置惩罚pending_reqs_tail first = loop->pending_reqs_tail->next_req;
next
=first;
loop
->pending_reqs_tail =NULL;while (next !=NULL) {
req
=next;
next
= req->next_req != first ? req->next_req : NULL;switch (req->type) {//handle各种req... }
}
return 1;
}

  就如许,圆满的把poll for I/O与call pending callback两块内容衔接到了一同,也同时明白了一个异步I/O操纵是如安在node内部被处置惩罚的。

 

  末了照样剩一个尾巴,就是丢到IOCP的谁人async_req怎么回事?这个变量在轮询的初始化要领中涌现,以下。

typedef structuv_loop_s uv_loop_t;structuv_loop_s {//...
UV_LOOP_PRIVATE_FIELDS
};
#define UV_LOOP_PRIVATE_FIELDS \ //其他变量 uv_async_t wq_async;//uv__word_done是这个handle的回调函数 int uv_loop_init(uv_loop_t*loop) {//... err = uv_async_init(loop, &loop->wq_async, uv__work_done);//... }//第一篇中演示过handle的初始化和运转 很通例的init、start两步 int uv_async_init(uv_loop_t* loop, uv_async_t*handle, uv_async_cb async_cb) {
uv_req_t
*req;

uv__handle_init(loop, (uv_handle_t
*) handle, UV_ASYNC);
handle
->async_sent = 0;
handle
->async_cb =async_cb;

req
= &handle->async_req;
UV_REQ_INIT(req, UV_WAKEUP);
req
->data =handle;

uv__handle_start(handle);
return 0;
}

# define UV_REQ_INIT(req, typ) \
do{ \
(req)
->type =(typ); \
} \
while (0)

  从代码内里能够晓得,loop上自身带有一个uv_async_t的变量wq_async,初始化后有四个属性。个中须要注重,这个范例的type被设置为UV_WAKEUP。

  再回到uv_process_reqs中,处置惩罚从IOCP掏出的req那块。

INLINE static int uv_process_reqs(uv_loop_t*loop) {//...

  while (next !=NULL) {//...
    switch (req->type) {//...
      caseUV_WAKEUP:
uv_process_async_wakeup_req(loop, (uv_async_t
*) req->data, req);break;//... }
}
return 1;
}

  我们找到了处置惩罚UV_WAKEUP的case,参数参考上面谁人初始化的代码也很轻易得知,req->data就是loop初始化的谁人handle,req是谁人async_req。

  要领代码以下。

void uv_process_async_wakeup_req(uv_loop_t* loop, uv_async_t* handle, uv_req_t*req) {//丢进IOCP的时刻被设置为1了 详细在uv_async_send的uv__atomic_exchange_set要领中
  handle->async_sent = 0;if (handle->flags &UV_HANDLE_CLOSING) {
uv_want_endgame(loop, (uv_handle_t
*)handle);
}
else if (handle->async_cb !=NULL) {//进的else分支 handle->async_cb(handle);
}
}

  这里的async_cb也是初始化就界说了,现实函数名是uv__work_done。

void uv__work_done(uv_async_t*handle) {//...
loop=container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(
&loop->wq_mutex);//照样谁人熟习的行列 QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(
&loop->wq_mutex);while (!QUEUE_EMPTY(&wq)) {//... w->done(w, err);
}
}

  这个done,就是用户从JS传过去的callback……

  也就是说call pending callback现实上是挪用用户传过去的callback,第二篇的图现实上是有题目的,系列结束撒花!

Last modification:March 25, 2020
如果觉得我的文章对你有用,请随意赞赏