> 文章列表 > muduo源码剖析--Inetaddress/Socket/Acceptor

muduo源码剖析--Inetaddress/Socket/Acceptor

muduo源码剖析--Inetaddress/Socket/Acceptor

InetAddress类

实际上封装了传入socket地址类,包括ip、端口以及具体的协议簇

// 封装socket地址类型
class InetAddress
{
public:explicit InetAddress(uint16_t port = 0, std::string ip = "127.0.0.1");explicit InetAddress(const sockaddr_in &addr): addr_(addr){}std::string toIp() const;std::string toIpPort() const;uint16_t toPort() const;const sockaddr_in *getSockAddr() const { return &addr_; }void setSockAddr(const sockaddr_in &addr) { addr_ = addr; }private:sockaddr_in addr_;  //构造socket的地址类
};

就是封装,没什么好说的,直接上cpp实现:

InetAddress::InetAddress(uint16_t port, std::string ip)
{::memset(&addr_, 0, sizeof(addr_));addr_.sin_family = AF_INET; //IPV4addr_.sin_port = ::htons(port); // 本地字节序转为网络字节序addr_.sin_addr.s_addr = ::inet_addr(ip.c_str());
}std::string InetAddress::toIp() const
{// addr_char buf[64] = {0};::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);return buf;
}std::string InetAddress::toIpPort() const
{// ip:portchar buf[64] = {0};::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);size_t end = ::strlen(buf);uint16_t port = ::ntohs(addr_.sin_port);sprintf(buf+end, ":%u", port);return buf;
}uint16_t InetAddress::toPort() const
{return ::ntohs(addr_.sin_port);
}

Socket类

实际封装socketfd的类,socket地址采取上述封装的InetAddress类, 对bind、listen、accept函数进行了封装

// 封装socket fd
class Socket : noncopyable
{
public:explicit Socket(int sockfd): sockfd_(sockfd){}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress &localaddr);void listen();int accept(InetAddress *peeraddr);void shutdownWrite();//设置socket的相关属性void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};

bind、listen、accept函数实现

void Socket::bindAddress(const InetAddress &localaddr)
{if (0 != ::bind(sockfd_, (sockaddr *)localaddr.getSockAddr(), sizeof(sockaddr_in))){LOG_FATAL("bind sockfd:%d fail\\n", sockfd_);}
}void Socket::listen()
{if (0 != ::listen(sockfd_, 1024)){LOG_FATAL("listen sockfd:%d fail\\n", sockfd_);}
}int Socket::accept(InetAddress *peeraddr)
{/*** 1. accept函数的参数不合法* 2. 对返回的connfd没有设置非阻塞* Reactor模型 one loop per thread* poller + non-blocking IO**/sockaddr_in addr;socklen_t len = sizeof(addr);::memset(&addr, 0, sizeof(addr));// fixed : int connfd = ::accept(sockfd_, (sockaddr *)&addr, &len);int connfd = ::accept4(sockfd_, (sockaddr *)&addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC); //接收连接if (connfd >= 0){peeraddr->setSockAddr(addr);  //传出参数}return connfd;  //返回接收的fd
} 

设置socket相关属性的实现:

void Socket::setTcpNoDelay(bool on)//延迟发送
{int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setReuseAddr(bool on) //ip复用
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setReusePort(bool on)//端口复用
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setKeepAlive(bool on)//连接保活
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}

半关闭shutdownWrite()

void Socket::shutdownWrite()
{if (::shutdown(sockfd_, SHUT_WR) < 0){LOG_ERROR("shutdownWrite error");}
}

释放,也即关闭所管理的socketfd

Socket::~Socket()
{::close(sockfd_);
}

Acceptor类

封装了服务端端接收新连接请求的套接字(acceptFd),复用InetAddress、Socket类

class Acceptor : noncopyable
{
public:using NewConnectionCallback = std::function<void(int sockfd, const InetAddress &)>; //新连接到来的回调Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport);~Acceptor();void setNewConnectionCallback(const NewConnectionCallback &cb) { NewConnectionCallback_ = cb; }bool listenning() const { return listenning_; }void listen();private:void handleRead();EventLoop *loop_; // Acceptor用的就是用户定义的那个baseLoop 也称作mainLoopSocket acceptSocket_; //接收新连接请求的fdChannel acceptChannel_; //acceptfd所绑定的channelNewConnectionCallback NewConnectionCallback_;bool listenning_;
};

构造函数实现:主要初始化acceptfd,以及设置相应的回调


static int createNonblocking()  //创建非阻塞的套接字
{int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);if (sockfd < 0){LOG_FATAL("%s:%s:%d listen socket create err:%d\\n", __FILE__, __FUNCTION__, __LINE__, errno);}return sockfd;
}Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport): loop_(loop), acceptSocket_(createNonblocking()), acceptChannel_(loop, acceptSocket_.fd()) //绑定channel,loop为baseLoop, listenning_(false)
{acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(true);acceptSocket_.bindAddress(listenAddr);// TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)// baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

监听listen()->Socket::listen()

void Acceptor::listen()
{listenning_ = true;acceptSocket_.listen();         // listenacceptChannel_.enableReading(); // acceptChannel_注册至Poller !重要
}

执行新连接请求,并执行相应的回调

// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){if (NewConnectionCallback_){NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel}else{::close(connfd);}}else{LOG_ERROR("%s:%s:%d accept err:%d\\n", __FILE__, __FUNCTION__, __LINE__, errno);if (errno == EMFILE){LOG_ERROR("%s:%s:%d sockfd reached limit\\n", __FILE__, __FUNCTION__, __LINE__);/*原实现是,先创建一个ldfd占住一个位置(open("dev\\null"))如果文件描述符打开达到上限,那么先关闭当前的ldfd,利用空出来的这个ldfd去接收新连接,接收完之后在关闭这个连接,然后再(open("dev\\null"))将ldfd占住这个位置,达到一个比较优雅的处理方式。*/}}
}

释放操作:

Acceptor::~Acceptor()
{acceptChannel_.disableAll();    // 把从Poller中感兴趣的事件删除掉acceptChannel_.remove();        // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除
}