Skip to content

Latest commit

 

History

History
69 lines (60 loc) · 5.34 KB

day10-加入线程池到服务器.md

File metadata and controls

69 lines (60 loc) · 5.34 KB

day10-加入线程池到服务器

今天是本教程的第十天,在之前,我们已经编码完成了一个完整的单线程服务器,最核心的几个模块都已经抽象出来,Reactor事件驱动大体成型(除了线程池),各个类的生命周期也大体上合适了,读者应该完全理解之前的服务器代码后再开始今天的学习。

观察当前的服务器架构,不难发现我们的Reactor模型少了最关键、最重要的一个模块:线程池。当发现socket fd有事件时,我们应该分发给一个工作线程,由这个工作线程处理fd上面的事件。而当前我们的代码是单线程模式,所有fd上的事件都由主线程(也就是EventLoop线程)处理,这是大错特错的,试想如果每一个事件相应需要1秒时间,那么当1000个事件同时到来,EventLoop线程将会至少花费1000秒来传输数据,还有函数调用等其他开销,服务器将直接宕机。

在之前的教程已经讲过,每一个Reactor只应该负责事件分发而不应该负责事件处理。今天我们将构建一个最简单的线程池,用于事件处理。

线程池有许多种实现方法,最容易想到的一种是每有一个新任务、就开一个新线程执行。这种方式最大的缺点是线程数不固定,试想如果在某一时刻有1000个并发请求,那么就需要开1000个线程,如果CPU只有8核或16核,物理上不能支持这么高的并发,那么线程切换会耗费大量的资源。为了避免服务器负载不稳定,这里采用了固定线程数的方法,即启动固定数量的工作线程,一般是CPU核数(物理支持的最大并发数),然后将任务添加到任务队列,工作线程不断主动取出任务队列的任务执行。

关于线程池,需要特别注意的有两点,一是在多线程环境下任务队列的读写操作都应该考虑互斥锁,二是当任务队列为空时CPU不应该不断轮询耗费CPU资源。为了解决第一点,这里使用std::mutex来对任务队列进行加锁解锁。为了解决第二个问题,使用了条件变量std::condition_variable

关于std::functionstd::mutexstd::condition_variable基本使用方法本教程不会涉及到,但读者应当先熟知,可以参考欧长坤《现代 C++ 教程》

线程池定义如下:

class ThreadPoll {
private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex tasks_mtx;
    std::condition_variable cv;
    bool stop;
public:
    ThreadPoll(int size = 10);  // 默认size最好设置为std::thread::hardware_concurrency()
    ~ThreadPoll();
    void add(std::function<void()>);
};

当线程池被构造时:

ThreadPoll::ThreadPoll(int size) : stop(false){
    for(int i = 0; i < size; ++i){  //  启动size个线程
        threads.emplace_back(std::thread([this](){  //定义每个线程的工作函数
            while(true){    
                std::function<void()> task;
                {   //在这个{}作用域内对std::mutex加锁,出了作用域会自动解锁,不需要调用unlock()
                    std::unique_lock<std::mutex> lock(tasks_mtx);
                    cv.wait(lock, [this](){     //等待条件变量,条件为任务队列不为空或线程池停止
                        return stop || !tasks.empty();
                    });
                    if(stop && tasks.empty()) return;   //任务队列为空并且线程池停止,退出线程
                    task = tasks.front();   //从任务队列头取出一个任务
                    tasks.pop();
                }
                task();     //执行任务
            }
        }));
    }
}

当我们需要添加任务时,只需要将任务添加到任务队列:

void ThreadPoll::add(std::function<void()> func){
    { //在这个{}作用域内对std::mutex加锁,出了作用域会自动解锁,不需要调用unlock()
        std::unique_lock<std::mutex> lock(tasks_mtx);
        if(stop)
            throw std::runtime_error("ThreadPoll already stop, can't add task any more");
        tasks.emplace(func);
    }
    cv.notify_one();    //通知一次条件变量
}

在线程池析构时,需要注意将已经添加的所有任务执行完,最好不采用外部的暴力kill、而是让每个线程从内部自动退出,具体实现参考源代码。

这样一个最简单的线程池就写好了,在源代码中,当Channel类有事件需要处理时,将这个事件处理添加到线程池,主线程EventLoop就可以继续进行事件循环,而不在乎某个socket fd上的事件处理。

至此,今天的教程已经结束,一个完整的Reactor模式才正式成型。这个线程池只是为了满足我们的需要构建出的最简单的线程池,存在很多问题。比如,由于任务队列的添加、取出都存在拷贝操作,线程池不会有太好的性能,只能用来学习,正确做法是使用右值移动、完美转发等阻止拷贝。另外线程池只能接受std::function<void()>类型的参数,所以函数参数需要事先使用std::bind(),并且无法得到返回值。针对这些缺点,将会在明天的教程进行修复。

完整源代码:https://github.com/yuesong-feng/30dayMakeCppServer/tree/main/code/day10