> 文章列表 > muduo源码剖析--EventLoop类

muduo源码剖析--EventLoop类

muduo源码剖析--EventLoop类

EventLoop类

Reactor模式的实现类,连通Channel类和Poller类的桥梁,也是上层注册和回调的实际调用类。

// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启事件循环void loop();// 退出事件循环void quit();Timestamp pollReturnTime() const { pollRetureTime_; }// 在当前loop中执行void runInLoop(Functor cb);// 把上层注册的回调函数cb放入队列中 唤醒loop所在的线程执行cbvoid queueInLoop(Functor cb);// 通过eventfd唤醒loop所在的线程void wakeup();// EventLoop的方法 => Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程里bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // threadId_为EventLoop创建时的线程id CurrentThread::tid()为当前线程idprivate:void handleRead();        // 给eventfd返回的文件描述符wakeupFd_绑定的事件回调 当wakeup()时 即有事件发生时 调用handleRead()读wakeupFd_的8字节 同时唤醒阻塞的epoll_waitvoid doPendingFunctors(); // 执行上层回调using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作 底层通过CAS实现std::atomic_bool quit_;    // 标识退出loop循环const pid_t threadId_; // 记录当前EventLoop是被哪个线程id创建的 即标识了当前EventLoop的所属线程idTimestamp pollRetureTime_; // Poller返回发生事件的Channels的时间点std::unique_ptr<Poller> poller_;int wakeupFd_; // 作用:当mainLoop获取一个新用户的Channel 需通过轮询算法选择一个subLoop 通过该成员唤醒subLoop处理Channelstd::unique_ptr<Channel> wakeupChannel_;ChannelList activeChannels_; // 返回Poller检测到当前有事件发生的所有Channel列表std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作std::vector<Functor> pendingFunctors_;    // 存储loop需要执行的所有回调操作std::mutex mutex_;                        // 互斥锁 用来保护上面vector容器的线程安全操作
};

构造函数实现

EventLoop::EventLoop(): looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_))  //用于绑定wakeFd的通道
{LOG_DEBUG("EventLoop created %p in thread %d\\n", this, threadId_);if (t_loopInThisThread){LOG_FATAL("Another EventLoop %p exists in this thread %d\\n", t_loopInThisThread, threadId_);}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this)); // 设置wakeupfd的事件类型以及发生事件后的回调操作wakeupChannel_->enableReading(); // 每一个EventLoop都将监听wakeupChannel_的EPOLL读事件了
}

CurrentThread::tid()函数获取当前线程的id,关联isInLoopThread(),用于判断当前Loop对象调用是否在本线程中

namespace CurrentThread
{extern __thread int t_cachedTid; // 保存tid缓存 因为系统调用非常耗时 拿到tid后将其保存void cacheTid();inline int tid() // 内联函数只在当前文件中起作用{if (__builtin_expect(t_cachedTid == 0, 0)) // __builtin_expect 是一种底层优化 此语句意思是如果还未获取tid 进入if 通过cacheTid()系统调用获取tid{cacheTid();}return t_cachedTid;}
}namespace CurrentThread
{__thread int t_cachedTid = 0;void cacheTid(){if (t_cachedTid == 0){t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid)); //系统调用拿到线程id,并缓存在t_cachedTid }}
}

poller_变量即是当前EventLopp对象所执行IO对路复用的方式实例,默认通过newDefaultPoller()方法获取

Poller *Poller::newDefaultPoller(EventLoop *loop)
{if (::getenv("MUDUO_USE_POLL")){return nullptr; // 生成poll的实例}else{return new EPollPoller(loop); // 生成epoll的实例}
}

createEventfd()方法用于创建wakeFd,创建wakeupfd 用来notify唤醒subReactor处理新来的channel, 需要在构造函数之中监听对应wakefd的读事件,并设置读回调。
这是wakefd设计的一个巧妙之处,考虑一个主Reactor多个subReactor场景,上层注册一个紧急回调让这个EventLoop对象分发给子Reactor去执行,但是子Reactor所监测的通道并没有关心的活跃事件,那么会阻塞到poll()方法(实际上阻塞在epoll_wait()方法),导致并不会立即去执行上层注册的紧急回调,需直到有读写事件发生之后才能解除阻塞。这样的话,wakefd的作用就来了,再上层注册紧急回调但当前又阻塞再epoll_wait时,会调用wakeup()方法会控制当前子Reactor向wakefd写入数据,前面说过这个EventLoop对象在构造之前监听了这个wakefd的读事件并设置了相应的回调,那么当前的这个子Reactor监听到活跃事件会解除阻塞,执行这个wakefd的回调,随后便能执行上层的回调了。

int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL("eventfd error:%d\\n", errno);}return evtfd;
}

上层应用通过runInLoop()方法注册回调给Reactor

// 在当前loop中执行cb
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()) // 当前EventLoop中执行回调,考虑一个Reactor模式的场景,所有回调均在这个loop对象中执行{cb();}else // 在非当前EventLoop线程中执行cb,就需要唤醒EventLoop所在线程执行cb{queueInLoop(cb);}
}

queueInLoop()方法把上层的回调cb放入队列中,子Reactor唤醒loop所在的线程执行上层cb

void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);}/*** || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件* 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后* 继续执行pendingFunctors_中的回调函数**/if (!isInLoopThread() || callingPendingFunctors_){wakeup(); // 唤醒loop所在线程}
}

wakeup方法,写wakefd操作唤醒loop

// 用来唤醒loop所在线程 向wakeupFd_写一个数据 wakeupChannel就发生读事件 当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = write(wakeupFd_, &one, sizeof(one));if (n != sizeof(one)){LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\\n", n);}
}//读回调
void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof(one));if (n != sizeof(one)){LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\\n", n);}
}

核心方法loop(),用于开启事件循环,在里面会调用poller实例监听相应的通道,poller实例实际上调用的是由具体子类实现的poll()方法进行IO事件的多路复用,直到监测的通道有感兴趣的事件发生,那么会解除阻塞获取到对应的活动channel列表,遍历这个列表调用通道所关心的事件回调,最后执行上层注册的回调。

// 开启事件循环
void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO("EventLoop %p start looping\\n", this);while (!quit_){activeChannels_.clear();pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件channel->handleEvent(pollRetureTime_);}/*** 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:* accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理** mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒**/doPendingFunctors();}LOG_INFO("EventLoop %p stop looping.\\n", this);looping_ = false;
}

doPendingFunctors()函数实际执行上层注册的回调,这里是存在线程安全问题的,所以需要用到互斥锁,这里在实际执行上层回调操作时有一个巧妙之处,先加锁将当前存有回调的队列,并将需要执行的回调cb全部换出到一个栈对象之中,换出操作结束之后,执行这个栈对象的回调队列即可,这样很好的提升了性能,避免了频繁的加锁解锁操作。

void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_); // 交换的方式减少了锁的临界区范围 提升效率 同时避免了死锁 如果执行functor()在临界区内 且functor()中调用queueInLoop()就会产生死锁}for (const Functor &functor : functors){functor(); // 执行当前loop需要执行的回调操作}callingPendingFunctors_ = false;
}

xxxxChannel()方法实际调用的时poller实例的xxxxChannel()方法

// EventLoop的方法 => Poller的方法
void EventLoop::updateChannel(Channel *channel)
{poller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{poller_->removeChannel(channel);
}bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}

结束事件循环quit()方法

/*** 退出事件循环* 1. 如果loop在自己的线程中调用quit成功了 说明当前线程已经执行完毕了loop()函数的poller_->poll并退出* 2. 如果不是当前EventLoop所属线程中调用quit退出EventLoop 需要唤醒EventLoop所属线程的epoll_wait** 比如在一个subloop(worker)中调用mainloop(IO)的quit时 需要唤醒mainloop(IO)的poller_->poll 让其执行完loop()函数** !!! 注意: 正常情况下 mainloop负责请求连接 将回调写入subloop中 通过生产者消费者模型即可实现线程安全的队列* !!!       但是muduo通过wakeup()机制 使用eventfd创建的wakeupFd_ notify 使得mainloop和subloop之间能够进行通信**/
void EventLoop::quit()
{quit_ = true;if (!isInLoopThread()){wakeup();}
}

释放操作

EventLoop::~EventLoop()
{wakeupChannel_->disableAll(); // 给Channel移除所有感兴趣的事件wakeupChannel_->remove();     // 把Channel从EventLoop上删除掉::close(wakeupFd_);t_loopInThisThread = nullptr;
}

幼峰? 部落格