javascript和JQuery焦点图和代码特效大全
当前最流行的开源CMS网站系统大全
当前位置:首页 > 编程开发 > C语言C++

使用c++11新特性实现线程池

来源:IT技术网编辑:雨天发布于:2017-07-10人围观

使用c++ 11新特性实现线程池

分析

线程池的主要思想是将任务和执行任务的线程分开,任务可以分配给线程池里的线程执行,线程执行完当前任务之后会查看任务列表是否为空,如果不为空继续取任务执行。

其中,任务其实就是一个函数,执行任务就是执行一个特定函数的过程。所以,任务列表可以用一个存放函数指针的列表来实现。但是列表中的元素必须一致,也就是函数指针形式必须相同,如果用普通的shared_ptr去存放函数指针,无法解决这个问题。而使用c++11中的function和bind则可以很好地实现对函数的封装,使函数形式一致。

执行任务的线程们可以用一个vector来存放,但由于thread是不可复制的,所以不能直接放到vector当中,一个通用的做法是用智能指针封装线程,将智能指针存放到vector当中。

基于以上考虑,我们用lsit来存放任务,使用vector来存放用shared_ptr封装的线程。取并执行任务,添加任务,这是一个典型的生产者消费者问题,所以使用条件变量加互斥锁来控制线程之间的同步。

代码实现

1. thread_pool.h

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<code><code><code><code><code><code><code><code><code>#ifndef MY_THREADPOOL_H
#define MY_THREADPOOL_H
 
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <vector>
#include <list>
#include <functional>
#include <cstdio>
 
class ThreadPool{
public:
    using Task = std::function<void(void)>;
    explicit ThreadPool(int num);
    ~ThreadPool();
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    void append(const Task &task);
    void append(Task &&task);
    void start();
    void stop();
 
private:
    bool isrunning;
    int threadNum;
    void work();
    std::mutex m;
    std::condition_variable cond;
    std::list<task> tasks;
    std::vector<std::shared_ptr<std::thread>> threads;
};
 
#endif
</std::shared_ptr<std::thread></task></void(void)></cstdio></functional></list></vector></memory></condition_variable></mutex></thread></code></code></code></code></code></code></code></code></code>

2. thread_pool.cc

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<code><code><code><code><code><code><code><code><code>#include "thread_pool.h"
 
ThreadPool::ThreadPool(int num) : threadNum(num), isrunning(false),m()
{
}
 
ThreadPool::~ThreadPool()
{
    if (isrunning)
    {
        stop();
    }
}
 
void ThreadPool::start()
{
    isrunning = true;
    threads.reserve(threadNum);
    for (int i = 0; i < threadNum; ++i)
    {
        threads.push_back(std::make_shared<std::thread>(&ThreadPool::work, this));
    }
}
 
void ThreadPool::stop()
{
    //线程池关闭,并通知所有线程可以取任务了
    {
        std::unique_lock<std::mutex> locker(m);
        isrunning = false;
        cond.notify_all();
    }
    for (int i = 0; i < threads.size(); ++i)
    {
        auto t = threads[i];
        if (t->joinable())
            t->join();
    }
}
//添加任务,参数为左值
void ThreadPool::append(const Task &task)
{
    if(isrunning){
        std::unique_lock<std::mutex> locker(m);
        tasks.push_back(task);
        cond.notify_one();
    }
}
//添加任务,参数为一个右值
void ThreadPool::append(Task &&task)
{
    if(isrunning){
        std::unique_lock<std::mutex> locker(m);
        tasks.push_back(std::move(task));
        cond.notify_one();
    }
 
}
 
void ThreadPool::work()
{
    while (isrunning)
    {
        Task task;
        {
            std::unique_lock<std::mutex> locker(m);
            //如果线程池在运行,且任务列表为空,等待任务
            if (isrunning && tasks.empty())
            {
                cond.wait(locker);
            }
            //如果任务列表不为空,无论线程池是否在运行,都需要执行完任务
            if (!tasks.empty())
            {
                task = tasks.front();
                tasks.pop_front();
            }
        }
        if(task)
            task();
    }
}
</std::mutex></std::mutex></std::mutex></std::mutex></std::thread></code></code></code></code></code></code></code></code></code>

3. test.cc

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<code><code><code><code><code><code><code><code><code>#include "thread_pool.h"
void func(int n){
    printf("hello from %d\n", n);
}
 
void test(){
    ThreadPool pool(10);
    pool.start();
 
    for(int i = 0; i < 100; ++i){
        pool.append(std::bind(func, i));
    }
}
 
int main(){
    test();
}</code></code></code></code></code></code></code></code></code>

疑问与总结

多线程对代码细节的要求性很高,因为在单线程环境下可以很容易地模拟出程序执行的过程,但多线程环境下由于多道程序交叉执行,混乱执行,有些在单线程环境下运行地很好的代码放到多线程环境下就会发生莫名崩溃或者卡住的情况。这个时候就需要仔细考虑临界区的实现以及细节,考虑什么时候加锁,什么时候解锁,什么时候通知等待线程,以及什么时候开始等待。

这个线程池一开始参考的代码并不能很好地在多线程环境下运行,考虑了很久不知道问题在哪里,后来参考了陈硕的muduo库中线程池的实现,进行了以下修改,终于变成了能用的样子:

work函数中修改等待的条件,从之前的if(tasks.empty())修改为if(isrunning && tasks.empty())。 在线程池中线程join之前通知所有线程去任务列表中取任务,防止有些线程阻塞在work函数中; 添加参数为右值的append函数; 

与相关的文章
有时间的话来看看IT界的突发事件