用户态线程实现

2023/7/10 项目

# 1. 概述

  所谓用户态线程就是把内核态的线程在用户态实现了一遍而已,目的是更轻量化(更少的内存占用、更少的隔离、更快的调度)和更高的可控性(可以自己控制调度器)。用户态所有东西内核态都「看得见」,只是对于内核而言「用户态线程」只是一堆内存数据而已。

  线程并不是一个具体的名词,在不同语境下可以指代不同的东西,但都代表多个任务共享同一个 CPU。例如 Intel 的超线程技术可以让操作系统以为有两个核,在 CPU 层面通过共用原件来做到一个物理核执行两个逻辑核的任务;操作系统层面的线程就是所谓的「内核态线程」;「用户态线程」则多种多样,只要能满足在同一个内核线程上执行多个任务的都算,例如 coroutine、golang 的 goroutine、C# 的 Task。不要被名词所局限,他们其实是相同的东西。

  用户态线程系统应该包含两个部分,一个部分是按内核代码原则设计用户态的线程库,由一系列的函数和以线程控制快TCB为核心的一系列数据结构组成;另一个部分是演示系统,调用线程库创建多线程的程序,使其并发执行,以展示系统的运行状态,显示系统的关键数据结构的内容。

# 2. 线程切换原理

  线程切换有两个要点。第1个要点是保存现场,第2个要点则是记录接下来需要执行的指令。我们需要细化的考虑一下——现场指的是什么,接下来需要执行的指令指的是什么,以及如何保存现场现场该保存在哪里?显而易见,“保存现场”则是保存CPU内部的寄存器状态。保存接下来需要执行的指令就是需要保存接下来要执行的指令的地址。这些内容该保存到哪里呢?当然是线程的栈里面。具体来说,线程的切换有以下几个要点:

  • 我们需要为每一个线程设立一个独立的,互相不干扰的栈空间。
  • 当线程发生切换的时候,当前线程被切换之前,需要把自己的现场进行完好的保留,同时记录下下一条需要执行指令的指令地址。
  • 把CPU的栈顶指针寄存器esp切换到即将被调入的线程的堆栈的栈顶地址,完成了线程栈空间的切换。

  经过上述这几个步骤,我们便完成了线程的切换,由于上面的步骤需要直接访问CPU的寄存器,于是这个过程往往是采用汇编的方式来进行。

# 3. 系统结构和主要的算法设计思路

# 3.1 需要做的工作

  1. 建立描述线程的数据结构TCB;
  2. 建立描述线程队列的数据结构tasks;
  3. 通过调用thread_create函数创建线程;
  4. 使用两种方式启动线程:
  • 通过调用函数detach实现线程的分离式启动(父线程不必等待子线程执行结束,可以继续执行)
  • 通过调用thread_join实现阻塞式启动(父调线程等待该子线程结束后才能继续执行)
  1. 实现多种线程状态,线程状态表如下:
状态名称 含义 状态特点
THREAD_READY 线程就绪 线程正在运行
THREAD_RUNNING 线程可调度 线程处于可调度队列,但暂时没有得到cpu资源
THREAD_SLEEP 线程睡眠 线程会在睡眠时间内不参与线程的调度,保持沉默
THREAD_BLOCK 线程阻塞 线程由于等待某个事件发生而阻塞,不接受cpu调度
THREAD_STOP 线程停止 线程收到调控,停止运行
THREAD_DISPOSED 线程撤销 线程的空间需要被撤销掉
  1. 实现多线程的状态转换,通过一系列的线程状态转换函数来实现线程的状态切换,这些函数如下表所示:
函数名 参数 函数作用
resume int tid 将标号为tid的线程状态设置为THREAD_RUNNING
wait int tid 将标号为tid的线程状态设置为THREAD_BLOCK
mysleep int tid 将标号为tid的线程状态设置为THREAD_SLEEP
dispose int tid 将标号为tid的线程状态设置为 THREAD_DISPOSED
remove_th int tid 将标号为tid的线程状态设置为 THREAD_STOP
  1. 实现多线程的切换,线程的切换需要通过汇编来完成。因此需要编写汇编代码完成线程上下文的切换。线程切换的方式有两种,第一种时主动切换,调用schedule完成到切换到指定线程的任务;另外一种则是根据线程调度来完成切换,当线程需要调度时,自动完成线程切换;
  2. 实现线程的调度算法。本次设计的线程库采用了时间片轮转调度算法,该算法根据线程优先级为每个线程设置了时间片,使得每一个线程都能有相对公平的机会得到系统资源。

# 3.2 算法设计的一些细节

  我们需要提几个问题,反反复考虑相关的实现细节之后再进行编码实现,这样会极大的提高我们编码的效率。

# 3.2.1 如何设计线程的栈空间?

  我们可以通过动态分配一个连续的地址空间来作为线程的栈空间。

# 3.2.2 如何保留CPU当前执行指令的下一条指令地址?

  通过call指令来完成对对下一条指令地址的保存,这里是对call指令一个非常巧妙的使用方法,需要非常丰富的编程经验以及对计算机汇编语言的升入理解。下面将详细讲解这一点。

  前面已经分析过了,我们需要设计两种线程切换的方式,即主动切换和时钟中断切换。主动切换需要当前线程调用调度函数schedule来完成,而时钟中断切换则是通过设置时钟中断,中断后执行schedule函数来完成。我们分别考虑这两种情况下该如何保留下一条指令的地址。提前说明:switch(tid)为一个汇编函数,作用为切换到tid线程。

  1. 主动调度

  当线程主动调度的时候,线程1调用switch函数切换到线程2,当线程1执行了call switch指令后,线程1的栈空间如下图所示: 线程1的栈空间标

ip则保留了switch 函数执行完成后需要执行的指令地址,也即为下一条指令的地址。当线程1完成了现场的保存后,线程1的栈空间如下图所示: 现场保存后线程1的栈空间

最后,将esp保存到TCB中即可。当线程1重新被调入执行的时候,只需要将esp切换为tcb中保存的esp, 这样便完成了线程栈空间的切换,然后switch函数将会执行回复线程的操作,将线程1栈中的状态一一弹出,最后esp指向了ip,然后ret操作之后, 线程1便恢复了原来的执行状态。

  1. 时间片结束中断调度

  线程1在执行过程中时间片执行结束,产生中断,执行中断处理函数 schedule,schedule函数会从线程队列tasks中挑选一个最合适的线程,然后将其调入cpu执行。因此,当中断产生后,线程1的栈变成如下状态,首先是保存线程状态(中断基操)。 中断后线程1的栈

注意,此处的ip为中断前线程下一条指令的地址。接下来,线程1响应中断,call schedule(实际响应中段并不是call,此处为简化描述暂记为call)之后线程1的栈为: call后线程1的栈

注:上图中红色部分为中断前保留的线程1现场,黄色部分为现场切换前保留的现场,dest_id为schedule选出的需要被调入执行的目标线程id,而ip1则为中断服务程序调用schedule并执行玩成后需要执行的下一条指令的地址。故由上可知,当线程1重新被调入时,线程切换部分(黄色部分)的现场保留与恢复上文已经说过,不再赘述,而黄色部分的现场被恢复后,紧接着就是中断现场的恢复了——红色部分的状态被恢复,也就是恢复到了线程1中断前的状态,使得线程1最终恢复到了原来的状态。

# 4. 主要数据结构

# 4.1 线程控制块TCB

  线程控制块(Thread Control Block,TCB)是与进程的控制块(PCB)相似的子控制块,只是TCB中所保存的线程状态比PCB中保存少而已。在用户态线程库中TCB是线程存在的唯一证明,通过控制TCB中的数据,我们可以对线程的状态等一系列的事务进行操作。因此TCB是线程的核心数据结构。据此,我们设计出了以下数据结构作为TCB。如下所示是TCB的数据结构:

struct task_struct {
 
  int id;  //线程的标识符
 
  void (*th_fn)(); //指向线程函数的函数指针
 
  int esp; //用来在发生线程切换是保存线程的栈顶地址
 
  unsigned int wakeuptime; // 线程唤醒时间
 
  int status; // 线程状态
 
  int counter; // 时间片
 
  int priority; // 线程优先级
 
  int stack[STACK_SIZE]; //现场的栈空间
 
};

task_struct即为TCB,此处只是命名不同而已。TCB中各个数据的详细作用已经标注在了上图中,但是,仍然有一些地方需要我们特别注意。th_fn为指向函数的指针,我们传入该参数的时候,需要把函数的地址以整数的形式传入。esp则记录了栈顶,一开始初始化的时候,stack栈中预先保存好了现场的初始状态以便线程进行调度,而esp初始时则指向了stack的栈顶,初始stack如下图所示:

stack[STACK_SIZE-11] = 0; // eflags
 
  stack[STACK_SIZE-10] = 0; // eax
 
  stack[STACK_SIZE-9] = 0; // edx
 
  stack[STACK_SIZE-8] = 0; // ecx
 
  stack[STACK_SIZE-7] = 0; // ebx
 
  stack[STACK_SIZE-6] = 0; // esi
 
  stack[STACK_SIZE-5] = 0; // edi
 
  stack[STACK_SIZE-4] = 0; // old ebp  
 
  stack[STACK_SIZE-3] = (int)start; // ret to start   线程第一次被调度时会在此启动
 
  // start 函数栈帧,刚进入 start 函数的样子 
 
  stack[STACK_SIZE-2] = 100;// ret to unknown,如果 start 执行结束,表明线程结束 
 
  stack[STACK_SIZE-1] = (int)tsk; // start 的参数

由上可知,在初始化线程的时候,我们需要在stack中设置好线程上下文的初始环境,同时传入start函数的地址作为启动函数的地址。

  wakeuptime是做什么用的呢?wakeuptime指定的线程从睡眠状态并唤醒的时间点。当线程调用sleep函数之后,wakeuptime便被设置为当前的时间加上线程需要休眠的时间,同时线程的状态被设置为thread_sleep。当线程发生调度的时候,调度函数会检查每一个处于睡眠状态的线程,如果当前的时间大于wakeuptime则将其状态设置为THREAD_RUNNING,重新参与现场的调度。

  另外要注意的是,counter是如何发挥作用的呢?我们为每一个线程设定了一定数量的时间片,counter记录了线程当前还有多少时间片,每一个时间片都是一个单位的时间,比如说10毫秒。每一个时间片结束都会发生一次调度中断,这个中断的中断服务子程序会检查当前线程的counter是否小于0,如果小于零则代表当前线程的时间片用完了而需要进行线程的调度,调度算法会从线程队列中寻找另外一个可调度的而且counter大于0且counter最大的线程进行调度,否则的话直接结束中断。

  priority代表着线程的优先级,当所有的县城时间片都已经用完的时候,需要重新为每一个线程分配时间片。每一个行程卡分配多少的时间变得这个就由priority来决定。优先级高的线程能够分配到更多的时间片,而优先级低的线程分配到时间片就相对的少,这样便实现了线程之间的优先级,让优先级高的线程能够得到更多的CPU处理时间,而线程优先级低的线程CPU处理时间则相对较少。

# 4.2 线程队列

  TCB的设计仅仅是针对于每一个线程来进行设计的,我们还需要一个数据结构来将所有的线程集合起来,让调度算法可以对其进行统一的操作。这个数据结构便是线程队列。队列有以下特点:

  1. 队列的容量一旦在构造时指定,后续不能改变;

  2. 插入元素时,在队尾进行;删除元素时,在队首进行;

  3. 队列满时,插入元素会阻塞线程;队列空时,删除元素也会阻塞线程;

  4. 支持公平/非公平策略,默认为非公平策略(上一节讲述了更具counter值的大小进行调度)。

同时,队列应该是多队列,包括阻塞队列,就绪队列,睡眠队列等,然而本设计只设计一个队列就完成了所有队列的功能,其巧妙之处就在于,在线程的TCB中我们设置了一个线程状态标志,调度算法根据这个标识就能够进行合理的调度了。具体,该线程的队列设计如下:

static struct task_struct init_task = {0, NULL,  0,THREAD_RUNNING, 0, 15, 15, {0}};
 
struct task_struct *current = &init_task;
 
struct task_struct *task[NR_TASKS] = {&init_task,};

init_task代表的是当前的主线程,其线程id为0。

# 5. 主要流程的程序实现

# 5.1 线程的创建

int thread_create(int *tid, void (*start_routine)()) {
 
  int id = -1;
 
  struct task_struct *tsk = (struct task_struct*)malloc(sizeof(struct task_struct));
 
  while(++id < NR_TASKS && task[id]);
 
  if (id == NR_TASKS) return -1;
 
  task[id] = tsk;
 
  if (tid) *tid = id;  //返回值
 
  tsk->id = id;
 
  tsk->th_fn = start_routine;
 
  int *stack = tsk->stack; // 栈顶界限
  tsk->esp = (int)(stack+STACK_SIZE-11);
 
  tsk->wakeuptime = 0;
 
  tsk->status = THREAD_STOP;
 
  tsk->counter = 15;
 
  tsk->priority = 15;
 
  // 初始 switch_to 函数栈帧
 
  stack[STACK_SIZE-11] = 0; // eflags
 
  stack[STACK_SIZE-10] = 0; // eax
 
  stack[STACK_SIZE-9] = 0; // edx
 
  stack[STACK_SIZE-8] = 0; // ecx
 
  stack[STACK_SIZE-7] = 0; // ebx
 
  stack[STACK_SIZE-6] = 0; // esi
 
  stack[STACK_SIZE-5] = 0; // edi
 
  stack[STACK_SIZE-4] = 0; // old ebp  
 
  stack[STACK_SIZE-3] = (int)start; // ret to start   线程第一次被调度时会在此启动
 
  // start 函数栈帧,刚进入 start 函数的样子 
 
  stack[STACK_SIZE-2] = 100;// ret to unknown,如果 start 执行结束,表明线程结束 
 
  stack[STACK_SIZE-1] = (int)tsk; // start 的参数
 
  /*
 
  汇编函数调用,c风格参数传递
 
  传入参数分别是IP,c1,c2
 
  */
  return 0;
 
}

# 5.2 线程调度

void schedule() {
 
   //线程的调度函数
 
    struct task_struct *next = pick();
 
    if (next) {
 
      switch_to(next); //线程的上下文切换
 
    }
 
}
 
 
static struct task_struct *pick() {
 
/*找到时间片最大的线程进行调度*/
 
  int i, next, c;
 
  for (i = 0; i < NR_TASKS; ++i) {
 
    if(!task[i])continue;
 
    if( task[i]->status == THREAD_EXIT){
 
      if(task[i]!=current)
 
        remove_th(i);
 
      continue;
 
    }
 
  
 
    if (task[i]->status == THREAD_DISPOSED)
 
   {
 
     if(task[i]!=current)
 
      remove_th(i);
 
     continue;
 
   }
 
    if (task[i]->status != THREAD_STOP&& task[i]->status != THREAD_BLOCK
 
        && getmstime() > task[i]->wakeuptime) {
 
      task[i]->status = THREAD_RUNNING;
 
    }
 
  }
 
  //上面的作用是唤醒睡眠的线程,使其可以接受调度
 
  while(1) {
 
    c = -1;
 
    next = 0;
 
    for (i = 0; i < NR_TASKS; ++i) {
 
      if (!task[i]) continue;
 
      if (task[i]->status == THREAD_RUNNING && task[i]->counter > c) {
 
        c = task[i]->counter;
 
        next = i;
 
      }
 
    }
 
    if (c) break;
 
    // 如果所有任务时间片都是 0,重新调整时间片的值
 
    if (c == 0) {
 
      for (i = 0; i < NR_TASKS; ++i) {
 
        if(task[i]) {
 
          task[i]->counter = task[i]->priority + (task[i]->counter >> 1);
        }
 
      }
 
    }
 
  }
 
  
 
  return task[next];
 
}

# 5.3 线程上下文切换

.section .text
 
.global switch_to
 
switch_to:
 
  call closealarm /* 模拟关中断 */
 
  push %ebp
 
  mov %esp, %ebp /* 更改栈帧,以便寻参 */
 
  /* 保存现场 */
 
    push %edi
 
    push %esi
 
    push %ebx
 
    push %edx
 
    push %ecx
 
    push %eax
 
  pushfl
 
  /* 准备切换栈 */
 
  mov current, %eax /* 取 current 基址放到 eax     */
 
  mov %esp, 8(%eax) /* 保存当前 esp 到线程结构体 */ 
 
  mov 8(%ebp), %eax /* 8(%ebp)即为c语言的传入参数next   取下一个线程结构体基址*/
 
  mov %eax, current /* 更新 current */
 
    mov 8(%eax), %esp /* 切换到下一个线程的栈 */
 
  /* 恢复现场, 到这里,已经进入另一个线程环境了,本质是 esp 改变 */
 
  popfl
 
    popl %eax
 
    popl %ecx
 
    popl %edx
 
    popl %ebx
 
 
    popl %esi
 
    popl %edi
 
    popl %ebp
 
  call openalarm /* 模拟开中断  */
 
ret

# 5.4 thread_join——阻塞式线程启动

int thread_join(int tid) {
 
  while(task[tid]&&task[tid]->status != THREAD_EXIT) {
 
      if(task[tid]->status==THREAD_STOP){
 
          task[tid]->status=THREAD_RUNNING;
 
      }
 
    schedule();
 
  }
 
}

# 5.5 detach——分离式线程启动

void detach(int tid){
 
 
  if(task[tid]!=NULL && task[tid]->status==THREAD_STOP&& task[tid]->status!=THREAD_EXIT){
 
    task[tid]->status=THREAD_RUNNING;
 
    schedule();
 
  }
 
}

# 5.6 等待子线程执行结束而阻塞父线程

void wait_all(){
 
int i=0;
 
int remain=0;
 
while(1){
 
  remain=0;
 
for(i=1;i<NR_TASKS;i++){
 
if(task[i]&&task[i]->status!=THREAD_EXIT){
 
   remain=1;
 
   schedule();
 
   break;
 
   continue;
 
}
 
}
 
if(!remain){
 
  break;
 
}
 
}
 
}
 
void wait_thread(int tid){
 
while (task[tid]&&task[tid]->status != THREAD_EXIT)
 
{
 
  schedule();
 
}
 
}

# 参考

用户态线程库原理、设计与实现 (opens new window)