十一. 线程同步

乱序输出

前面完成了多线程之后,那么肯定会涉及到线程的同步问题。因为线程的执行是随机的,乱序的。虽然我们这个小kernel实现的调度器算法比较简陋,它的随机性没那么强,但是每次进行线程切换的时候,还是有可能产生问题。并且问题已经产生了。

乱序输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main(void)
{
put_str("I am kernel\n");
init_all();
thread_start("thread_a", 31, k_thread, "argA ");
thread_start("thread_b", 8, k_thread, "argB ");
intr_enable();
while (1)
{
put_str("Main ");
}
return 0;
}

/* 在线程中运行的函数 */
void k_thread(void *arg)
{
while (1)
{
put_str((char*)arg);
}
}

产生这总结果的原因是由上面这段代码所致。

我们预期的输出结果应该是这样的

mark

可以看到,上面的输出结果显得比较杂乱,主要是字符串的交界处,会出现却字符,多空格的现象。而且还引发了GP异常。

这个结果正是证实了之前所说的,每发生一次线程切换,都有可能带来问题。

而带来问题的原因就是这个里面的put_str函数。

这个函数主要做了三个事情实现打印

  1. 获取光标值
  2. 将光标值转换为显存地址,在此地址处写入字符
  3. 更新光标的值

很明显这三步应该同时完成,不可拆分的。那么想象一下这种情况,线程1执行完了第一步之后,时间片用完,就被换下了CPU。现在轮到线程2执行,线程2又会执行第一步,且这个第一步获取到的光标值会和之前线程1获取到的一样。然后线程2开始向后打印。

终于又轮到线程1执行了,当时他是执行到第一步,那么接下来他就开始执行第二步。可以看到,问题就已经产生了。线程1获取到的光标值早就被线程2用到了,那么线程1在打印的过程中就会覆盖线程2打印的数据。

这个是输出的混乱问题,引发GP异常的原因主要是由于这个简陋的滚屏操作所致,在滚屏的过程中产生了线程切换,导致最后获取到的显存地址会超过显存段而引发的GP异常

信号量

既然要进行线程同步,那么肯定要在需要同步的地方阻止线程的切换。这里主要通过信号量的机制对公共资源加锁,达到同步的目的。

信号量的原理本省比较简单。通过P、V操作来表示信号量的增减。

P、V操作的执行过程

P操作,减少信号量

  1. 判断信号量是否大于0
  2. 如果大于0, 将其减一
  3. 如果小于0,将当前线程阻塞

V操作,增加信号量

  1. 将信号量的值加一
  2. 唤醒等待的线程

信号量是一个全局的共享资源,所以对其进行增减操作的时候必须是原子操作,这个原子操作通过关中断来实现。

将线程阻塞的操作也很容易实现,只需要将其从就绪队列中移除即可,下面简单的看一下实现过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* 信号量结构 */
struct semaphore
{
uint8_t value;
struct list waiters;
};

/* 锁结构 */
struct lock
{
task_struct* holder; // 锁的持有者
struct semaphore semaphore; // 用二元信号量实现锁
uint32_t holder_repeat_nr; // 锁的持有者重复申请锁的次数
};

P操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 信号量down操作 */
void sema_down(struct semaphore *psema)
{
/* 关中断来保证原子操作 */
enum intr_status old_status = intr_disable();
while (psema->value == 0)
{
// 若value为0,表示已经被别人持有
ASSERT(!elem_find(&psema->waiters, &running_thread()->general_tag));
/* 当前线程不应该已在信号量的waiters队列中 */
if (elem_find(&psema->waiters, &running_thread()->general_tag))
{
PANIC("sema_down: thread blocked has been in waiters_list\n");
}

/* 若信号量的值等于0,则当前线程把自己加入该锁的等待队列,然后阻塞自己 */
list_append(&psema->waiters, &running_thread()->general_tag);
thread_block(TASK_BLOCKED); // 阻塞线程,直到被唤醒
}
/* 若value为1或被唤醒后,会执行下面的代码,也就是获得了锁。*/
psema->value--;
ASSERT(psema->value == 0);
/* 恢复之前的中断状态 */
intr_set_status(old_status);
}

V操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* 信号量的up操作 */
void sema_up(struct semaphore *psema)
{
/* 关中断,保证原子操作 */
enum intr_status old_status = intr_disable();
ASSERT(psema->value == 0);

if (!list_empty(&psema->waiters))
{
task_struct *thread_blocked = elem2entry(task_struct, general_tag, list_pop(&psema->waiters));
thread_unblock(thread_blocked);
}

psema->value++;
ASSERT(psema->value == 1);
/* 恢复之前的中断状态 */
intr_set_status(old_status);
}

获取锁和释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* 获取锁plock */
void lock_acquire(struct lock *plock)
{
/* 排除曾经自己已经持有锁但还未将其释放的情况。*/
if (plock->holder != running_thread())
{
sema_down(&plock->semaphore); // 对信号量P操作,原子操作
plock->holder = running_thread();
ASSERT(plock->holder_repeat_nr == 0);
plock->holder_repeat_nr = 1;
}
else
{
plock->holder_repeat_nr++;
}
}

/* 释放锁plock */
void lock_release(struct lock *plock)
{
ASSERT(plock->holder == running_thread());
if (plock->holder_repeat_nr > 1)
{
plock->holder_repeat_nr--;
return;
}
ASSERT(plock->holder_repeat_nr == 1);

plock->holder = NULL; // 把锁的持有者置空放在V操作之前
plock->holder_repeat_nr = 0;
sema_up(&plock->semaphore); // 信号量的V操作,也是原子操作
}

阻塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* 当前线程将自己阻塞,标志其状态为stat. */
void thread_block(task_status stat)
{
/* stat取值为TASK_BLOCKED,TASK_WAITING,TASK_HANGING,也就是只有这三种状态才不会被调度*/
ASSERT(stat == TASK_BLOCKED || stat == TASK_WAITING || stat == TASK_HANGING);

enum intr_status old_status = intr_disable();
task_struct* cur_thread = running_thread();

cur_thread->status = stat; // 置其状态为stat
schedule(); // 将当前线程换下处理器

/* 待当前线程被解除阻塞后才继续运行下面的intr_set_status */
intr_set_status(old_status);
}

唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* 将线程pthread解除阻塞 */
void thread_unblock(task_struct* pthread)
{
enum intr_status old_status = intr_disable();
ASSERT((pthread->status == TASK_BLOCKED) || (pthread->status == TASK_WAITING) || (pthread->status == TASK_HANGING));

if (pthread->status != TASK_READY)
{
ASSERT(!elem_find(&thread_ready_list, &pthread->general_tag));

if (elem_find(&thread_ready_list, &pthread->general_tag))
{
PANIC("thread_unblock: blocked thread in ready_list\n");
}

list_push(&thread_ready_list, &pthread->general_tag); // 放到队列的最前面,使其尽快得到调度
pthread->status = TASK_READY;
}
intr_set_status(old_status);
}