muduo源码剖析--TcpConnection
TcpConnection类
封装了一个个的tcp连接,实现了socket的四种回调,以及注册一些上层的回调
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);~TcpConnection();EventLoop *getLoop() const { return loop_; }const std::string &name() const { return name_; }const InetAddress &localAddress() const { return localAddr_; }const InetAddress &peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }// 发送数据void send(const std::string &buf);// 关闭连接void shutdown();//设置上层回调void setConnectionCallback(const ConnectionCallback &cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb){ messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb){ writeCompleteCallback_ = cb; }void setCloseCallback(const CloseCallback &cb){ closeCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark){ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }// 连接建立void connectEstablished();// 连接销毁void connectDestroyed();private:enum StateE{kDisconnected, // 已经断开连接kConnecting, // 正在连接kConnected, // 已连接kDisconnecting // 正在断开连接};void setState(StateE state) { state_ = state; }void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void *data, size_t len);void shutdownInLoop();EventLoop *loop_; // 这里是baseloop还是subloop由TcpServer中创建的线程数决定 若为多Reactor 该loop_指向subloop 若为单Reactor 该loop_指向baseloopconst std::string name_;std::atomic_int state_;bool reading_;// Socket Channel 这里和Acceptor类似 Acceptor => mainloop TcpConnection => subloopstd::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;// 这些回调TcpServer也有 用户通过写入TcpServer注册 TcpServer再将注册的回调传递给TcpConnection TcpConnection再将回调注册到Channel中ConnectionCallback connectionCallback_; // 有新连接时的回调MessageCallback messageCallback_; // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_;size_t highWaterMark_;// 数据缓冲区Buffer inputBuffer_; // 接收数据的缓冲区Buffer outputBuffer_; // 发送数据的缓冲区 用户send向outputBuffer_发
};
构造函数实现:初始化loop对象(子loop)、socketfd、设置相应的对端和本端的地址以及设置socket的四种回调
TcpConnection::TcpConnection(EventLoop *loop,const std::string &nameArg,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr): loop_(CheckLoopNotNull(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) // 64M
{// 下面给channel设置相应的回调函数 poller给channel通知感兴趣的事件发生了 channel会回调相应的回调函数channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\\n", name_.c_str(), sockfd);socket_->setKeepAlive(true);
}
处理socketfd读回调的handRead函数,当维护的socket有可读事件的时候回调这个函数,可读意味着内核缓冲区里面有数据来了,需要读取到inputBuffer缓冲区中,上层回调可以根据inputBuffer的数据进行业务操作,当读出错时进行相应的错误处理以及错误回调。
// 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据
void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0) // 有数据到达{// 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0) // 客户端断开{handleClose();}else // 出错了{errno = savedErrno;LOG_ERROR("TcpConnection::handleRead");handleError();}
}
处理socketfd写回调的handWrite()函数,当维护的socket有可写事件的时候回调这个函数,可写意味着内核态缓冲区的数据由满状态变为了不满的状态,这样可以将outPutBuffer的数据写入内核缓冲区,如果数据还未写完(代表内核缓冲区已满),需要等待下次内核由满状态变为不满的状态,最后写完所有数据之后需要调用disableWriting()关闭写事件,防止busy loop,并可以进行相应的写回调。需要说明的是调用handWrite()是在channel可写的状态下进行的(也即channel像poller注册了POLLOUT事件)
void TcpConnection::handleWrite()
{if (channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){// TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd());}
}
//关闭操作
void TcpConnection::shutdown()
{if (state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}
void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明当前outputBuffer_的数据全部向外发送完成{socket_->shutdownWrite();}
}
向对端发送数据通过send()->sendInloop()实现,发送数据分为两个部分,第一次如果直接能将数据一次性写入,那么会直接调用write()函数写入到内核缓冲区,如果未写完所有数据内核缓冲区就满了,那么先将剩余的数据写入outputBuffer缓冲区,随后注册socketfd的可写事件,即调用channel_->enableWriting()注册POLLOUT,下次等待内核缓冲区的数据由满状态变为不满状态之后,会回调handWrite()函数将outputBuffer缓冲区里的数据继续写入内核缓冲区,一直循环调用直至写完缓冲区里的所有数据,调用channel_->disableWriting()关闭写事件,防止busy loop。
这里进行第二种写入的时候,加入了一个高水位的操作,目的是防止发送的数据过快内核缓冲区来不及接收
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()) // 这种是对于单个reactor的情况 用户调用conn->send时 loop_即为当前线程{sendInLoop(buf.c_str(), buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}/* 发送数据 应用写的快 而内核发送数据慢 需要把待发送数据写入缓冲区,而且设置了水位回调/
void TcpConnection::sendInLoop(const void *data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected) // 之前调用过该connection的shutdown 不能再进行发送了{LOG_ERROR("disconnected, give up writing");}// 表示channel_第一次开始写数据或者缓冲区没有待发送数据if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){// 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{nwrote = 0;if (errno != EWOULDBLOCK) // EWOULDBLOCK表示非阻塞情况下没有数据后的正常返回 等同于EAGAIN{LOG_ERROR("TcpConnection::sendInLoop");if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET{faultError = true;}}}}/* 说明当前这一次write并没有把数据全部发送出去 剩余的数据需要保存到缓冲区当中* 然后给channel注册EPOLLOUT事件,Poller发现tcp的发送缓冲区有空间后会通知* 相应的sock->channel,调用channel对应注册的writeCallback_回调方法,* channel的writeCallback_实际上就是TcpConnection设置的handleWrite回调,* 把发送缓冲区outputBuffer_的内容全部发送完成/if (!faultError && remaining > 0){// 目前发送缓冲区剩余的待发送的数据的长度size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append((char *)data + nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout}}
}
关闭连接和错误回调
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d\\n", channel_->fd(), (int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); // 执行连接关闭的回调closeCallback_(connPtr); // 执行关闭连接的回调 执行的是TcpServer::removeConnection回调方法 // must be the last line
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d\\n", name_.c_str(), err);
}
连接建立和连接销毁操作
// 连接建立
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); // 向poller注册channel的EPOLLIN读事件// 新连接建立 执行回调connectionCallback_(shared_from_this());
}
// 连接销毁
void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 把channel的所有感兴趣的事件从poller中删除掉connectionCallback_(shared_from_this());}channel_->remove(); // 把channel从poller中删除掉
}