> 文章列表 > muduo源码剖析--Thread/EventLoopThread/EventLoopThreadPool

muduo源码剖析--Thread/EventLoopThread/EventLoopThreadPool

muduo源码剖析--Thread/EventLoopThread/EventLoopThreadPool

Thread类

muduo网络库的基础线程类,封装了线程的基本操作。

class Thread : noncopyable
{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = std::string());~Thread();void start();  //开启线程void join();   //回收线程bool started() { return started_; }pid_t tid() const { return tid_; } 			const std::string &name() const { return name_; } //获取线程名static int numCreated() { return numCreated_; } //获取创建线程的个数private:void setDefaultName();bool started_;bool joined_;std::shared_ptr<std::thread> thread_; //这里采取的是c11提供的线程类pid_t tid_;       // 在线程创建时再绑定ThreadFunc func_; // 线程回调函数std::string name_;static std::atomic_int numCreated_;
};

构造函数:

Thread::Thread(ThreadFunc func, const std::string &name): started_(false), joined_(false), tid_(0), func_(std::move(func))  //c11移动构造, name_(name)
{setDefaultName();
}
//若未设置初始名字,则起一个默认的名字
void Thread::setDefaultName()
{int num = ++numCreated_;if (name_.empty()){char buf[32] = {0};snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}

开启线程:

void Thread::start()                                                        // 一个Thread对象 记录的就是一个新线程的详细信息
{started_ = true;sem_t sem;//这里直接使用信号量,muduo源码采取的是互斥锁加条件变量实现sem_init(&sem, false, 0);                                               // false指的是 不设置进程间共享// 开启线程thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {tid_ = CurrentThread::tid();                                        // 获取线程的tid值sem_post(&sem); //用于唤醒下面的sem_wait()func_();                                                            // 开启一个新线程 专门执行该线程函数}));// 这里必须等待获取上面新创建的线程的tid值sem_wait(&sem);
}

回收线程:

// C++ std::thread 中join()和detach()的区别:https://blog.nowcoder.net/n/8fcd9bb6e2e94d9596cf0a45c8e5858a
void Thread::join()
{joined_ = true;thread_->join();
}

释放资源:

Thread::~Thread()
{if (started_ && !joined_){thread_->detach();          // thread类提供了设置分离线程的方法 线程运行后自动销毁(非阻塞)}
}

EventLoopThread类

将Thred与EventLoop绑定在一起的类,用于执行子Reactor模式,一个EventLoop对象在一个线程中执行。

class EventLoop;class EventLoopThread : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>; //类似于一个上层回调,用于在线程开启时做的初始化操作EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(),const std::string &name = std::string());~EventLoopThread();EventLoop *startLoop();private:void threadFunc();EventLoop *loop_;	//绑定的loop对象bool exiting_;Thread thread_;		//绑定的线程类std::mutex mutex_;             // 互斥锁std::condition_variable cond_; // 条件变量ThreadInitCallback callback_;
};

构造函数:

	EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,const std::string &name): loop_(nullptr)	 //初始化为空,在线程函数里面进行初始化, exiting_(false), thread_(std::bind(&EventLoopThread::threadFunc, this), name) 	//初始化绑定的线程,及其要执行的回调函数, mutex_(), cond_(), callback_(cb)
{
}

开启一个事件循环,事件循环在线程中执行

EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);while(loop_ == nullptr) //阻塞,直到在线程函数中唤醒{cond_.wait(lock);}loop = loop_;}return loop;
}

线程函数,在创建的线程中执行上层的初始化回调,并创建属于这个线程的EventLoop对象,随后通过startLoop方法将这个对象返回给主调线程

// 下面这个方法 是在单独的新线程里运行的
void EventLoopThread::threadFunc()
{EventLoop loop; // 创建一个独立的EventLoop对象 和上面的线程是一一对应的 级one loop per threadif (callback_){callback_(&loop);}{std::unique_lock<std::mutex> lock(mutex_);loop_ = &loop;cond_.notify_one();}loop.loop();    // 执行EventLoop的loop() 开启了底层的Poller的poll(),阻塞状态std::unique_lock<std::mutex> lock(mutex_);loop_ = nullptr; //意味着事件循环已经结束
}

释放:

EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != nullptr){loop_->quit(); //退出循环thread_.join(); //回收线程}
}

EventLoopThreadPool类

EventLoopThread线程池类,用于创建多个loop对象和thread类绑定的对象,主Loop(主Reactor)采取轮询的操作下发channel给子loop(子Reactor)。

class EventLoop;
class EventLoopThread;class EventLoopThreadPool : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }  //设置线程个数void start(const ThreadInitCallback &cb = ThreadInitCallback());// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoopEventLoop *getNextLoop();std::vector<EventLoop *> getAllLoops();bool started() const { return started_; }const std::string name() const { return name_; }private:EventLoop *baseLoop_; // 用户使用muduo创建的loop 如果线程数为1 那直接使用用户创建的loop 否则创建多EventLoopstd::string name_;bool started_;int numThreads_;  //线程池中线程的个数int next_; // 轮询的下标std::vector<std::unique_ptr<EventLoopThread>> threads_;	 std::vector<EventLoop *> loops_;	//存放所有loop对象的数组(为了轮询得到子loop)
};

构造函数:
主要就是初始化baseloop对象(主Reactor)

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg): baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0)
{
}

start()方法初始化线程池并执行

void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{started_ = true;//多Reactor方式,一个主Reactor多个子Reactorfor(int i = 0; i < numThreads_; ++i)  //循环创建EventLoopThread线程类{char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread *t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop());                           // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址}//单Reactor模式if(numThreads_ == 0 && cb)                                      // 整个服务端只有一个线程运行baseLoop{cb(baseLoop_);}
}

getNextLoop()方法实现具体的轮询操作

// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{EventLoop *loop = baseLoop_;    // 如果只设置一个线程 也就是只有一个mainReactor 无subReactor 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_if(!loops_.empty())             // 通过轮询获取下一个处理事件的loop{loop = loops_[next_];++next_;if(next_ >= loops_.size()){next_ = 0;}}return loop;
}

getAllLoops()获取创建的所有loop对象

std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{if(loops_.empty()){return std::vector<EventLoop *>(1, baseLoop_);}else{return loops_;}
}

释放操作:什么都不做,由于loop对象在子线程中创建的是栈对象,所以不需要释放,且线程类EventLoopThreadPool对象由智能指针管理也无需释放。

EventLoopThreadPool::~EventLoopThreadPool()
{// Don't delete loop, it's stack variable
}