双向链表的两种实现

双向链表作为常用数据结构之一,已经有多种有趣的实现,本文尝试剖析其中两种:

  1. lua虚拟机的官方实现中,lua函数调用嵌套层次是由一个双向链表实现的(基于Lua5.3.5版本)
  2. libuv异步IO库中,对于异步IO部分采用的是队列+线程池的方式实现的,而其队列的底层实现,就是双向环形链表(Linux下libuv的异步IO是由主线程调用pthread_cond_signal发起,由线程池的线程实际阻塞执行完IO之后写eventfd反馈给主线程的epoll)

Lua函数调用嵌套

对于我们所熟悉的C语言函数嵌套调用过程,有如下特性:

  1. 每一级函数都存在自己的栈帧,用于存放临时变量、函数返回地址等(函数返回地址就是函数返回的时候上一级函数下一条指令的地址)
  2. 栈指针在调用函数的时候是从栈底一直往栈顶发展,当函数返回时,栈指针回滚
  3. main函数调用A函数,A函数调用B函数,B函数调用C函数,当执行到B函数,则栈中存在至少3个栈帧,当执行到C函数时,则至少存在4个栈帧

对于Lua语言,也同样存在类似的结构,区别在于,Lua虚拟机中的栈是用于闭包执行过程的寄存器分配(Lua的VM是基于寄存器的),因此栈上不存在记录函数返回值之类的用于区分栈帧边界的内容。Lua的做法是,将函数调用的层次信息,单独通过一个双向链表来实现,链表的每一个节点代表一个栈帧。函数嵌套调用越深,链表的节点就越多。

lua官方实现中相关结构体成员:(添加了注释的Lua-5.3.5源码

typedef TValue *StkId;
struct lua_State { // 代表一个Lua线程
  ...
  unsigned short nci; // 函数调用链的深度
  CallInfo *ci; // 指向当前执行函数的调用块,也就是指向base_ci为链头的双向链表中的某一个节点
  CallInfo base_ci; // 函数调用双向链表的链头(第一块,表示C调用Lua)
  ...
  int stacksize; // stack数组的大小
  StkId stack; // Lua栈,TValue数组,每一个槽是一个TValue结构体
  StkId stack_last; // 指向Lua栈最后一个可用的槽
  StkId top; // 栈顶(栈中的第一个空闲槽),Lua栈是向上生长的空栈
  ...
};
typedef struct lua_State lua_State;
typedef struct CallInfo {
  ...
  struct CallInfo *previous, *next;  /* dynamic call link */
  ...
} CallInfo;

一个CallInfo结构体为一个链表节点,整个链表由其中的previous和next指针链接起来。扩展链表和缩减链表的操作分别如下:

// Extend CallInfo Linked List
CallInfo *luaE_extendCI (lua_State *L) {
  CallInfo *ci = luaM_new(L, CallInfo);
  lua_assert(L->ci->next == NULL);
  L->ci->next = ci;
  ci->previous = L->ci;
  ci->next = NULL;
  L->nci++;
  return ci;
}
/*
** free half of the CallInfo structures not in use by a thread
*/
void luaE_shrinkCI (lua_State *L) {
  CallInfo *ci = L->ci;
  CallInfo *next2;  /* next's next */
  /* while there are two nexts */
  while (ci->next != NULL && (next2 = ci->next->next) != NULL) {
    luaM_free(L, ci->next);  /* free next */
    L->nci--;
    ci->next = next2;  /* remove 'next' from the list */
    next2->previous = ci;
    ci = next2;  /* keep next's next */
  }
}

当Lua进行函数调用的时候,就需要扩展此链表,相关代码为:

#define next_ci(L) (L->ci = (L->ci->next ? L->ci->next : luaE_extendCI(L)))
int luaD_precall (lua_State *L, StkId func, int nresults) {
  ...
  CallInfo *ci;
  switch (ttype(func)) {
    ...
    case LUA_TLCF:  /* light C function */
      ...
      ci = next_ci(L);  /* now 'enter' new function */
      ...
      return 1;
    }
    case LUA_TLCL: {  /* Lua function: prepare its call */
      ...
      ci = next_ci(L);  /* now 'enter' new function */
      ...
      return 0;
    }
    ...
  }
}

可以看到,在Lua中还区分C函数调用和Lua函数调用,但是不管是哪一种,都会调用next_ci扩展链表并赋值L->ci。Lua为了提高执行效率,每一层函数返回并没有立刻做链表节点的清除工作,而是保留在链表中,供后续使用,最后,在缩减Lua栈的时候,顺带一次性缩减此链表:

void luaD_shrinkstack (lua_State *L) {
  ...
  if (L->stacksize > LUAI_MAXSTACK)  /* had been handling stack overflow? */
    luaE_freeCI(L);  /* free all CIs (list grew because of an error) */
  else
    luaE_shrinkCI(L);  /* shrink list */
  ...
}

Lua函数调用嵌套层次的双向链表实现中规中矩,与我们在数据结构教科书中所学到的方式一模一样。我们所熟悉的debug.traceback()方法获取函数调用堆栈信息的时候,就会遍历此双向链表并访问一部分节点信息,具体内容可查看相应源码,这里不做展开。

底层实现逻辑图:

Lua函数调用嵌套层次.png

libuv中的双向环形链表

libuv的实现源码中,有一个内部头文件”queue.h”,这个头文件用纯C语言宏实现了双向环形链表,关于这个头文件内部源码原理的解析,网上已经有不少博客讲解的很清楚了,我们这里通过一些示例梳理一下相关宏的操作对象。

首先,给出头文件中所有宏的操作原型:

typedef void* QUEUE[2];
/* Private macros. */
QUEUE* QUEUE_NEXT(QUEUE* q);
QUEUE* QUEUE_PREV(QUEUE* q);
QUEUE* QUEUE_PREV_NEXT(QUEUE* q);
QUEUE* QUEUE_NEXT_PREV(QUEUE* q);
/* Public macros. */
type* QUEUE_DATA(ptr, type, field);
QUEUE_FOREACH(QUEUE* q, QUEUE* h);
bool QUEUE_EMPTY(QUEUE* q);
QUEUE* QUEUE_HEAD(QUEUE* q);
void QUEUE_INIT(QUEUE* q);
void QUEUE_ADD(QUEUE* h, QUEUE* n);
void QUEUE_SPLIT(QUEUE* h, QUEUE* q, QUEUE* n);
void QUEUE_MOVE(QUEUE* h, QUEUE* n);
void QUEUE_INSERT_HEAD(QUEUE* h, QUEUE* q);
void QUEUE_INSERT_TAIL(QUEUE* h, QUEUE* q);
void QUEUE_REMOVE(QUEUE* q);

可以看出,几乎所有宏都以QUEUE*为参数或者返回值。通过QUEUE_DATA宏我们知道,QUEUE这个数组可以存在于目标结构体的任何位置。

考虑如下队列使用示例:(源码可从Github获取)

#include "queue.h"
#include <stdio.h>

typedef struct User {
  int age;
  char* name;
  QUEUE node;
} User;

int main() {
  QUEUE queue;

  User john;
  john.name = "john";
  john.age = 44;

  User henry;
  henry.name = "henry";
  henry.age = 32;

  QUEUE_INIT(&queue);
  QUEUE_INIT(&john.node);
  QUEUE_INIT(&henry.node);

  ((*(&queue))[0]) = john.node;
  (*(QUEUE**)&((*(&queue))[0])) = &john.node;

  QUEUE_INIT(&queue);
  printf("Is queue empty: %d\n", QUEUE_EMPTY(&queue));
  QUEUE_INSERT_TAIL(&queue, &john.node);
  QUEUE_INSERT_TAIL(&queue, &henry.node);
  printf("Is queue empty: %d\n", QUEUE_EMPTY(&queue));

  QUEUE* q = QUEUE_HEAD(&queue);
  User* user = QUEUE_DATA(q, User, node);
  printf("Received first inserted user: %s who is %d.\n", user->name, user->age);
  QUEUE_REMOVE(q);
  QUEUE_FOREACH(q, &queue) {
    user = QUEUE_DATA(q, User, node);
    printf("Received rest inserted users: %s who is %d.\n", user->name, user->age);
  }

  QUEUE queue2;
  User mary;
  mary.name = "mary";
  mary.age = 24;
  User sana;
  sana.name = "sana";
  sana.age = 22;
  QUEUE_INIT(&queue2);
  QUEUE_INIT(&mary.node);
  QUEUE_INIT(&sana.node);
  QUEUE_INSERT_HEAD(&queue2, &mary.node);
  QUEUE_INSERT_HEAD(&queue2, &sana.node);

  QUEUE_ADD(&queue2, &queue);
  printf("Is queue empty: %d\n", QUEUE_EMPTY(&queue));

  QUEUE_FOREACH(q, &queue2) {
    user = QUEUE_DATA(q, User, node);
    printf("User: %s who is %d.\n", user->name, user->age);
  }

  QUEUE_SPLIT(&queue2, &mary.node, &queue);
  QUEUE qu;
  QUEUE_MOVE(&queue, &qu);
  QUEUE_FOREACH(q, &qu) {
    user = QUEUE_DATA(q, User, node);
    printf("Rest User: %s who is %d.\n", user->name, user->age);
  }

  return 0;
}

上述示例构成的基本逻辑结构图如下:

libuv双向环形链表.png

基于图中的双向循环链表:

  1. 一个QUEUE是一个具有两个元素的数组,每个元素都是一个指针,将QUEUE作为节点可构成一个双向循环链表
  2. 宏用的参数q是QUEUE*类型,q所指向的那个QUEUE也是循环链表中的一个节点(一个典型的结构为一个链表头节点加上若干链表项节点组成一个双向循环链表,从另一个角度来说,每一个节点都可以成为头节点,通过链表遍历其他节点)

宏的基本四个操作,基于循环链表结构:

  1. QUEUE_NEXT操作的是q指向节点中QUEUE数组第一个元素
  2. QUEUE_PREV操作的是q指向节点中QUEUE数组第二个元素
  3. QUEUE_PREV_NEXT操作的是q指向节点的前一个节点中QUEUE数组第一个元素
  4. QUEUE_NEXT_PREV操作的是q指向节点的下一个节点中QUEUE数组第二个元素

各个宏的操作对象如下:

  1. QUEUE_FOREACH的第一个参数为迭代变量的指针(需要用户自己定义对应的QUEUE变量),第二个参数为链表头节点的指针,在迭代过程中,此链表头节点对应的数据结构不会被扫描到。假如迭代过程通过QUEUE_INSERT_TAIL去添加链表项,则有可能导致死循环
  2. QUEUE_HEAD用于获取第一个链表项节点的指针
  3. QUEUE_INIT宏简单的将QUEUE的两个指针赋值q(指向自己)
  4. QUEUE_ADD的两个参数指针(h和n)指向的节点都是链表头节点,该宏的功能是将n的第一个链表项节点接到h的最后一个链表项节点,并将n的最后一个链表项节点接到h指向的链表节点。整个操作结束后,n指向的节点会变成一个独立的节点(注意此时并没有QUEUE_INIT该节点)
  5. QUEUE_SPLIT的三个参数中,h和n分别指向一个链表头节点,q指向h链表中的某一个链表项节点,该宏的功能是,h链表中,从q节点开始(包括q节点)后面的所有链表项节点都划分给n链表(注意,n指向的链表头节点必须是一个空节点,否则,此操作会覆盖该节点原本的内容)
  6. QUEUE_MOVE的两个参数h和n指向的节点都是链表头节点,该宏的作用是将h链表的所有链表项节点都给n链表,同样,n指向的节点必须为空节点,另外,此操作会变相对h指向的链表头节点进行QUEUE_INIT
  7. QUEUE_INSERT_HEAD第一个参数是链表头节点指针,第二个参数是链表项节点指针,将该链表项加入链表的表头位置,QUEUE_INSERT_TAIL则加入链表的尾端
  8. QUEUE_REMOVE宏让q指定的链表项节点脱离该循环链表,然而,脱离之后并没有重新QUEUE_INIT此链表项节点,因此可以通过骚操作让自己恢复回去(当某个链表的所有链表项都脱离了该链表,其实是变相的对该链表头节点进行QUEUE_INIT)

通过宏的封装,既保留了C语言实现的运行时高效,又兼顾了编码的简短和代码的可读性。在理解"queue.h”中所有宏操作的过程中,需要我们明确区分出链表头节点和链表项节点。

理解了宏的基本功能之后,我们看看libuv中是如何真正使用该循环链表:(代码来自uv-common.c源文件

void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) {
  QUEUE queue;
  QUEUE* q;
  uv_handle_t* h;

  QUEUE_MOVE(&loop->handle_queue, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_handle_t, handle_queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->handle_queue, q);

    if (h->flags & UV_HANDLE_INTERNAL) continue;
    walk_cb(h, arg);
  }
}

这段代码的功能是遍历以loop->handle_queue作为链表头节点的链表中每一个链表项节点,并对每个链表项节点调用walk_cb回调函数。

我们发现,代码中并不是直接对该链表进行QUEUE_FOREACH迭代,而是选择将该链表的所有链表项节点迁移给临时链表头queue,然后遍历queue,并且在遍历过程中将每一个节点加回loop->handle_queue。

仔细思考不难发现,libuv无法保证walk_cb回调函数中不去往loop->handle_queue链表中添加新的节点,假如直接对loop->handle_queue进行QUEUE_FOREACH迭代,那么很容易就会造成死循环,迭代过程永不停息!

总结

软件开发需要大量实践,从教科书学到的知识要善于运用到实际编码过程中,我们才能够做到知其然也知其所以然。从开源项目中学习相关数据结构往往能事半功倍,理解深刻,简单的一个双向链表,经过仔细推敲也能有些许收获。另外,系统头文件sys/queue.h也提供了许多队列的实现,同样也是使用纯C语言宏来实现,有兴趣的同学可参考此博客man手册