> 文章列表 > pg从磁盘读取文件

pg从磁盘读取文件

pg从磁盘读取文件

**瀚高数据库
目录
环境
文档用途
详细信息

环境
系统平台:Linux x86-64 Red Hat Enterprise Linux 7
版本:14

文档用途
了解存储管理器

详细信息

0. 相关数据类型

打开的每一个段用如下结构表示,pg中有MdfdVec数组并且记录了这个数组的长度。

typedef struct _MdfdVec
{File        mdfd_vfd;                      /* fd number in fd.c's pool */ 虚拟文件描述符表的下标BlockNumber mdfd_segno;        /* segment number, from 0 */ 段号,从0开始计数} MdfdVec;

在这里插入图片描述

1.smgrinit

smgr是storage manager的缩写,即磁盘管理器,它作为磁盘管理器的上层,对下层操作进行一定程度的封装。后端进程启动时会做初始化操作。

由于历史原因,只保留对磁盘(文件系统支持的存储体,用磁盘代表)的操作,NSmgr为1。

初始化只是为磁盘管理器申请了一块内存。

void smgrinit(void)
{int            i;for (i = 0; i < NSmgr; i++){if (smgrsw[i].smgr_init)smgrsw[i].smgr_init();}/* register the shutdown proc */on_proc_exit(smgrshutdown, 0);}voidmdinit(void){MdCxt = AllocSetContextCreate(TopMemoryContext,"MdSmgr",ALLOCSET_DEFAULT_SIZES);}

2.smgropen

不是真正打开磁盘文件,只是从hash表中查找一个项,该项缓存了包含了对当前关系中的打开的文件描述符以及文件描述符的数目。

hash表或者相应的entry没有就创建。

第一次初始化该项把表的各种类型打开的段的数目全都初始化为0。

关于该hash操作可看另一篇动态hash的support。


```cpp
/*
*    smgropen() -- Return an SMgrRelation object, creating it if need be.**        This does not attempt to actually open the underlying file.*/SMgrRelation smgropen(RelFileLocator rlocator, BackendId backend){RelFileLocatorBackend brlocator;SMgrRelation reln;bool        found;if (SMgrRelationHash == NULL){/* First time through: initialize the hash table */HASHCTL        ctl;ctl.keysize = sizeof(RelFileLocatorBackend);ctl.entrysize = sizeof(SMgrRelationData);SMgrRelationHash = hash_create("smgr relation table", 400,&ctl, HASH_ELEM | HASH_BLOBS);dlist_init(&unowned_relns);}/* Look up or create an entry */brlocator.locator = rlocator;brlocator.backend = backend;reln = (SMgrRelation) hash_search(SMgrRelationHash,&brlocator,HASH_ENTER, &found);/* Initialize it if not present before */if (!found){/* hash_search already filled in the lookup key */reln->smgr_owner = NULL;reln->smgr_targblock = InvalidBlockNumber;for (int i = 0; i <= MAX_FORKNUM; ++i)reln->smgr_cached_nblocks[i] = InvalidBlockNumber;reln->smgr_which = 0;    /* we only have md.c at present *//* implementation-specific initialization */smgrsw[reln->smgr_which].smgr_open(reln);/* it has no owner yet */dlist_push_tail(&unowned_relns, &reln->node);}return reln;}void mdopen(SMgrRelation reln){/* mark it not open */for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)reln->md_num_open_segs[forknum] = 0;}
**3. smgrread**smgrread从非易失性存储上根据存储体类型,文件类型,块号读取指定的表中指定的块,根据本块内容初始化shared buffer中缓存页。底层会调用md.c (magnetic disk)相关函数,文件名有误导性,因为凡是文件系统支持的存储介质操作都可以由md.c中函数完成该操作,不单单是磁盘。```cpp
/**    smgrread() -- read a particular block from a relation into the supplied buffer.**        This routine is called from the buffer manager in order to*        instantiate pages in the shared buffer cache.  All storage managers*        return pages in the format that POSTGRES expects.*/void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,char *buffer){smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer);}

3.1 mdread

从一个关系中读取指定的块号,默认大小是8KB,比较关键的函数是_mdfd_getseg(),根据指定的参数获得一段,一段默认是1GB。

/**    mdread() -- Read the specified block from a relation.*/void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer){off_t        seekpos;int            nbytes;MdfdVec    *v;// 获取MdfdVec指针,包含了虚拟文件描述符和relation的segment numberv = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);// 计算读取位置seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));// 断言,位置不超过1GBAssert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);// 该函数主要通过pread系统调用,从指定位置读取BLCKSZ字节的数据到buffer中nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);// nbytes作为返回结果,判断是否是读出错还是有坏块以及是否在恢复模式if (nbytes != BLCKSZ){if (nbytes < 0)ereport(ERROR,(errcode_for_file_access(),errmsg("could not read block %u in file \\"%s\\": %m",blocknum, FilePathName(v->mdfd_vfd))));/** Short read: we are at or past EOF, or we read a partial block at* EOF.  Normally this is an error; upper levels should never try to* read a nonexistent block.  However, if zero_damaged_pages is ON or* we are InRecovery, we should instead return zeroes without* complaining.  This allows, for example, the case of trying to* update a block that was later truncated away.*/if (zero_damaged_pages || InRecovery)MemSet(buffer, 0, BLCKSZ);elseereport(ERROR,(errcode(ERRCODE_DATA_CORRUPTED),errmsg("could not read block %u in file \\"%s\\": read only %d of %d bytes",blocknum, FilePathName(v->mdfd_vfd),nbytes, BLCKSZ)));}}

3.2 _mdfd_getseg()

从relation中找一个segment,这个segment包含了blocknum指定的block。

  返回值:要根据behavior判断,可能为NULL,可能是指向MdfdVec的指针,中间可能会创建一个新的段。
/**    _mdfd_getseg() -- Find the segment of the relation holding the specified block.**  If the segment doesn't exist, we ereport, return NULL, or create the segment, according to "behavior".  *  Note: skipFsync is only used in the EXTENSION_CREATE case.*/static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, bool skipFsync, int behavior){MdfdVec    *v;BlockNumber targetseg;BlockNumber nextsegno;/* some way to handle non-existent segments needs to be specified */// 根据blkno算出来的segment可能并不存在,但是某些情况下并不是一种错误,比如在恢复模式// 针对不存在的segment采取的措施主要是:报告错误、无则创建、返回NULL几种。// EXTENSION_DONT_OPEN:当前段在磁盘存在,但是之前没有打开,那么将不进行打开操作 // 该函数上层的调用者会根据自己的情况设置相应的behaviorAssert(behavior & (EXTENSION_FAIL | EXTENSION_CREATE | EXTENSION_RETURN_NULL | EXTENSION_DONT_OPEN));/* 默认情况下:RELSEG_SIZE = 2^30 / 2 ^ 13 = 2 ^17 = 131072 , 1GB的表文件,8KB的block *//* targetset是一个relation在物理存储上的segment number,从0开始 */targetseg = blkno / ((BlockNumber) RELSEG_SIZE);/* md_num_open_segs是文件类型数组,一个relation关联的类型有普通的数据文件、fsm、vm等,不同的文件类型在pg用fork(分支)表示* forknum是对应于某个fork,reln->md_num_open_segs[forknum]表示某个reln中某种fork打开的段的数量* 比如forknum = MAIN_FORKNUM(枚举类型),即普通的数据文件,reln->md_num_open_segs[forknum] == 5, 那么segment = {0, 1, 2, 3, 4}的都已经打开* 此时若满足条件,直接返回就可以。*/if (targetseg < reln->md_num_open_segs[forknum]){v = &reln->md_seg_fds[forknum][targetseg];return v;}/*若behavior中设置了EXTENSION_DONT_OPEN,表示只读打开的文件,但是走到这里意味着当前段没有打开,返回NULL *//* The caller only wants the segment if we already had it open. */if (behavior & EXTENSION_DONT_OPEN)return NULL;/*打开段要保证连续,例如不能出现0,1, 3, 4只能是0,1,2,3,4*md_num_open_segs[forknum]确定当前类型有没有打开的segment, 有则直接打开最近的,没有就打开该类型上的第一个segment*/if (reln->md_num_open_segs[forknum] > 0)v = &reln->md_seg_fds[forknum][reln->md_num_open_segs[forknum] - 1];else{v = mdopenfork(reln, forknum, behavior);if (!v)return NULL; }// nextsegno表示还没打开的下一个段 for (nextsegno = reln->md_num_open_segs[forknum]; nextsegno <= targetseg;  nextsegno++){/* 获取单个磁盘文件block的数量,该磁盘文件由pg虚拟文件描述符索引 */BlockNumber nblocks = _mdnblocks(reln, forknum, v);int            flags = 0;/*保证连续打开*/Assert(nextsegno == v->mdfd_segno + 1);/*一个磁盘文件(segment)中的block的数量总是应该<= RELSEG_SIZE(131072)if (nblocks > ((BlockNumber) RELSEG_SIZE))elog(FATAL, "segment too big");// 处于恢复模式,会在段的末尾增加一个block, 并设置flagsif ((behavior & EXTENSION_CREATE) || (InRecovery && (behavior & EXTENSION_CREATE_RECOVERY))){if (nblocks < ((BlockNumber) RELSEG_SIZE)){char       *zerobuf = palloc0(BLCKSZ);mdextend(reln, forknum, nextsegno * ((BlockNumber) RELSEG_SIZE) - 1, zerobuf, skipFsync);pfree(zerobuf);}flags = O_CREAT;}//  EXTENSION_DONT_CHECK_SIZE为checkpointer时设置的标志, 此时mdnblocks会失效,因为这个标志会导致segment在没有被填满的情况下,分配一个新的segment。// 而计算mdnblocks会假设除最后的一个segment,所有的segment都有1GB/8KB个块(看配置,这里说的默认情况)else if (!(behavior & EXTENSION_DONT_CHECK_SIZE) && nblocks < ((BlockNumber) RELSEG_SIZE)){if (behavior & EXTENSION_RETURN_NULL){errno = ENOENT;return NULL;}ereport(ERROR,(errcode_for_file_access(),errmsg("could not open file \\"%s\\" (target block %u): previous segment is only %u blocks",_mdfd_segpath(reln, forknum, nextsegno),blkno, nblocks)));}  // end else if // 打开下一个段v = _mdfd_openseg(reln, forknum, nextsegno, flags);if (v == NULL){if ((behavior & EXTENSION_RETURN_NULL) && FILE_POSSIBLY_DELETED(errno))return NULL;ereport(ERROR,(errcode_for_file_access(),errmsg("could not open file \\"%s\\" (target block %u): %m",_mdfd_segpath(reln, forknum, nextsegno),blkno)));}} // end for return v;}

3.3 _mdfd_openseg

打开一个段,更新md_num_open_segs和md_seg_fds两个成员变量,并返回MdfdVec类型的指针。

/** Open the specified segment of the relation,* and make a MdfdVec object for it.  Returns NULL on failure.*/static MdfdVec *_mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, int oflags){MdfdVec    *v;File        fd;char       *fullpath;// 字符串,比如  pg_tblspc/PG_16_202208251/dbOid/relNumber.3 或者 base/dbOid/relNumberfullpath = _mdfd_segpath(reln, forknum, segno);// 获取虚拟文件描述符fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);//释放fullpath内存pfree(fullpath);if (fd < 0)return NULL;/** Segments are always opened in order from lowest to highest, so we must* be adding a new one at the end.*/Assert(segno == reln->md_num_open_segs[forknum]);_fdvec_resize(reln, forknum, segno + 1);/* fill the entry */v = &reln->md_seg_fds[forknum][segno];v->mdfd_vfd = fd;v->mdfd_segno = segno;Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));/* all done */return v;}

3.4 _mdfd_segpath

比较简单,为了记一下在路径后追加segno。

/*
* Return the filename for the specified segment of the relation. The* returned string is palloc'd.*/static char *_mdfd_segpath(SMgrRelation reln, ForkNumber forknum, BlockNumber segno){char       *path,*fullpath;path = relpath(reln->smgr_rlocator, forknum);if (segno > 0){fullpath = psprintf("%s.%u", path, segno);pfree(path);}elsefullpath = path;return fullpath;}

3.5 _fdvec_resize

对SMgrRelationData中的两个成员变量做调整。

/**    _fdvec_resize() -- Resize the fork's open segments array*/static void _fdvec_resize(SMgrRelation reln, ForkNumber forknum, int nseg){if (nseg == 0){if (reln->md_num_open_segs[forknum] > 0){pfree(reln->md_seg_fds[forknum]);reln->md_seg_fds[forknum] = NULL;}}else if (reln->md_num_open_segs[forknum] == 0){reln->md_seg_fds[forknum] = MemoryContextAlloc(MdCxt, sizeof(MdfdVec) * nseg);}else{reln->md_seg_fds[forknum] = repalloc(reln->md_seg_fds[forknum],sizeof(MdfdVec) * nseg);}reln->md_num_open_segs[forknum] = nseg;}

3.6 _mdnblocks

获取单个文件占用的block数,用的整除没有把partial block计算在内。

/*
* Get number of blocks present in a single disk file*/static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg){off_t        len;// 从文件开始一直到EOF,该文件占用的字节数。len = FileSize(seg->mdfd_vfd);if (len < 0)ereport(ERROR,(errcode_for_file_access(),errmsg("could not seek to end of file \\"%s\\": %m",FilePathName(seg->mdfd_vfd))));/* note that this calculation will ignore any partial block at EOF */return (BlockNumber) (len / BLCKSZ);}

4. mdextend

在指定的关系中增加1个block。

/*
*    mdextend() -- Add a block to the specified relation.**        The semantics are nearly the same as mdwrite(): write at the*        specified position.  However, this is to be used for the case of*        extending a relation (i.e., blocknum is at or beyond the current*        EOF).  Note that we assume writing a block beyond current EOF*        causes intervening file space to become filled with zeroes.*/void mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,char *buffer, bool skipFsync){off_t        seekpos;int            nbytes;MdfdVec    *v;// 这个断言时间成本高,因统计一个关系的所有block数量,要打开所有的段,每个段要用lseek定位到文件末尾获取大小,大文件lseek定位到文件尾慢。/* This assert is too expensive to have on normally ... */#ifdef CHECK_WRITE_VS_EXTENDAssert(blocknum >= mdnblocks(reln, forknum));#endif// 一个relation的大小不能超过0xFFFFFFFF个块,最大的块号大小为0xFFFFFFFE// 判断走不到这里,上层buffer manager先行判断/** If a relation manages to grow to 2^32-1 blocks, refuse to extend it any* more --- we mustn't create a block whose number actually is* InvalidBlockNumber.  (Note that this failure should be unreachable* because of upstream checks in bufmgr.c.)*/if (blocknum == InvalidBlockNumber)ereport(ERROR,(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),errmsg("cannot extend file \\"%s\\" beyond %u blocks",relpath(reln->smgr_rlocator, forknum),InvalidBlockNumber)));// 获取段号v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);// 获取块号seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);// pwrite写入,因为信号打断或者没有足够的物理存储空间或者软资源限制,写入的字节数不够nbytes或者返回-1(出错)if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ){if (nbytes < 0)ereport(ERROR,(errcode_for_file_access(),errmsg("could not extend file \\"%s\\": %m",FilePathName(v->mdfd_vfd)),errhint("Check free disk space.")));/* short write: complain appropriately */ereport(ERROR,(errcode(ERRCODE_DISK_FULL),errmsg("could not extend file \\"%s\\": wrote only %d of %d bytes at block %u",FilePathName(v->mdfd_vfd),nbytes, BLCKSZ, blocknum),errhint("Check free disk space.")));}if (!skipFsync && !SmgrIsTemp(reln))register_dirty_segment(reln, forknum, v);Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));}

5. mdwrite

/**    mdwrite() -- Write the supplied block at the appropriate location.**        This is to be used only for updating already-existing blocks of a*        relation (ie, those before the current EOF).  To extend a relation,*        use mdextend().*/void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,char *buffer, bool skipFsync){off_t        seekpos;int            nbytes;MdfdVec    *v;/* This assert is too expensive to have on normally ... */#ifdef CHECK_WRITE_VS_EXTENDAssert(blocknum < mdnblocks(reln, forknum));#endifv = _mdfd_getseg(reln, forknum, blocknum, skipFsync,EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);if (nbytes != BLCKSZ){if (nbytes < 0)ereport(ERROR,(errcode_for_file_access(),errmsg("could not write block %u in file \\"%s\\": %m",blocknum, FilePathName(v->mdfd_vfd))));/* short write: complain appropriately */ereport(ERROR,(errcode(ERRCODE_DISK_FULL),errmsg("could not write block %u in file \\"%s\\": wrote only %d of %d bytes",blocknum,FilePathName(v->mdfd_vfd),nbytes, BLCKSZ),errhint("Check free disk space.")));}if (!skipFsync && !SmgrIsTemp(reln))register_dirty_segment(reln, forknum, v);}