【Linux】匿名管道代码实现-mypipe
文章目录
管道介绍
什么是管道:
- 管道是unix中最古老的进程间通信的形式
- 我们把从一个进程连接到另一个进程的数据流的叫做管道
管道也是一种文件
管道的原理
前提知识: 创建子进程的时候,fork子进程,只会复制进程相关的数据结构对象,不会复制父进程曾经打开的文件对象!
这就是为什么fork之后,父子进程cout,printf都会向同一个显示器终端打印数据的原因
我们在代码中需要做的,其实就是让父子进程看到同一份文件,对于父进程而言,只写该文件,对于子进程而言,只读该文件,即可进行进程间的通信
管道的特点
- 单向通信–>半双工的一种特殊情况(类似于对讲机)–>吵架是"全双工"
- 管道的本质是文件,因为fd的声明周期是随进程的,因此管道的生命周期也是随进程的
- 管道通信,通常用来进行具有"血缘"关系的进程,进行进程间的通信,常用于父子通信–pipe打开管道,并不清楚管道的名字,我们把这个管道称为匿名管道
- 在管道通信中,写入的次数和读取的次数不是严格匹配的–>读写的次数多少没有强相关–>字节流
所以管道叫"面向字节流" - 具有一定的协同能力,让reader和writer能够按照一定的步骤进行通信–自带同步机制
四种场景:
- 如果我们read读取完毕了所有的管道数据,如果对方不发,我们就只能等待
- 如果我们write端将管道写满了,我们还能写吗?不能
- 如果我关闭了写端,读取完毕管道数据,再读,就会read返回0,表明读到了文件结尾
- 写端一直写,读端关闭,会发生什么呢?此时再写也没有意义了,OS不会维护无意义低效率的,或者浪费资源的事情.OS会杀死一直在写入的进程,OS通过信号来终止进程,13)SIGPIPE
具体代码详写
创建初始文件
mkdir unnamedpipe //创建一个新的文件夹
cd unnamedpipe //进入该文件夹
touch makefile //编写makefile
touch mypipe.cc //主要程序
touch task.hpp //任务列表文件
以上文件具体内容及各自的作用将在下文中详细叙述.
makefile编写
我们是在linux上进行编程,而不是VS之类的集成开发环境,因此,写一个makefile方便我们调试代码
mypipe:mypipe.ccg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f mypipe
定义任务列表-task.hpp
分阶段代码编写
对于这个操作,其实我们在这个阶段已经做了很多次了,无非就是将不同的任务做成不同的函数,并且封装成函数指针数组,并在细节处不断解耦,已达到低耦合,高内聚的特点.
首先,要使用函数指针数组,我们得先重命名(也叫声明)函数指针类型:
typedef void(*fun_t)();
//参数列表为空,返回值为void函数类型,重命名为fun_t
然后我们就可以肆无忌惮地写一些函数(没有具体实现,仅仅是为了测试用):
我们假设有以下三个任务可供用户选择:
#include<iostream>
using namespace std;
//getpid的头文件:
#include<unistd.h>
void PrintLog()
{cout << getpid() << ":PrintLog..." << endl;
}
void InsertMySQL()
{cout << getpid() << " InsertMySQL..." << endl;
}
void NetRequest()
{cout << getpid() << " NetRequest..." << endl;
}
同时,为了更加解耦,更加方便我们用户操作,我们将上述操作宏定义为数字:
并且,我们约定每一个command都应该是四字节发送,接收端四字节接收.
//约定,每一个command都必须是4字节
#define COMMAND_LOG 0
#define INSERTMYSQL 1
#define NETREQUEST 2
我们在创建好三个函数之后,需要"组织"和"管理"好这些数据:
class Task
{
public:Task(){funcs.push_back(PrintLog);funcs.push_back(InsertMySQL);funcs.push_back(NetRequest);}void Excute(int command)//成员函数,执行某个命令{if(command >= 0 && command < funcs.size()){funcs[command]();}}
public:vector<fun_t> funcs;//定义一个函数指针数组
};
至此,我们的task.hpp
便创建好了,专门用来储存任务的数据结构.
总代码展示:
#pragma once
#include<iostream>
#include<functional>
#include<vector>
#include<unistd.h>
#define COMMAND_LOG 0
#define INSERTMYSQL 1
#define NETREQUEST 2
using namespace std;
typedef void(*fun_t)();//函数指针void PrintLog()
{cout << getpid() << ":PrintLog..." << endl;
}void InsertMySQL()
{cout << getpid() << " InsertMySQL..." << endl;
}void NetRequest()
{cout << getpid() << " NetRequest..." << endl;
}class Task
{
public:Task(){funcs.push_back(PrintLog);funcs.push_back(InsertMySQL);funcs.push_back(NetRequest);}void Excute(int command){if(command >= 0 && command < funcs.size()){funcs[command]();}}
public:vector<fun_t> funcs;
};
ctrlProcess.cc 编写
头文件包含(如有不会,自己查谷歌)
这个文件我们主要用于编写主要控制程序
我们先把所有需要的头文件包含了:
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
//记得包含这个任务列表头文件
#include "Task.hpp"
using namespace std;
定义全局变量以解耦
为了方便之后的修改,我们设置一个全局变量来保存管道个数,以及将一个Task对象实例化
const int gnum = 0;
Task t;
main,函数框架
整个程序,主要分为三个步骤:
- 初始化:构建控制结构并且创建管道
- 开启控制进程
- 回收子进程(退出整个系统)
int main()
{//1.先进行构建控制结构,父进程写入,子进程读取vector<EndPoint> end_points;//先描述,再组织,需要我们单独再定义EndPoint的结构creatProcess(&end_points);//为end_points向量创建管道//此时我们得到了一堆管道和多个正在等待的读取命令的子进程//2.开启控制进程函数ctrlProcess(end_points);//3.此时整个系统都退出了waitProcess(end_points);return 0;
}
EndPoint定义
class EndPoint
{//父进程要对他所创建的管道进行管理->先描述再组织
private:static int number;//在类中声明,类外定义public:pid_t _child_id;//子进程idint _write_fd;//写端文件描述符std::string processname;//该进程的名字public:EndPoint(int id,int fd):_child_id(id),_write_fd(fd){//process-0[pid:fd]char namebuff[64];snprintf(namebuff,sizeof(namebuff),"process-%d[%d:%d]",number++,_child_id,_write_fd);//每写一个记得将number++processname = namebuff;}string name() const{return processname;//以这种方式获得该进程的名字更加好}~EndPoint(){}
};int EndPoint::number = 0;//静态成员变量必须在类外定义,在类里面声明
creatProcess 创建管道
void creatProcess(vector<EndPoint> *end_points)
{for(int i = 0; i < gnum; i++)//需要创建gnum个管道{int pipefd[2] = {0};//pipe函数返回值是一个数组,含有两个元素,其中fd[0]是读端,fd[1]是写端int n = pipe(pipefd);//这里接收返回值仅仅是为了检查合法性assert(n == 0);(void)n;//为了使用一下n,否则编译器会以为这是定义了但是没有使用的冗余变量pid_t id = fork();//创建子进程assert(id != -1);//子进程if(id == 0){close(pipefd[1]);//关闭写端,只留下读端//我们期望,所有的子进程读取指令的时候,都从标准输入读取->输入重定向dup2(pipefd[0],0);//将屏幕输入重定向到pipefd[0](即读端)//->子进程开始等待获取命令WaitCommand();//->命令完成以后关闭读端close(pipefd[0]);exit(0);}//父进程close(pipefd[0]);//将新的子进程和他的管道写端,构建对象,插入到管理列表中(*end_points).push_back(EndPoint(id,pipefd[1]));}
}
WaitCommand-子进程开始读取
void WaitCommand()
{while(true){int command = 0;int n = read(0,&command,sizeof(int));//以四字节为单位读取,read返回的是读到的数据字节数大小if(n == sizeof(int))//如果得到的是一个合法的命令-即四字节数{t.Excute(command);//->调用命令}else if(n == 0){//此时已经读取完毕,即父进程不再发送,管道可以关闭了cout << "父进程让我退出: " << getpid() << endl;break;}else break;}
}
到此为止,我们现在已经得到了gnum个管道以及对应数量的子进程,正在嗷嗷待哺.
我们也通过pushback将他们的数据结构管理进了end_points,到时候我们只需要用下标访问end_points就能通过不同的管道实现父子进程的通信了
ctrlProcess 开始指点天下
我们再回顾一下,我们已经走了哪些路了:
我们现在开始真正控制进程进行管道通信了.
void ctrlProcess(const vector<EndPoint> & end_points)
{int num = 0;int cnt = 0;while(true){cout << endl;int command = ShowBoard();//创建一个菜单if(command == 3) break;//令3命令为退出子进程if(command < 0 || command > 2) continue;//合法性检查int index = cnt++;//循环进程进行运行(从0-2进程)cnt %= end_points.size();cout << "选择了进程" << end_points[index].name() << "处理任务" << endl;write(end_points[index]._write_fd,&command,sizeof(command));}
}
写个小菜单,顺便接收command
int ShowBoard()
{cout << "#" << endl;cout << "# 0.执行日志任务 #" << endl;cout << "# 1.执行数据库任务 #" << endl;cout << "# 2.执行请求任务 #" << endl;cout << "# 3.退出 #" << endl;cout << "请选择# ";int command = 0;cin >> command;return command;
}
waitProcess - 最后一步
void waitProcess(const vector<EndPoint> & end_points)
{//1.我们需要让子进程全部退出 -- 只需要让父进程关闭所有的write_fd就可以了for(const auto &ep : end_points){close(ep._write_fd);}cout << "父进程让所有子进程全部退出" << endl;sleep(10);//2.父进程要回收子进程的僵尸状态for(const auto &ep : end_points){waitpid(ep._child_id,nullptr,0);}cout << "父进程让所有子进程全部退出" << endl;sleep(10);
}
这里我们是先关闭所有的读端之后再统一回收进程,就完美避免了我们下一步即将修改的小bug.
如果我们是用下面这种写法:
for(int end = 0; end < end_points.size(); end++){std::cout << "父进程让子进程退出:" << end_points[end]._child_id << std::endl;close(end_points[end]._write_fd);waitpid(end_points[end]._child_id, nullptr, 0);std::cout << "父进程回收了子进程:" << end_points[end]._child_id << std::endl;} sleep(10);
这样的话我们会只能关闭第一个子进程,而不能关闭后面打开的几个子进程.
最最最后一步,改一个小bug(creatProcess中子进程继承父进程文件描述符问题)
这里有个极不容易发现的问题:
当我们创建第一个进程的时候,没有毛病
但是当我们创建后面的进程的时候,子进程会继承父进程的写端文件!
举个例子
子进程1:打开了fd[0] -> 此时父进程有子进程1的写端
子进程2:打开了fd[0] -> 此时还继承了子进程1的写端!
…
从以往后,每次创建子进程都会继承前面那个进程的写端.在我们关闭的时候,如果只把父进程对应的写端关闭了,还会有子进程没有关闭这个写端,仍然无法关闭我们想要关的那个子进程.
所以,我们在每次创建子进程的时候要记得把相应的写端关闭了!
具体写法如下:
void createProcesses(vector<EndPoint> *end_points)
{vector<int> fds;for (int i = 0; i < gnum; i++){// 1.1 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;// 1.2 创建进程pid_t id = fork();assert(id != -1);// 一定是子进程if (id == 0){for(auto &fd : fds) close(fd);close(pipefd[1]);dup2(pipefd[0], 0);WaitCommand();close(pipefd[0]);exit(0);}// 一定是父进程close(pipefd[0]);end_points->push_back(EndPoint(id, pipefd[1]));fds.push_back(pipefd[1]);}
}
总代码展示
#include<iostream>
#include<string>
#include<unistd.h>
#include<cassert>
#include<vector>
#include "task.hpp"
#include<stdlib.h>
#include<sys/wait.h>
#include<sys/types.h>
using namespace std;
const int gnum = 3;class EndPoint
{//父进程要对他所创建的管道进行管理->先描述再组织
private:static int number;
public:pid_t _child_id;int _write_fd;std::string processname;
public:EndPoint(int id,int fd):_child_id(id),_write_fd(fd){//process-0[pid:fd]char namebuff[64];snprintf(namebuff,sizeof(namebuff),"process-%d[%d:%d]",number++,_child_id,_write_fd);processname = namebuff;}string name() const{return processname;}~EndPoint(){}};int EndPoint::number = 0;Task t;//子进程执行的方法:
void WaitCommand()
{while(true){int command = 0;int n = read(0,&command,sizeof(int));if(n == sizeof(int)){t.Excute(command);}else if(n == 0) {cout << "父进程让我退出:" << getpid() << endl;break;}else break;}
}void createProcesses(vector<EndPoint> *end_points)
{vector<int> fds;for (int i = 0; i < gnum; i++){// 1.1 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;// 1.2 创建进程pid_t id = fork();assert(id != -1);// 一定是子进程if (id == 0){for(auto &fd : fds) close(fd);// 1.3 关闭不要的fdclose(pipefd[1]);// 我们期望,所有的子进程读取"指令"的时候,都从标准输入读取// 1.3.1 输入重定向,可以不做dup2(pipefd[0], 0);// 1.3.2 子进程开始等待获取命令WaitCommand();close(pipefd[0]);exit(0);}// 一定是父进程// 1.3 关闭不要的fdclose(pipefd[0]);// 1.4 将新的子进程和他的管道写端,构建对象end_points->push_back(EndPoint(id, pipefd[1]));fds.push_back(pipefd[1]);}
}int ShowBoard()
{cout << "#" << endl;cout << "# 0.执行日志任务 #" << endl;cout << "# 1.执行数据库任务 #" << endl;cout << "# 2.执行请求任务 #" << endl;cout << "# 3.退出 #" << endl;cout << "请选择# ";int command = 0;cin >> command;return command;
}void waitProcess(const vector<EndPoint> & end_points)
{//1.我们需要让子进程全部退出 -- 只需要让父进程关闭所有的write_fd就可以了for(const auto &ep : end_points){close(ep._write_fd);}cout << "父进程让所有子进程全部退出" << endl;sleep(10);//2.父进程要回收子进程的僵尸状态for(const auto &ep : end_points){waitpid(ep._child_id,nullptr,0);}cout << "父进程让所有子进程全部退出" << endl;sleep(10);
}void ctrlProcess(const vector<EndPoint> & end_points)
{//我们可以写成自动化的,也可以搞成交互式的int num = 0;int cnt = 0;// while(true)// {// srand((unsigned)time(NULL));// //1.选择任务// int command = INSERTMYSQL;// //2.选择进程// int index = rand() % end_points.size();// //3.下发任务// write(end_points[index]._write_fd,&command,sizeof(command));// sleep(1);// }while(true){cout << endl;int command = ShowBoard();if(command < 0 || command > 2) continue;if(command == 3) break;int index = cnt++;cnt %= end_points.size();cout << "选择了进程" << end_points[index].name() << "处理任务" << endl;write(end_points[index]._write_fd,&command,sizeof(command));}
}
int main()
{//1.构建控制结构,父进程写,子进程读vector<EndPoint> end_points;//2.此时我们得到了一批end_points结构creatProcess(&end_points);ctrlProcess(end_points);//3.此时整个系统都退出了waitProcess(end_points);return 0;
}
总结
以上,我们写的都是匿名管道,并且只能在两个有亲缘关系的进程中进行通信.
最后再复习一下管道的五种特点及四种场景:
- 单向通信–>半双工的一种特殊情况(类似于对讲机)–>吵架是"全双工"
- 管道的本质是文件,因为fd的声明周期是随进程的,因此管道的生命周期也是随进程的
- 管道通信,通常用来进行具有"血缘"关系的进程,进行进程间的通信,常用于父子通信–pipe打开管道,并不清楚管道的名字,我们把这个管道称为匿名管道
- 在管道通信中,写入的次数和读取的次数不是严格匹配的–>读写的次数多少没有强相关–>字节流
所以管道叫"面向字节流" - 具有一定的协同能力,让reader和writer能够按照一定的步骤进行通信–自带同步机制
四种场景:
- 如果我们read读取完毕了所有的管道数据,如果对方不发,我们就只能等待
- 如果我们write端将管道写满了,我们还能写吗?不能
- 如果我关闭了写端,读取完毕管道数据,再读,就会read返回0,表明读到了文件结尾
- 写端一直写,读端关闭,会发生什么呢?此时再写也没有意义了,OS不会维护无意义低效率的,或者浪费资源的事情.OS会杀死一直在写入的进程,OS通过信号来终止进程,13)SIGPIPE