跳转至

Linux线程互斥与同步

约 5477 个字 713 行代码 3 张图片 预计阅读时间 27 分钟

前置知识

在进程间通信部分已经提到过,当两个进程访问同一个资源时,这个资源就被称为是共享资源,当这个共享资源被保护时就属于临界资源,对应的访问临界资源的代码就属于临界区。在这个概念基础之上,本节主要讨论共享资源的保护措施,在线程部分一般有两种保护措施:

  1. 互斥:保证共享资源同一时刻只会有一个线程正在访问
  2. 同步:在互斥的基础之上保证合理和高效

互斥锁(互斥量)的由来

互斥量是实现互斥的基本方式,对于一个没有互斥保护的共享资源,如果有多个线程同时访问该资源就会出现意外问题,例如下面的代码:

Note

下面的代码用到了上一节封装的线程库

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <vector>
#include "thread.hpp"

using namespace ThreadModule;
#define NUM 4

int tickets = 1000;

void getTicket()
{
    while (true)
    {
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程获取到一张票:" << tickets-- << std::endl;
        }
        else
        {
            break;
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    return 0;
}

上面代码的主要功能是实现4个线程之间进行“抢票”,这段代码看似没有任何问题,判断逻辑也正常,但是一旦运行上面的代码就会看到最终票数减到了-1和-2。此时就是问题所在,在上面的逻辑中,明明当票数小于等于0时就会走break,却会将票数减少到负数

下面分析上面的代码为什么会出现问题:

当多线程被创建后启动时,所有的线程都会看到tickets这个变量,当tickets变量值为1时,满足if条件进入到usleep()接口,当前线程就会休眠1秒,此时可能存在线程被切换的情况,假设已经被切换,那么当前线程就没有对tickets变量进行修改,并且因为线程切换时会保存上下文数据,所以此时这个线程看到的tickets依旧是1,如果刚好4个线程都是这种情况,那么就会出现4个线程全部在usleep()接口处

当下一次所有线程都从usleep接口出来时,其上下文数据中tickets变量都是1,但是又因为都进入了if判断,所以都可以执行tickets--操作,所以最后tickets变量中的值减少到负数

整个过程示意图如下:

当然,也有其他的情况导致减少到负数,例如下面的两种情况:

  1. if判断为真,但是因为if判断并不是原子操作,导致刚结束if判断成立时,线程就被切换,此时线程也进入了if分支中
  2. tickets--操作并不是原子操作,当线程刚准备进行自减操作时就被切换,此时因为tickets变量还是1,导致其他线程都进入了if分支中

不论是哪一种原因,最后的结果都是tickets变量减少到了负数,为了解决这个问题,就需要考虑一种策略:当一个线程访问tickets变量时,其他线程不可以访问该变量。而因为tickets变量在整个过程中属于共享资源,所以这个策略也可以推广到:当一个线程访问共享资源时,其他线程不可以访问该共享资源,这种策略也被称为互斥锁(或互斥量,简称为互斥)

通过上面的了解,对互斥锁下一个定义:互斥锁(Mutual exclusion,缩写:Mutex)是一种用于多线程编程中,防止多个线程同时对同一公共资源进行读写的机制

Linux下的互斥锁与封装

基本接口介绍

在Linux下实现互斥需要利用到用户级线程库中的操作:

  1. 创建互斥锁,使用pthread_mutex_t类型创建互斥锁变量
  2. 初始化互斥锁,使用PTHREAD_MUTEX_INITIALIZER初始化互斥锁变量或者使用pthread_mutex_init接口进行初始化
  3. 进入临界区之前加锁,使用pthread_mutex_lock接口
  4. 出临界区进行解锁,使用pthread_mutex_unlock接口
  5. 销毁互斥锁,使用pthread_mutex_destroy接口

对于创建互斥锁的两种方式:如果使用PTHREAD_MUTEX_INITIALIZER直接初始化互斥锁变量,那么就不需要使用pthread_mutex_init对互斥锁变量进行初始化以及不需要使用pthread_mutex_destroy对互斥锁进行销毁

Note

注意,不可以通过初始化赋值的形式初始化互斥锁变量

使用PTHREAD_MUTEX_INITIALIZER初始化并实现互斥

创建并初始化互斥锁变量

因为互斥锁本质也是一种资源,并且为了确保多个线程可以访问到同一个互斥锁变量,需要将该变量设置为全局变量,即:

C
1
2
3
4
// ...
int tickets = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// ...

进入临界区前加锁

加锁前首先需要确定临界区,何为临界区,就是访问临界资源的代码,而临界资源就是加了保护的共享资源,所以先找到共享资源,在前面的代码中,访问的tickets变量就是共享资源,所以访问这个共享资源的代码段属于临界区,所以在if判断之前进行加锁

加锁时需要使用到pthread_mutex_lock接口,其原型如下:

C
1
int pthread_mutex_lock(pthread_mutex_t *mutex);

该接口只有一个参数,表示使用的锁对象

针对前面的代码,使用如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 访问临界区之前加锁
pthread_mutex_lock(&lock);
if(tickets > 0)
{
    // ...
}
else
{
    break;
}

离开临界区后解锁

所谓理解离开临界区,就是后续的代码没有再访问共享资源,在前面的代码中对应的就是else语句之后,解锁使用到的接口为pthread_mutex_unlock,原型如下:

C
1
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_lock一样,传递一个参数,该参数表示使用的锁对象

Note

需要注意,加锁和解锁必须使用同一个锁对象,否则加锁无意义

对前面的代码修改如下:

C
1
2
3
4
5
6
7
// ...
else
{
    break;
}
// 离开临界区后解锁
pthread_mutex_unlock(&lock);

测试

整体代码修改如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <vector>
#include "thread.hpp"

using namespace ThreadModule;
#define NUM 4

int tickets = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程获取到一张票" << tickets-- << std::endl;
        }
        else
        {
            break;
        }
        pthread_mutex_unlock(&lock);
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    return 0;
}

现在再运行上面的代码即可看到tickets变量不会再减少负数,当没有票可以抢时,按理说所有线程进入break,此时主线程依次回收所有线程,整个进程结束,但是上面的代码却卡死,如下图所示:

出现上面这种问题,本质是因为如果一个线程当前持有锁,但是因为tickets小于或等于0,导致走到了else分支直接跳出循环,但是跳出循环时持有锁的线程并没有释放锁,导致其他线程一直处于等待锁的状态,从而导致整个进程卡死

所以为了解决这个问题,考虑改变解锁的位置如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ...

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            // ...
            pthread_mutex_unlock(&lock);
        }
        else
        {
            pthread_mutex_unlock(&lock);
            break;
        }
    }
}

// ...

此时再运行代码即可发现问题已经解决:

使用pthread_mutex_init初始化并实现互斥

Note

因为实现互斥的方式和前面是一样的,所以下面只会讨论创建和销毁锁对象

创建并初始化互斥锁变量

除了使用PTHREAD_MUTEX_INITIALIZER初始化互斥锁变量外,还可以使用pthread_mutex_init接口,其原型如下:

C
1
int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr);

该接口第一个参数表示使用的锁,第二个参数表示锁属性,一般设置为NULL即可

基本使用如下:

C
1
2
pthread_mutex_t lock;
pthread_mutex_init(&lock, NULL);

销毁锁变量

如果使用的是pthread_mutex_init初始化锁变量,就需要使用pthread_mutex_destroy销毁锁变量,其原型如下:

C
1
int pthread_mutex_destroy(pthread_mutex_t *mutex);

该接口的参数表示使用的锁

基本使用如下:

C
1
pthread_mutex_destroy(&lock);

修改前面的代码

结合创建接口和销毁接口修改前面的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iostream>
#include <vector>
#include "thread.hpp"

using namespace ThreadModule;
#define NUM 4

int tickets = 1000;
pthread_mutex_t lock;

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程获取到一张票" << tickets-- << std::endl;
            pthread_mutex_unlock(&lock);
        }
        else
        {
            pthread_mutex_unlock(&lock);
            break;
        }
    }
}

int main()
{
    pthread_mutex_init(&lock, NULL);
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    pthread_mutex_destroy(&lock);

    return 0;
}

运行上面的代码同样可以实现正常访问临界资源

互斥之后代码运行缓慢

前面的抢票代码如果对比加锁前的代码运行速度和加锁后的代码运行速度可以发现,加锁后的代码运行速度慢了些许,主要的原因是加锁后只能保证同一时刻只有一个线程在访问临界资源,但是当有多个线程时,其他线程就需要等待正在访问临界资源的线程释放锁再抢锁才能访问临界资源,所以整体的并发性降低了,从而导致加锁后的代码整体运行缓慢

互斥锁的基本实现原理

前面提到tickets变量属于共享资源,并且判断tickets变量是否大于0和tickets变量自减的操作都不是原子的,所以就有可能出现在这两步执行期间出现线程切换从而出现问题,对此解决的方案是加锁,但是锁本质上也是一个共享资源,既然是共享资源,那么从前面的理解来看,锁本身也需要保护。但是实际上,操作锁的过程是原子性的,即在汇编层下只需要一条指令就可以完成,所以只存在两种状态:执行前和执行后,既然操作锁的过程本身是原子性的,那么就可以不需要对这个锁进行额外保护

为了实现互斥锁操作,大多数体系结构都提供了swapexchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。根据这个原理,现在把lockunlock的伪代码改一下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
lock:
    movb $0, %eax
    xchgb %eax, mutex
    if(eax寄存器的内容 > 0)
    {
        // 执行逻辑
    }
    else
    {
        挂起等待;
    }

    goto lock;

unlock:
    movb $1, mutex
    唤醒等待锁mutex的线程
    // 执行逻辑

在上面的伪代码中,lock:表示下面的代码为加锁访问临界区,unlock:表示下面的代码离开临界区解锁

对于lock来说,当执行到movb $0, %eax表示将数字0移动到当前线程的寄存器中,接着执行xchgb %eax, mutex将互斥锁mutex中的值(默认是1)和当前线程寄存器中的值进行交换,此时当前线程的寄存器就是1,mutex变量中的值就是0,接着判断当前线程寄存器的值是否是1决定是否可以执行对应的逻辑,否则就需要等待锁。当线程被操作系统唤醒时,就需要执行goto lock继续抢锁,重复上面的逻辑。

对于unlock来说,执行movb $1, mutex将当前线程寄存器中的1移动到mutex变量中,这个操作就相当于释放锁,接着唤醒等待锁的线程即可让这些线程走到goto lock

上面对锁的操作之所以可以实现操作锁不需要额外保护就是因为加锁和释放锁都只是一行汇编代码,所以此时就只有执行对应汇编代码之前(没加锁或没解锁)和执行对应汇编代码之后(已加锁或已释放锁)两种状态,不论是在这两种状态的哪一种状态之前或者之后执行都不会有问题,所以操作锁不需要额外保护

所以,互斥锁的本质就是二元信号量,其变化状态也只有两种,这也是互斥锁不允许用户进行直接赋值初始化的原因之一

封装互斥锁

本次封装互斥锁分为两步:

  1. 对互斥锁的基本使用接口进行封装
  2. 采用C++的RAII思想封装互斥锁的加锁和解锁

对互斥锁的基本使用接口进行封装

封装基本使用接口相对比较容易,下面是参考代码:

Note

为了保证使用封装的互斥锁时始终只使用一把锁,需要保证这把锁不能被拷贝和赋值给新锁

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace MutexModule
{
    class Mutex
    {
    private:
        Mutex(const Mutex &m) = delete;
        Mutex operator=(const Mutex &m) = delete;
    public:
        Mutex()
        {
            pthread_mutex_init(&_mutex, NULL);
        }

        void lock()
        {
            pthread_mutex_lock(&_mutex);
        }

        void unlock()
        {
            pthread_mutex_unlock(&_mutex);
        }

        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

    private:
        pthread_mutex_t _mutex;
    };
}

根据上面的代码修改前面的抢票代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <vector>
#include "thread.hpp"
#include "mutex.hpp"

using namespace ThreadModule;
using namespace MutexModule;
#define NUM 4

int tickets = 1000;

// 创建锁对象
Mutex lock;

void getTicket()
{
    while (true)
    {
        // 加锁
        lock.lock();
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程获取到一张票" << tickets-- << std::endl;
            // 解锁
            lock.unlock();
        }
        else
        {
            // 解锁
            lock.unlock();
            break;
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    return 0;
}

采用C++的RAII思想封装互斥锁的加锁和解锁

RAII思想就是利用对象的生命周期对资源进行控制,而在上面的代码中,控制资源的操作就是加锁和解锁,所以只需要封装这两个操作即可,具体代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class MutexGuard
{
public:
    MutexGuard(Mutex &mutex)
        : _mutex(mutex)
    {
        // 创建对象时就自动上锁
        _mutex.lock();
    }

    ~MutexGuard()
    {
        // 销毁对象时就自动解锁
        _mutex.unlock();
    }

private:
    Mutex &_mutex;
};

需要注意,在上面的代码中,为了保证多个线程访问的是同一把锁,必须在构造时使用引用,并且成员变量也必须是引用,因为在前面封装互斥锁时限制了拷贝和赋值构造函数

此时再修改前面抢票的代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iostream>
#include <vector>
#include "thread.hpp"
#include "mutex.hpp"

using namespace ThreadModule;
using namespace MutexModule;
#define NUM 4

int tickets = 1000;

// 创建锁对象
Mutex lock;

void getTicket()
{
    while (true)
    {
        // 加锁
        MutexGuard lockGuard(lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程获取到一张票" << tickets-- << std::endl;
        }
        else
        {
            break;
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    return 0;
}

此时之所以同样可以做到对tickets进行保护是因为创建MutexGuard对象时始终使用的是同一个Mutex对象,一旦一个线程离开了while循环的代码块就会销毁MutexGuard对象,此时就会调用对应的解锁函数

同步的由来

前面互斥锁已经解决了共享资源被多个线程访问时的并发问题,但是现在又有一个新问题,还是基于前面的抢票场景,如果一个线程抢到了票,其执行完了抢票逻辑,就需要释放锁并且唤醒其他进程一起再抢锁,乍一看的确没有问题:所有线程都重新竞争同一把锁,然而常识告诉我们一个拥有锁的线程可以做到刚把锁放回去又抢到锁,这种情况下导致这个线程抢到锁的概率很大,其他线程就只能一直处于等待->唤醒->等待,循环往复始终无法拿到锁从而产生线程饥饿问题。所以为了解决这个问题,就需要保证某一个线程释放锁时不能再次立即拿锁,而是需要和其他线程一起抢锁,并且为了保证所有线程都可以拿到锁,还需要保证这些线程是有顺序拿到锁,这种机制就是同步

从上面对同步的作用可以看出,同步的本质就是在原本的互斥基础上保证了运行有序和高效

在Linux中,实现同步是通过控制条件变量的方式。所谓条件变量,就是一种基于某种条件的变量,如果这个条件成立,就允许执行,否则不允许执行,例如上面的例子,对于已经保护的tickets来说,如果当前有票,就代表条件成立,所有线程都可以去抢锁,否则所有线程都需要等待直到条件成立,一般情况下,这些线程在底层是按照队列的模式进行等待,即先进先出,并且释放锁的线程需要重新排到队尾与其他线程一起等待直到条件满足被唤醒。在这整个过程中,条件变量就相当于是个信号,当有资源时通知对应的线程可以开始访问,否则就必须等待

Linux下的同步与封装

基本接口介绍

在Linux下实现同步需要经过下面的步骤:

  1. 创建条件变量,使用pthread_cond_t类型创建条件变量
  2. 初始化条件变量,使用PTHREAD_COND_INITIALIZER值或者使用pthread_cond_init接口进行初始化
  3. 条件不成立时等待,使用pthread_cond_wait接口使指定线程等待
  4. 条件成立时唤醒,使用pthread_cond_signal接口唤醒一个线程或者使用pthread_cond_broadcast唤醒所有线程
  5. 销毁条件变量,使用pthread_cond_destroy接口进行销毁

与互斥锁一样,如果初始化条件变量使用的是PTHREAD_COND_INITIALIZER就不需要使用pthread_cond_init再初始化以及pthread_cond_destroy销毁

Note

注意,不可以通过初始化赋值的形式初始化条件变量

使用PTHREAD_COND_INITIALIZER初始化并使用条件变量

与互斥锁一样,为了保证多个线程可以访问到同一种条件,条件变量需要作为共享资源:

C
1
2
3
4
5
// ...
int tickets = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// ...

条件不成立时等待

在Linux下,当条件不成立时使线程等待的接口为pthread_cond_wait,其原型如下:

C
1
int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mutex);

这个接口的第一个参数表示需要使用的条件变量,第二个参数表示需要释放的锁

在前面的抢票代码中,需要等待的情况就是没有票时,所以只需要在没有票的时候让所有线程进行等待即可,但是因为前面已经加锁,理论上来说,当一个线程进入等待就已经没有必要再抱着锁等待了,所以一个线程进入等待之前需要先释放当前拥有的锁,但是需要注意,并不需要手动调用pthread_mutex_unlock进行释放,因为pthread_cond_wait本身会先对当前线程的锁进行释放(即该接口的第二个参数),再让其进入等待队列

另外,本次实现的逻辑是当没有票时,线程通过pthread_cond_wait进入等待队列进行等待,当下一次有票时再被唤醒继续抢锁买票,所以也不需要再使用break,代码修改如下:

C
1
2
3
4
5
6
// ...
else
{
    // 条件不满足时等待
    pthread_cond_wait(&cond, &lock);
}

条件成立时唤醒

在Linux中,在指定条件变量下唤醒一个线程可以使用pthread_cond_signal接口,其原型如下:

C
1
int pthread_cond_signal(pthread_cond_t *cond);

该接口具有一个参数,表示指定的条件变量

如果需要一次性唤醒所有在指定条件变量下等待的线程,可以使用pthread_cond_broadcast接口,其原型如下:

C
1
int pthread_cond_broadcast(pthread_cond_t *cond);

在前面的抢票代码中,票的数量是固定的,所以本次需要对放票逻辑修改为主线程定期放票而不是程序运行时固定票数,每当主线程放票一次,就唤醒一个等待队列中的线程进行抢票即可:

C
1
2
3
4
5
6
7
8
int count = 10;
while (count--)
{
    tickets = 10;
    // 唤醒线程
    pthread_cond_signal(&cond);
    sleep(1);
}

测试

整体修改的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <iostream>
#include <vector>
#include "thread.hpp"

using namespace ThreadModule;
#define NUM 4

int tickets = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程:" << pthread_self() % 10 << "获取到一张票:" << tickets-- << std::endl;
            pthread_mutex_unlock(&lock);
        }
        else
        {
            // 条件不满足时等待
            std::cout << "新线程:" << pthread_self() % 10 << "等待" << std::endl;
            pthread_cond_wait(&cond, &lock);
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 主线程放票10次,一次放票10张
    int count = 10;
    while (count--)
    {
        tickets = 10;
        std::cout << "主线程放票" << std::endl;
        // 唤醒线程
        pthread_cond_signal(&cond);
        sleep(1);
    }

    // 等待多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].join();

    return 0;
}

编译运行上面的代码,预期上需要看到的效果是主线程放10张票,4个线程轮流抢到10张票,但是实际上看到的却是一个线程(可能抢到票,也可能没有抢到,取决于主线程和抢票线程哪个线程先运行)进入等待后,只剩下主进程正在放票的输出。之所以会出现这个问题是因为当等待的线程被唤醒时,pthread_cond_wait会让线程重新持有指定的锁,但是走完else后回到循环体的第一条语句时又开始执行pthread_mutex_lock抢锁,此时就导致持有锁的线程一直等着锁,而其他线程永远拿不到锁,所以就只有主线程一个线程在跑

为了解决这个问题,可以考虑一种解决方案:在pthread_cond_wait后先释放锁,再让其去执行pthread_mutex_lock抢锁,代码如下:

C++
1
2
3
4
5
6
7
8
// ...
else
{
    // 条件不满足时等待
    std::cout << "新线程:" << pthread_self() % 1000 << "等待" << std::endl;
    pthread_cond_wait(&cond, &lock);
    pthread_mutex_unlock(&lock);
}

再次运行代码即可发现4个线程已经可以轮流抢票了,但是因为当有票时一次只唤醒一个线程,所以有票的时候只有一个线程在抢票,其他线程还在等待,所以就出现了10张票被一个线程抢完的情况

优化

为了看出4个线程是按照队列的顺序依次拿票,可以先让主进程在放票前休眠一段时间,此时所有抢票线程就会都进入等待队列,当被唤醒时就可以看到唤醒的顺序和等待的顺序是一致的,另外为了保证最后进程不会卡死,所以考虑将等待的逻辑修改为取消,即主线程放票完毕后终止所有线程

修改后的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <vector>
#include "thread.hpp"

using namespace ThreadModule;
#define NUM 4

int tickets = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程:" << pthread_self() % 1000 << "获取到一张票:" << tickets-- << std::endl;
            pthread_mutex_unlock(&lock);
        }
        else
        {
            // 条件不满足时等待
            std::cout << "新线程:" << pthread_self() % 1000 << "等待" << std::endl;
            pthread_cond_wait(&cond, &lock);
            pthread_mutex_unlock(&lock);
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 主线程等待一段时间再放票,看出所有线程都在等待队列,再依次唤醒
    sleep(1);

    // 主线程放票10次,一次放票10张
    int count = 10;
    while (count--)
    {
        tickets = 10;
        std::cout << "主线程放票" << std::endl;
        // 唤醒线程
        pthread_cond_signal(&cond);
        sleep(1);
    }

    // 结束多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].cancel();

    return 0;
}

编译运行上面的代码即可看到刚开始4个线程正在等待,紧接着当主线程放票时,第一个等待的线程被唤醒抢到10张票,第二个等待的线程被唤醒抢到10张票,以此类推,最后主线程结束所有线程

使用pthread_cond_init初始化并使用条件变量

使用方式与互斥锁一致,不再演示

封装条件变量

原生版

封装条件变量本质还是封装对应的使用接口,所以直接给出参考代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#pragma once

#include <iostream>
#include <pthread.h>

namespace ConditionModule
{
    class Condition
    {
    public:
        Condition()
        {
            // 初始化条件变量
            pthread_cond_init(&_cond, NULL);
        }

        void wait(pthread_mutex_t &mutex)
        {
            pthread_cond_wait(&_cond, &mutex);
        }

        void notify()
        {
            // 唤醒一个线程
            pthread_cond_signal(&_cond);
        }

        void notifyAll()
        {
            // 唤醒多个线程
            pthread_cond_broadcast(&_cond);
        }

        ~Condition()
        {
            // 销毁条件变量
            pthread_cond_destroy(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}

在上面的代码中,需要注意wait函数,因为让线程在条件变量下等待时,pthread_cond_wait接口会释放线程所持有的锁,所以需要让该接口接收一个参数,用于pthread_cond_wait的第二个参数

测试

有了上面的封装,对前面的代码进行修改如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "cond.hpp"

#include <iostream>
#include <vector>
#include "thread.hpp"
#include "cond.hpp"

using namespace ThreadModule;
using namespace ConditionModule;
#define NUM 4

int tickets = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
Condition cond;

void getTicket()
{
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程:" << pthread_self() % 1000 << "获取到一张票:" << tickets-- << std::endl;
            pthread_mutex_unlock(&lock);
        }
        else
        {
            // 条件不满足时等待
            std::cout << "新线程:" << pthread_self() % 1000 << "等待" << std::endl;
            cond.wait(lock);
            pthread_mutex_unlock(&lock);
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 主线程等待一段时间再放票,看出所有线程都在等待队列,再依次唤醒
    sleep(1);

    // 主线程放票10次,一次放票10张
    int count = 10;
    while (count--)
    {
        tickets = 10;
        std::cout << "主线程放票" << std::endl;
        // 唤醒线程
        cond.notify();
        sleep(1);
    }

    // 结束多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].cancel();

    return 0;
}

结合前面封装的Mutex

在封装的条件变量中唯一需要用到Mutex对象的位置就是让线程等待的接口,并且该接口需要用到锁成员的地址,所以此时还需要在Mutex中提供一个返回锁成员地址的接口:

C++
1
2
3
4
pthread_mutex_t *getLockAddress()
{
    return &_mutex;
}

对封装的条件变量的wait函数修改如下:

C++
1
2
3
4
void wait(Mutex &mutex)
{
    pthread_cond_wait(&_cond, mutex.getLockAddress());
}

测试

有了上面的封装,对前面的代码进行修改如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include "cond.hpp"

#include <iostream>
#include <vector>
#include "thread.hpp"
#include "cond.hpp"
#include "mutex.hpp"

using namespace ThreadModule;
using namespace ConditionModule;
using namespace MutexModule;
#define NUM 4

int tickets = 0;
Mutex lock;
Condition cond;

void getTicket()
{
    while (true)
    {
        MutexGuard mutex(lock);
        // 有票时,抢票,否则直接跳出循环
        if (tickets > 0)
        {
            usleep(1000);
            std::cout << "当前线程:" << pthread_self() % 1000 << "获取到一张票:" << tickets-- << std::endl;
        }
        else
        {
            // 条件不满足时等待
            std::cout << "新线程:" << pthread_self() % 1000 << "等待" << std::endl;
            cond.wait(lock);
        }
    }
}

int main()
{
    // 创建多个线程
    std::vector<Thread> threads;

    for (int i = 0; i < NUM; i++)
        threads.emplace_back(getTicket);

    // 启动多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].start();

    // 主线程等待一段时间再放票,看出所有线程都在等待队列,再依次唤醒
    sleep(1);

    // 主线程放票10次,一次放票10张
    int count = 10;
    while (count--)
    {
        tickets = 10;
        std::cout << "主线程放票" << std::endl;
        // 唤醒线程
        cond.notify();
        sleep(1);
    }

    // 结束多个线程
    for (int i = 0; i < NUM; i++)
        threads[i].cancel();

    return 0;
}

Note

需要注意,在抢票逻辑的死循环中创建的MutexGuard对象会在每一次循环迭代时重新创建,也就是说,该对象会在本次循环体结束时(即进入下一次循环时)被析构,从而释放锁,所以不会出现前面提到的「只有主进程正在放票的输出」