muduo源码剖析--Thread/EventLoopThread/EventLoopThreadPool
Thread类
muduo网络库的基础线程类,封装了线程的基本操作。
class Thread : noncopyable
{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = std::string());~Thread();void start(); //开启线程void join(); //回收线程bool started() { return started_; }pid_t tid() const { return tid_; } const std::string &name() const { return name_; } //获取线程名static int numCreated() { return numCreated_; } //获取创建线程的个数private:void setDefaultName();bool started_;bool joined_;std::shared_ptr<std::thread> thread_; //这里采取的是c11提供的线程类pid_t tid_; // 在线程创建时再绑定ThreadFunc func_; // 线程回调函数std::string name_;static std::atomic_int numCreated_;
};
构造函数:
Thread::Thread(ThreadFunc func, const std::string &name): started_(false), joined_(false), tid_(0), func_(std::move(func)) //c11移动构造, name_(name)
{setDefaultName();
}
//若未设置初始名字,则起一个默认的名字
void Thread::setDefaultName()
{int num = ++numCreated_;if (name_.empty()){char buf[32] = {0};snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}
开启线程:
void Thread::start() // 一个Thread对象 记录的就是一个新线程的详细信息
{started_ = true;sem_t sem;//这里直接使用信号量,muduo源码采取的是互斥锁加条件变量实现sem_init(&sem, false, 0); // false指的是 不设置进程间共享// 开启线程thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {tid_ = CurrentThread::tid(); // 获取线程的tid值sem_post(&sem); //用于唤醒下面的sem_wait()func_(); // 开启一个新线程 专门执行该线程函数}));// 这里必须等待获取上面新创建的线程的tid值sem_wait(&sem);
}
回收线程:
// C++ std::thread 中join()和detach()的区别:https://blog.nowcoder.net/n/8fcd9bb6e2e94d9596cf0a45c8e5858a
void Thread::join()
{joined_ = true;thread_->join();
}
释放资源:
Thread::~Thread()
{if (started_ && !joined_){thread_->detach(); // thread类提供了设置分离线程的方法 线程运行后自动销毁(非阻塞)}
}
EventLoopThread类
将Thred与EventLoop绑定在一起的类,用于执行子Reactor模式,一个EventLoop对象在一个线程中执行。
class EventLoop;class EventLoopThread : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>; //类似于一个上层回调,用于在线程开启时做的初始化操作EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(),const std::string &name = std::string());~EventLoopThread();EventLoop *startLoop();private:void threadFunc();EventLoop *loop_; //绑定的loop对象bool exiting_;Thread thread_; //绑定的线程类std::mutex mutex_; // 互斥锁std::condition_variable cond_; // 条件变量ThreadInitCallback callback_;
};
构造函数:
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,const std::string &name): loop_(nullptr) //初始化为空,在线程函数里面进行初始化, exiting_(false), thread_(std::bind(&EventLoopThread::threadFunc, this), name) //初始化绑定的线程,及其要执行的回调函数, mutex_(), cond_(), callback_(cb)
{
}
开启一个事件循环,事件循环在线程中执行
EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);while(loop_ == nullptr) //阻塞,直到在线程函数中唤醒{cond_.wait(lock);}loop = loop_;}return loop;
}
线程函数,在创建的线程中执行上层的初始化回调,并创建属于这个线程的EventLoop对象,随后通过startLoop方法将这个对象返回给主调线程
// 下面这个方法 是在单独的新线程里运行的
void EventLoopThread::threadFunc()
{EventLoop loop; // 创建一个独立的EventLoop对象 和上面的线程是一一对应的 级one loop per threadif (callback_){callback_(&loop);}{std::unique_lock<std::mutex> lock(mutex_);loop_ = &loop;cond_.notify_one();}loop.loop(); // 执行EventLoop的loop() 开启了底层的Poller的poll(),阻塞状态std::unique_lock<std::mutex> lock(mutex_);loop_ = nullptr; //意味着事件循环已经结束
}
释放:
EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != nullptr){loop_->quit(); //退出循环thread_.join(); //回收线程}
}
EventLoopThreadPool类
EventLoopThread线程池类,用于创建多个loop对象和thread类绑定的对象,主Loop(主Reactor)采取轮询的操作下发channel给子loop(子Reactor)。
class EventLoop;
class EventLoopThread;class EventLoopThreadPool : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; } //设置线程个数void start(const ThreadInitCallback &cb = ThreadInitCallback());// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoopEventLoop *getNextLoop();std::vector<EventLoop *> getAllLoops();bool started() const { return started_; }const std::string name() const { return name_; }private:EventLoop *baseLoop_; // 用户使用muduo创建的loop 如果线程数为1 那直接使用用户创建的loop 否则创建多EventLoopstd::string name_;bool started_;int numThreads_; //线程池中线程的个数int next_; // 轮询的下标std::vector<std::unique_ptr<EventLoopThread>> threads_; std::vector<EventLoop *> loops_; //存放所有loop对象的数组(为了轮询得到子loop)
};
构造函数:
主要就是初始化baseloop对象(主Reactor)
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg): baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0)
{
}
start()方法初始化线程池并执行
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{started_ = true;//多Reactor方式,一个主Reactor多个子Reactorfor(int i = 0; i < numThreads_; ++i) //循环创建EventLoopThread线程类{char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread *t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop()); // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址}//单Reactor模式if(numThreads_ == 0 && cb) // 整个服务端只有一个线程运行baseLoop{cb(baseLoop_);}
}
getNextLoop()方法实现具体的轮询操作
// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{EventLoop *loop = baseLoop_; // 如果只设置一个线程 也就是只有一个mainReactor 无subReactor 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_if(!loops_.empty()) // 通过轮询获取下一个处理事件的loop{loop = loops_[next_];++next_;if(next_ >= loops_.size()){next_ = 0;}}return loop;
}
getAllLoops()获取创建的所有loop对象
std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{if(loops_.empty()){return std::vector<EventLoop *>(1, baseLoop_);}else{return loops_;}
}
释放操作:什么都不做,由于loop对象在子线程中创建的是栈对象,所以不需要释放,且线程类EventLoopThreadPool对象由智能指针管理也无需释放。
EventLoopThreadPool::~EventLoopThreadPool()
{// Don't delete loop, it's stack variable
}