C++11线程池


一、线程库

进程:运行中的程序

线程:进程中的进程

std::thread t(func,arg) 函数参数

  • func 调用的函数
  • arg 调用的函数的参数

实现示例

#include <iostream>
#include <thread>
#include <unistd.h>

void printfH(){
    while(1){
        std::cout<<"hello"<<std::endl;
        sleep(1);
    }
}

int main() {
    //创建线程
    std::thread thread1(printfH);
    return  0;
}

运行错误

错误原因:

再main函数中thread1开启了一个线程,但是执行完这一句就直接return 0了。

解决方法:

join()

#include <iostream>
#include <thread>
#include <unistd.h>

void printfH(){
    while(1){
        std::cout<<"hello"<<std::endl;
        sleep(1);
    }
}

int main() {
    //创建线程
    std::thread thread1(printfH);
 	thread1.join();	//解决方法
 	return  0;
}

调用join方法,这个函数会判断线程是否运行完毕,如果没有运行完成则就不return 0;

调用一个带有参数的函数

#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>

void printfH(std::string msg){
    while(1){
        std::cout<<msg<<std::endl;
        sleep(1);
    }
}

int main() {
    std::string msg;
    std::cin>>msg;
    std::thread thread1(printfH,msg); //这里
 	thread1.join();
 	return  0;
}

detach()

线程分离

有一种应用场景是我希望主线程可以正常的运行完,但是子线程并不关闭,detach()方法

#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>

void printfH(std::string msg){
    while(1){
        std::cout<<msg<<std::endl;
        sleep(1);
    }
}

int main() {
    std::string msg;
    std::cin>>msg;
    std::thread thread1(printfH,msg);
 	thread1.detach(); //这里
 	return  0;
}

运行结果并不会报错,而且终端的运行结果

test detach
test 

这里还没有打印完整个字符串就退出主程序了,并没有报错是因为子程序在后台运行中

joinable()

有一些线程并不能使用join,joinable就是用来判断那些不能使用的线程,返回值是一个bool类型

有两种情况下不能使用 join 函数:

  • 当线程是以分离状态(detached)创建时,不能使用 join。分离状态的线程是指该线程的资源会在其执行结束后自动释放,不需要其他线程调用 join 来等待其结束。如果尝试对分离状态的线程调用 join,会导致运行时错误。
  • 当线程对象已经被移动(moved)到另一个线程对象时,不能再对原始线程对象调用 join。因为线程对象只能与一个线程关联,一旦线程对象被移动,它就不再与原始线程关联,因此无法使用 join。

示例:

#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>

void printfH(std::string msg){
    std::cout<<msg<<std::endl;
}
int main() {
    std::string msg;
    std::cin>>msg;
    std::thread thread1(printfH,msg);
 	thread1.detach(); //这里
 	bool isJoin = thread1.joinable();
    if (isJoin) {
        std::cout<<"success join"<<std::endl;
        thread1.join();     //如果是可以join的就调用join
    }else{
        std::cout<<"faile join"<<std::endl; //线程分离的是不能使用join的
    }
    return  0;
}

输出结果:

faile join

二、互斥量

互斥锁

在多个线程中共享数据是,需要注意线程安全问题,如果多个线程同时访问同一个变量,并且其中至少一个线程进行了写操作,则出现数据竞争问题。

示例:

#include <iostream>
#include <thread>

int a = 0;

void func(){
    for(int i = 0;i<100000;i++) {	//这里<100000,如果是<1000之类的由于数据量不是很大所以运行结果可能是正确的
        a += 1;
    }
}

int main() {
    std::thread t1(func);
    std::thread t2(func);
    t1.join();
    t2.join();
    std::cout<<a<<std::endl;
    return 0;
}

这段代码的预取值是200000但是得到的结果每次运行都不一致,这就是两个线程数据竞争导致的,可以使用互斥锁解决

#include <iostream>
#include <thread>
#include <mutex>

int a = 0;

std::mutex mtx;	//定义
void func(){
    for(int i = 0;i<100000;i++) {
        mtx.lock();//加锁
        a += 1;
        mtx.unlock();//解锁
    }
}

int main() {
    std::thread t1(func);
    std::thread t2(func);
    t1.join();
    t2.join();
    std::cout<<a<<std::endl;
    return 0;
}

代码在执行a+1的时候加一个锁,别的线程就不能操作,只有解锁了才能继续操作

死锁

场景

现在有两个互斥锁

t1先将mtx1加锁,然后对mtx2加锁

t2先将mtx2加锁,然后对mtx1加锁。

这个时候就出现死锁的情况,t1在获取mtx2时,由于t2已经将mtx2加锁,所以t1就等待t2释放这个锁,反之t2也在等待t1释放mtx1锁。

示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <unistd.h>

int a = 0;

std::mutex mtx1,mtx2;
void func1(){
    mtx1.lock();//加锁
    sleep(1);	
    mtx2.lock();//加锁

    mtx1.unlock();//解锁
    mtx2.unlock();//解锁
}

void func2(){
    mtx2.lock();//加锁
    mtx1.lock();//加锁

    mtx2.unlock();//解锁
    mtx1.unlock();//解锁
}

int main() {
    std::thread t1(func1);
    std::thread t2(func2);
    t1.join();
    t2.join();
    std::cout<<"test"<<std::endl;
    return 0;
}

程序卡死

解决方法,修改顺序就行

#include <iostream>
#include <thread>
#include <mutex>
#include <unistd.h>

int a = 0;

std::mutex mtx1,mtx2;
void func1(){
    mtx1.lock();//加锁
    sleep(1);
    mtx2.lock();//加锁

    mtx1.unlock();//解锁
    mtx2.unlock();//解锁
}

void func2(){
    mtx1.lock();//加锁
    mtx2.lock();//加锁

    mtx1.unlock();//解锁
    mtx2.unlock();//解锁
}

int main() {
    std::thread t1(func1);
    std::thread t2(func2);
    t1.join();
    t2.join();
    std::cout<<"test"<<std::endl;
    return 0;
}

其实这看起来蛮简单,但是在实际应用中由于项目的代码量过大,时不时的一没注意就会产生这种问题。

lock_guard

这是标准库中互斥锁的一个封装。

#include <iostream>
#include <thread>
#include <mutex>

int a = 0;

std::mutex mtx;	//定义
void func(){
    for(int i = 0;i<100000;i++) {
        std::lock_guard<std::mutex> lg(mtx);	//自动加锁,当运行完自动解锁
        a += 1;
    }
}

int main() {
    std::thread t1(func);
    std::thread t2(func);
    t1.join();
    t2.join();
    std::cout<<a<<std::endl;
    return 0;
}

源码

class lock_guard
{
public:
  typedef _Mutex mutex_type;

  explicit lock_guard(mutex_type& __m) : _M_device(__m)
  { _M_device.lock(); }	//加锁

  lock_guard(mutex_type& __m, adopt_lock_t) noexcept : _M_device(__m)
  { } // 函数重载,说明这个锁已经上锁,不需要再加锁

  ~lock_guard()
  { _M_device.unlock(); }	//析构函数:解锁

  lock_guard(const lock_guard&) = delete;
  lock_guard& operator=(const lock_guard&) = delete;

private:
  mutex_type&  _M_device;
};

std::unique_lock

基本用法跟lock_guard基本一致,不过新增了几个用法

std::unique_lock<std::timed_mutex> lg(mtx); 自动加锁,自动解锁

std::unique_lock<std::timed_mutex> lg(mtx,std::defer_lock); 手动加锁,自动解锁

可手动加的三种锁

1.普通加锁

lock() 普通加锁,如果没有等到就一直等

2.延时加锁

try_lock_for 比如死锁情况如果没有获取当前锁就会一直等,这个锁会等特定的时间,比如5秒没有等到就继续执行下面的函数了

#include <iostream>
#include <thread>
#include <mutex>

int a = 0;

std::timed_mutex mtx1;	//定义
std::timed_mutex mtx2;	//定义
void func1(){
    for(int i = 0;i<1000000;i++) {
        std::unique_lock<std::timed_mutex> lg(mtx1); 
        std::unique_lock<std::timed_mutex> lg2(mtx2); 
        a += 1;
    }
}
void func2(){
    for(int i = 0;i<1000000;i++) {
        std::unique_lock<std::timed_mutex> lg(mtx2,std::defer_lock); //第二个参数表示不自动加锁,我自己手动加锁,自动加锁的是普通锁,手动可以加时间锁
        lg.try_lock_for(std::chrono::seconds(2));
        a += 1;
    }
}

int main() {
    std::thread t1(func1);
    std::thread t2(func2);
    t1.join();
    t2.join();
    std::cout<<a<<std::endl;
    return 0;
}

注意时间锁要使用std::timed_mutex

3.根据时间加锁

try_lock_until 比如设定到12点如果这个锁还没获取到就继续执行

三、线程池实现

生产消费模型

线程池模型

#include <iostream>
#include <thread>
#include <memory>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <functional>

class ThreadPool {
public:
    // 构造函数,用给定的线程数初始化线程池
    ThreadPool(int numThreads) : stop(false) {
        // 创建指定数量的线程并将它们添加到线程池中
        for (int i = 0; i < numThreads; i++) {
            threads.emplace_back([this] {
                // 每个线程在循环中运行,直到设置了停止标志
                while (1) {
                    std::unique_lock<std::mutex> lock(mtx);
                    // 等待任务被添加到队列中或停止标志被设置
                    condition.wait(lock, [this] {
                        return !tasks.empty() || stop;
                    });
                    // 如果设置了停止标志并且没有更多任务,则退出线程
                    if (stop && tasks.empty()) {
                        return;
                    }
                    // 从队列中获取下一个任务并解锁互斥锁
                    std::function<void()> task(std::move(tasks.front()));
                    tasks.pop();
                    lock.unlock();
                    // 执行任务
                    task();
                }
            });
        }
    }

    // 析构函数,停止所有线程并等待它们完成
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        condition.notify_all();
        for (auto& t : threads) {
            t.join();
        }
    }

    // 将新任务添加到队列中
    template <class F, class... Args>
    void enqueue(F&& f, Args&&... args) {
        // 通过绑定函数和其参数创建一个新任务
        std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::unique_lock<std::mutex> lock(mtx);
            // 将任务添加到队列中并通知等待的线程
            tasks.emplace(std::move(task));
        }
        condition.notify_one();
    }

private:
    std::vector<std::thread> threads; // 线程池中的线程
    std::queue<std::function<void()>> tasks; // 任务队列

    std::mutex mtx; // 用于同步访问任务队列的互斥锁
    std::condition_variable condition; // 用于等待新任务的条件变量
    bool stop; // 用于向线程发出停止信号的标志
};

int main() {
    ThreadPool pool(4); // 创建一个包含4个线程的线程池
    for (int i = 0; i < 8; ++i) {
        // 向线程池添加一个任务,该任务打印一条消息并休眠1秒钟
        pool.enqueue([i] {
            std::cout << "hello " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "world " << i << std::endl;
        });
    }
    return 0;
}

常见错误

传递临时变量

解决方法:ref(msg)

错误示例:

#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>

void printfH(std::string &msg){
    while(1){
        std::cout<<msg<<std::endl;
        sleep(5);
    }
}

int main() {
    std::string msg;
    std::cin>>msg;	//第一次输入的值 first
    std::thread thread1(printfH,msg);	//区别在这
    std::cin>>msg;	//第二次输入的值 second
 	thread1.join();
 	return  0;
}

编译都不能通过!

正确示例:

#include <iostream>
#include <thread>
#include <unistd.h>
#include <string>

void printfH(std::string &msg){
    while(1){
        std::cout<<msg<<std::endl;
        sleep(5);
    }
}

int main() {
    std::string msg;
    std::cin>>msg;	//第一次输入的值 first
    std::thread thread1(printfH,std::ref(msg));//区别在这
    std::cin>>msg;	//第二次输入的值 second
 	thread1.join();
 	return  0;
}

image-20231027184152436

野指针

函数自动释放

#include <iostream>
#include <thread>
#include <unistd.h>

std::thread t;

void add(int &a){
    sleep(3);	//等待三秒,让test先执行完
    a+=1;
    std::cout<<a<<std::endl;
}
void test() {
    int a = 1;
    t = std::thread(add,std::ref(a));
    std::cout<<"test结束"<<std::endl;
}

int main() {
    test();
 	t.join();
 	return  0;
}

test先运行完

#include <iostream>
#include <thread>
#include <unistd.h>

std::thread t;

void add(int &a){
    a+=1;
    std::cout<<a<<std::endl;
}
void test() {
    int a = 1;
    t = std::thread(add,std::ref(a));
    sleep(3);//等待三秒,让test先执行完
    std::cout<<"test结束"<<std::endl;
}

int main() {
    test();
 	t.join();
 	return  0;
}

线程先运行完

由于int a是test里的局部变量,当test运行完int a就会释放掉,这时候a里的引用就是一个垃圾值。所以第一种得到的a值就是错误的

手动释放

#include <iostream>
#include <thread>
#include <unistd.h>

std::thread t;
void add(int *a){
	sleep(3);	//让delete prt先执行
    std::cout<<*a<<std::endl;
}

int main() {
    int* ptr = new int(1);
    t = std::thread(add,ptr);
    delete ptr;
 	t.join();
 	return  0;
}

原理和第一种相同,不过这里是自己手动释放,如果在t运行结束就释放ptr得到的也是垃圾值。

所以我们所期望的是当线程执行完了再释放指针

解决方法

使用智能指针

#include <iostream>
#include <thread>
#include <unistd.h>
#include <memory>

void func(int *a){
    std::cout<<*a<<std::endl;
}

int main() {
    std::shared_ptr<int> a = std::make_shared<int>(10);   //创建一个智能指针
    std::thread t (func, a.get());
    t.join();
    return 0;
}

文章作者: zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zhang !
  目录