> 文章列表 > 进程间通信【管道】

进程间通信【管道】

进程间通信【管道】

前提知识

概念: 进程通信(IPC,Interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。这使得一个程序能够在同一时间里处理许多用户的要求。IPC方法包括管道(PIPE)、消息排队、旗语、共用内存以及套接字(socket)。

通信目的:

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

如何实现通信?

要让两个不同的进程实现通信,前提条件是让它们看到同一份资源。所以要想办法让他们看到同一份资源,就需要采取一些手段,可以分为下面几种

1.管道(基于文件系统)

  • 匿名管道pipe
  • 命名管道

2.System V IPC(标准做法)

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

3. POSIX IPC(标准做法 )

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

管道

什么是管道

管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。它的特点是单向传输数据的,先进先出。

例如:cat file.txt | head -1

cat是一个进程,这个进程先处理,然后将处理后得到的标准输出到管道中,再由head进程通过标准输入将管道中的数据读出,再进行处理。

进程间通信【管道】

进程具有独立性,通信的成本不低———1.OS需要直接或者间接给通信双方提供“内存空间“ 2.要通信的进程必须看到一份公共的资源

进程间通信【管道】

匿名管道:父子两进程互相通信

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

用fork来共享管道原理

进程间通信【管道】

  • 一般而言我我们用管道实现单项通信
  • 父子进程通信可不可以创建全局缓冲区来完成通信呢? 不可以 ! 进程运行具有独立性,写时拷贝
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
#include <cstdio>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
using namespace std;
int main()
{//fd[0]表示读,fd[1]表示写  int fds[2];int ret = pipe(fds);if (ret == -1){// 管道创建失败perror("make piep");exit(-1);}pid_t id = fork();assert(id>=0);if (id == 0){//子进程写,关闭读端close(fds[0]);//子进程的通信代码const char* s="我是子进程,我正在给你发消息";int cnt=0;while(1){// char buffer[1024];//只有子进程能看到// snprintf(buffer,sizeof(buffer),"child->parent say:%s[%d][%d]\\n", s,++cnt,getpid());const char* buffer = "hello father, I am child...";//向特定的文件描述符写数据//strlen不用+1,我们只需要写有效字段即可write(fds[1],buffer,strlen(buffer));sleep(5);}//子进程运行结束关闭写端close(fds[1]);exit(0);}//父进程读取,关闭写端close(fds[1]);//父进程的通信代码while(true){char buffer[1024];//-1可以避免缓冲区被写满cout << "AAAAA" << endl;ssize_t s=read(fds[0],buffer,sizeof(buffer)-1);cout << "BBBBB" << endl;if(s>0) buffer[s]=0;cout << "Get Message# " << buffer << "| my pid:" << getpid() << endl;}ret=waitpid(id,NULL,0);assert(ret=id);//父进程运行结束关闭读端close(fds[0]);//父进程没有sleepreturn 0;
}

前5秒

进程间通信【管道】

后5秒

进程间通信【管道】

我们发现第一次打印到AAAAA后会卡住,表明如果管道中没有数据,读端默认在读,默认会直接阻塞当前正在读取的进程

进程通信读写特征:

1.当没有数据可读时

  • O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来为止
  • O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。

2.当管道满的时候

  • O_NONBLOCK disable:write调用阻塞,直到有进程读走数据
  • O_NONBLOCK enable:write调用返回-1,errno值为EAGAIN

3.如果所有管道写端对应的文件描述符被关闭,则read返回0。

4.如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出。

管道的特点

  • 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
  • 一般而言,进程退出,管道释放,所以管道的生命周期随进程
  • 互斥与同步机制———对共享资源的保护方案
  • 半双工,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
  • 管道是面向字节流的(网络)

基于管道的进程池设计

#include<iostream>
#include<cassert>
#include<cstdlib>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<sys/wait.h>
#include<vector>
#include<string>
using namespace std;
#define Process_NUM 5
typedef void (*func_t)();
#define MakeSeed() srand((unsigned long)time(nullptr)^getpid())
void downLoadTask()
{cout << getpid() << " :下载任务" << endl;sleep(1);
}
void ioTask()
{cout << getpid() << " :IO任务" << endl;sleep(1);
}
void flushTask()
{cout << getpid() << " :刷新任务" << endl;sleep(1);
}
class subEp
{
public:subEp(pid_t id,int writeFd):_id(id),_writeFd(writeFd){char namebuffer[1024];snprintf(namebuffer,sizeof(namebuffer),"process-%d[pid(%d)-fd(%d)]",num++,_id,_writeFd);_name=namebuffer;}static int num;string _name;pid_t _id; //子进程pidint _writeFd; //父进程连接子进程管道的写端
};
int subEp::num=0;
int receive(int fds)
{int code=0;//子进程读ssize_t s=read(fds,&code,sizeof(code));//如果父进程不写的时候,子进程会读到0,读到0的时候我们可以返回if(s<=0){return -1;}else{return code;}
}
void createSubProcess(vector<subEp>& subs,vector<func_t>& funcMap)
{vector<int> deleteFd;for(int i=0;i<Process_NUM;++i){int fds[2];int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){for(int j=0;i<deleteFd.size();++i){close(deleteFd[i]);}//父进程打开的文件,会被子进程共享//上一个子进程曾经打开的写端也会被这个子进程继承下去//这样会导致当前进程会继承多个写端,写端没有被完全关闭,这样就读不到0,会导致依旧阻塞//但是最后一个进程没有子进程了,他所对应的文件描述符表就被关了//child关闭写端close(fds[1]);while(1){//1.获取命令,如果没有发送,我们子进程应该阻塞int Commandcode=receive(fds[0]);//2.完成任务if(Commandcode>=0&&Commandcode<funcMap.size()){funcMap[Commandcode]();}else if(Commandcode==-1){break;}}exit(0);}//父进程关闭读端close(fds[0]);subEp sub(id,fds[1]);//放到vector里面subs.push_back(sub);//把子进程的写端保存下来deleteFd.push_back(fds[1]);}
}
void loadTaskFunc(vector<func_t>& funcMap)
{funcMap.push_back(downLoadTask);funcMap.push_back(ioTask);funcMap.push_back(flushTask);
}
void sendTask(const subEp& sub,int taskId)
{//write返回实际写入的个数cout << "send TaskId " << taskId << " send to ->" << sub._name << endl;int n=write(sub._writeFd,&taskId,sizeof(taskId));assert(n==sizeof(int));(void)n;
}
void waitProcess(vector<subEp>& subs)
{int processnum=subs.size();for(int i=0;i<processnum;++i){waitpid(subs[i]._id,nullptr,0);}
}
void BalancedControl(vector<subEp>& subs,vector<func_t>& funcMap,int cnt)
{int processnum=subs.size();//任务个数int tasknum=funcMap.size();int count=cnt;bool forever=false;if(cnt==0) forever=true;while(1){//1.选择一个子进程int subId=rand()%processnum;//2.选择一个任务int taskId=rand()%tasknum;//3.发送给选择的进程sendTask(subs[subId],taskId);if(!forever){--cnt;if(cnt==0) break;}}//写端关闭,子进程会读到0for(int i=0;i<processnum;++i){close(subs[i]._writeFd);}
}
int main()
{MakeSeed();vector<func_t> funcMap;//把函数放到vector里面loadTaskFunc(funcMap);vector<subEp> subs;//创建子进程,子进程读取createSubProcess(subs,funcMap);//走到这一定是父进程,控制子进程,父进程写入//子进程个数//负载均衡//子进程执行cnt次,如果cnt=0就永远执行int cnt=200;BalancedControl(subs,funcMap,cnt);//回收子进程waitProcess(subs);return 0;
}

bug的问题,如果不修复,他会正常执行,但是会到最后一个在关闭,想正着关闭,就必须保存每个进程创建的写端,到另一个进程的时候把之前的进程都关闭

进程间通信【管道】

命名管道

概念

命名管道是一种特殊类型(符号性)的文件。在不相关(没有亲缘关系)的进程之间交换数据,可以使用FIFO文件来做这项工作。

实验1:

做一个命令行的命名管道

mkfifo filename

进程间通信【管道】

输入一个shell脚本

cnt=0; while :; do echo "hello world -> $cnt "; let cnt++;sleep 1; done > named_pipe

进程间通信【管道】

管道文件大小没有变化!!!

命名管道如何做到让不同进程看到了同一份资源的?可以让不同进程打开指定名称(路径+文件名)的同一文件

(路径+文件名)=唯一性

实验2:

函数原型:
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
功能: 创建一个命名管道
参数:
pathname: 管道名称
mode: 权限
返回值: 创建成功返回0,失败返回-1

利用mkfifo函数可以做一个管道间的通信,client发送给server,server接受消息

comm.h

//让client 和 server看到同一份资源
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
using namespace std;
#define NAME_PIPE "mypipe.txt"
bool createFifo(const std::string& path)
{umask(0);int n = mkfifo(path.c_str(), 0600);if (n == 0)return true;else{std::cout << "errno: " << errno << " err string: " << strerror(errno) << std::endl;return false;}
}
void removeFifo(const std::string& path)
{//移除文件int n=unlink(path.c_str());assert(n==0);(void)n;
}

client.cpp

#include"comm.h"
int main()
{int wfd=open(NAME_PIPE,O_WRONLY);if(wfd  < 0){exit(-1);}//writechar buffer[1024];while(true){cout << "please Say# " << endl;fgets(buffer,sizeof(buffer)-1,stdin);//其实if不写是可以的,因为至少会读到一个/nif(strlen(buffer)>0){buffer[strlen(buffer)-1]=0;}ssize_t s=write(wfd,buffer,strlen(buffer));assert(s==strlen(buffer));(void)s;}close(wfd);removeFifo(NAME_PIPE);return 0;
}

server.cpp

#include"comm.h"
int main()
{cout << "hello server" << endl;createFifo(NAME_PIPE);//如果client不发消息,不会执行到server end,会在这里阻塞int rfd=open(NAME_PIPE,O_RDONLY);cout << "server end" << endl;if(rfd  < 0){exit(-1);}//readchar buffer[1024];while(true){ssize_t s=read(rfd,buffer,sizeof(buffer)-1);if(s>0){buffer[s]=0;cout << "client -> server# " << buffer << endl;}//对方关闭写描述符我们会读到0else if(s==0) {break;}//出错else{cout << "error" << endl;break;}}close(rfd);removeFifo(NAME_PIPE);return 0;
}