> 文章列表 > 初识C++之位图与布隆过滤器

初识C++之位图与布隆过滤器

初识C++之位图与布隆过滤器

一、位图

1.位图的概念

在了解位图前,先来看这么一道题:

假设给你40亿个没有排序的不重复的无符号整数。再给你一个无符号整数,让你判断这个整数是否在这40亿个数之中。

对于这道题,一般情况下的解法有很多,例如二分查找,或者使用setmapunordered_setunordered_map,将数据放到这些容器里面,再拿着关键值进行搜索即可。

在通常情况下, 这些方法都是可行的。但是对于上面这道题,这些解法却无法使用。原因很简单,因为这里的数据量过于庞大,一共有40亿个无符号整数。我们知道,10亿byte左右就是1GB。而一个unsigned int占4个byte,40亿个就是占160亿byte。而160byte换算下来就是16GB。因此,如果使用二分法,就至少需要16GB的内存来存储数据。而如果使用红黑树,红黑树的节点之间需要有三个指针进行链接,32位系统下是4byte,64位系统下是8byte。在32位系统下,如果用红黑树,就至少需要16 + 48 = 64GB内存。而哈希桶也是需要有一个指针指向下一个节点,也就需要16 + 16 = 32GB内存。仅仅只是为了找出一个数据,就消耗几十GB的内存,很明显这是不现实的。

那么要如何解决这个问题呢?观察题目可以发现,它说的是判断一个无符号整数是否存在,这就表示它所给的所有数据都可以以给我们的那个无符号整数为基准,标定为存在不存在两种状态。而两种状态大家应该就很熟悉了,我们经常使用的0、1其实就可以用来分别表示两种状态。现在解题方法就很明显了。就是使用0和1来分别标识数据的状态,并且我们知道,计算机中每个数据的最小单位是bit,而bit就是二进制位,用0和1表示。

因此,我们就可以用一个bit位标识状态,0不存在,1存在。然后创建42亿个bit位,用这些bit位进行标识,传入的数据是几,就将第几个bit位修改为1。然后拿着给我们的那个整数去对应的bit位上看该位置是否为1即可。而我们知道,1byte = 8bit,所以42亿个bit换算下俩就是512MB。在内存占用的可接受范围内。

这种用bit位标识数据存在状态的方式,就叫做“位图”。由此,我们就可以得出位图的概念:位图,其实就是用每一个bit位来存放某种状态。适用于海量数据,数据无重复的场景,通常是用来判断某个数据存不存在。

2. 位图的模拟实现

有了位图的概念后,就可以想想位图怎么实现了。首先我们要知道,在C++中,是不允许按照bit位来开辟空间的。因此我们就要借助其他类型来开辟空间。以char为例,char的大小是一个byte,即8bit位,假设要开辟42亿个bit位大小的空间,就可以申请“42亿 / 8 + 1”个char字符,即5亿左右个字符。同时也可以将这些字符存储在一个vector中,保证数据的连续性。

而用char作为开辟空间大小的单位,要知道某个bit的位置也很简单。通过数据除以8即可知道该数据应该存在第几个char上,再通过数据模8就可以知道在这个char的第几位上。例如数字15,除8等于1,模8等于7,即15应该在下标为1的char的第7个bit位上

 位图其实在库中也是有提供的:

在这里面,最主要的接口有三个,即set、reset和test。分别用于将bit为设置为1,将bit位设置为0和测试。在这里,我们也就只简单的模拟实现这三个接口。

2.1 结构实现

namespace MyBitSet
{template<size_t N>class bitset{public:bitset(){_bits.resize(N/8 + 1, 0);}private:vector<char> _bits;};
}

结构很简单,成员变量就是一个vector<char>类型的数据,N用于记录要开多少个bit位,根据传入的数字修改特定bit即可。

注意,在开位图空间时,不是有多少个数据就开多少个空间,而是要看空间的范围。因为这里用的是直接定址法,假设有10个数据,但它的最大值是100,那么就需要开100个空间,而不是10个空间。

2.2 set实现

在实现set前,要先思考一下如何将制定的bit位修改为1。很简单,就是将1左移上要移动到的bit位的位数, 然后再将得到的数与该bit位所在的char进行或运算即可。这里假设要将某个char的第7个bit位置1。推导思路如下:

注意,这里大家可能会有两个误区。

第一个误区是,误认为bit的位数是从1开始的。其实是从0开始的。假设现在有8个bit,那么它的开头应该是第0位,结尾应该是第7位:

当然,这里的第0位不一定是最右边的那个。在小端机,即左高位右低位中,第0位就是在最右边;而在大端机,即左低位右高位中,第0位就是最左边

第二个误区,就是对左移和右移的认知上有错误。可能有人认为左移就是将bit位向左移动,右移就是将bit位向右移动。其实这个认知是有错误的。

左移应该是从低位向高位移动,右移则是从高位向低位移动。这里大家可能看不出什么区别,以1为例子:

可以看到,同样是左移,但是二进制显示的结果是完全不同的。当然,在使用位运算时我们并不需要关注大端机小端机,因为编译器会帮我们处理,不用考虑因为大端小端的影响导致的左移或右移后1会到哪个位置,该左移两位还是两位,不用考虑过多。

由此,要将某个bit位设置为1就很简单了,位运算+或运算即可。

void set(size_t x)//将x位置1
{size_t cnt = x / 8;//在第几个char上size_t pos = x % 8;//在这个char的第几个bit位_bits[cnt] |= (1 << pos);
}

2.3 reset实现

reset是要将指定bit位置为0。这里以0111 1010为例子,将该二进制的第6位置为0。首先,要将第6位置为0且保持其他位不变,就要获得1011 1111,因为1011 1111 & 0111 1010 = 0011 1010。而要获得的1011 1111,很明显,就要拿到0100 0000,拿到0100 0000很简单,直接将1左移6位即可。而要通过0100 0000获得1011 1111也很简单,直接按位取反即可,推导思路如下图:

 由此,reset的实现也很简单了:

void reset(size_t x)//将bit位置0
{size_t cnt = x / 8;//在第几个char上size_t pos = x % 8;//在这个char的第几个bit位_bits[cnt] &= ~(1 << pos);
}

2.4 test实现

test()就是用来查看某个bit位的状态的,所以实现起来也非常的简单。

bool test(size_t x)//测试,检查指定bit位的状态
{size_t cnt = x / 8;size_t pos = x % 8;return _bits[cnt] & (1 << pos);//不要用&=,因为这里不能修改bit位状态
}

注意,在这里就不要用&=了,因为这里是查看bit位的状态,并不是要修改

三、位图的部分应用

位图的实现其实非常的简单,并且在库里面也有一个bitset容器,就是提供的位图。因此在实际中,位图的实现并不是什么问题,主要还是在能否灵活的运用位图。

上文中也讲过,位图主要是适用于处理海量且无重复的数据,判断数据是否存在或者某种只有两种状态的场景。但是这仅仅只是最基础的使用,如果需要,位图也是可以适用于有重复数据,多种状态的场景的。

我们就以以下几个问题来演示。

1.给定100亿个整数,设计算法找出只出现过一次的整数

对于这道题,看到有100亿个整数的时候,大家就要知道,这里很大概率是需要用位图来解决。但是,它的要求不再是判断是否存在,而是找出只出现过一次的整数。这个条件就需要用“出现0次”,“出现1次”和“出现3次”这三种状态来进行标记。而位图是使用二进制序列,1个bit位仅仅只能标识两种状态

那么在这里应该如何解决呢?很简单,虽然1个bit位只能标记2种状态,但是2个bit位通过组合是可以标记4种状态的。因此,在这里,就可以通过用两个bit的方式来标记。而如何用两个bit位来标记呢?有两种方法,第一种就是自己控制位图,开一个2倍空间的位图,以2个bit为基准进行修改。

但是这种方式实现和控制起来都不太方便。因此还有第二种方法,就是开两个相同大小的位图, 通过比较这两个位图的同一个bit位来判断状态

这里就实现第二种方法来进行演示:

	template<size_t N>//N是用于标定要申请多少个bit位的class twobitset{public:void set(size_t x){if (!bit1.test(x) && !bit2.test(x))//0 0,该数之前出现0次,现在第一次出现,修改为0 1{bit2.set(x);}else if (!bit1.test(x) && bit2.test(x))//0 1,该数之前出现1次,现在第二次出现,修改为1 0{bit1.set(x);bit2.reset(x);}else if(bit1.test(x) && !bit2.test(x))//1 0,该数之前出现2次及以上,什么都不做{ }}void PrintOne(){for (size_t i = 0; i < N; ++i){if (!bit1.test(i) && bit2.test(i))//打印0 1,即出现一次的位置的数据{cout << i << ' ';}}cout << endl;}private:bitset<N> bit1;bitset<N> bit2;};

完成代码实现后,就可以进行测试了:

可以看到,结果确实是正确的。

看到这里,大家可能会有一个疑惑,那就是开100亿个空间,万一内存不够怎么办?如果100亿够,那1000亿个呢?内存还是不够啊。这个其实不用太过担心,因为看空间时是看的数据范围,假设开的整数是size_t的,那么哪怕它开100亿个,1000亿个,它的范围依然是在0~2^32,最多也只需要开2^32,即42亿多的空间,共计512MB。无需开100亿或1000亿个空间。

2. 给两个文件,分别有100亿个整数,我们只有1GB内存,找到两个文件交集

有了第一个问题的基础,这个问题的解决方法就很简单了。同样的,构建两个位图,将这两个文件的数据分别映射到这两个位图中。映射完成后,找这个位图中同时为1的位置即可。

实现起来和问题1是差不多的, 这里就不再演示了。

3. 一个文件有100亿个int, 1G内存,设计算法找到出现次数不超过2的所有整数

这个问题和第一个问题差不多。同样的,使用两个位图刚好对应“出现0次”, “出现1次”, “出现2次”,“出现3次及以上”4种状态即可。

但是这里还有一个问题要注意,那就是这里的数据是int,代表可能出现负数。因此,在修改bit位时,要考虑到出现负数的情况。很简单,将数据0看做位图的中间位置即可,修改位图时,小于就修改0的左边的bit位,大于则修改0的右边的bit位,在遍历时,从int的最小值开始遍历。

实现起来和问题1也基本差不多,只是在修改bit位和遍历时有点差别,这里也就不再演示了。

四、哈希切割

在了解哈希切割之前,我们先来看这么一道题:

给你一个超过100GB大小的文件,这个文件中存着ip地址,设计算法找到出现次数最多的ip地址。 

对于这道题,大家了解了位图后,看到这里有100GB大小的数据,可能第一时间就想到的是使用位图来解决。但是,这道题其实是不能用位图解决的。

原因很简单,因为它存储的是ip地址,ip地址是无法被直接看做整形的,也就无法直接映射到位图上。有人可能又会说,可以用哈希函数将ip地址转为整形解决。诚然,哈希函数转整形确实可以解决不是整形的问题,但是要知道,哈希函数转整形是会出现哈希冲突的,这就会导致统计出来的次数很可能是不准确的。并且它的要求是统计出现最多的ip地址,如果使用位图,你如何知道哪个bit位上出现的次数最多呢?前面的题是因为我们明确知道数据有几种状态,所以可以根据状态的个数来申请对应数量的位图,而这里我们无法知道一个数据出现次数的限定,也就无法使用位图解决。

既然不能使用位图,这里又有100GB大小的文件,我们就可以向另一个方向考虑,即切割。100GB大小的文件,我们可以将它切分为100个1GB大小的文件,然后将这100GB的文件分配到这100个文件中。而要统计次数也很简单,直接使用map就可以很方便的统计次数了。此时就可以将想map中传入100GB数据转化为向map中传入100个1GB的数据,map每统计完一次数据就clear一次,并将出现次数最多的ip地址保存下来。

但是,这种方法还是有问题。那就是文件数据切割后,我们无法保证相同的数据都在同一个文件中,这就可能出现有一个ip地址既在文件1出现,也在文件2、3、 4出现,导致统计错误。

由此,虽然思路可行,但不能只是单纯的进行数据切割,而是要以某种依据进行切割。由此,我们就可以想到,这种场景其实就和哈希映射很像。因此,我们可以采用哈希的思想,先将这些ip地址用哈希函数转化为整形,然后用得到的整形模上文件个数,并将整形所对应的数据放到模出来的文件编号所对应的文件中。由此,就可以让有同一个关键码的数据都存放在同一个文件中。

当然,我们知道哈希函数转整形是会出现哈希冲突的,但是这里不用担心哈希冲突的问题。因为虽然哈希冲突会导致不同的数据被存放在同一个文件,但是要知道,相同的数据一定会在同一个文件中,所以直接用map统计每个文件中出现次数最多的ip即可,每统计完一个文件就clear一次map。

到了这里,就只剩一个问题了。这里是分成了100个1GB的文件,但也可能出现一个文件中需要存放的数据超过1GB。这个时候就会出现两种情况。

情况1:

这个小文件中冲突的ip很多,都是不同的ip,大多都是不重复的map无法统计。ip不重复,就意味着可以使用哈希函数映射到不同的文件中。这种情况的解决方法就是再开几个小文件,然后用哈希函数进行映射,针对性的将这个文小文件的数据重新映射到其他小文件中,如果再次遇到空间不足,就重复上述动作,直接空间足够或者数据分配完。

情况2:

这个小文件中冲突的ip虽然很多,但是大多数都是相同的ipmap可以统计。既然数据大多数都是相同的,就意味着这些相同的数据只会在map中创建一个节点,由此就可以直接用map统计。

那么如何分辨这两种情况呢?很简单,直接用map统计,如果是第一种情况,map会因为没有内存导致无法new新的节点,创建失败抛异常。如果是第二种情况,map就会正常运行,不会报错。而我们要做的,就是在抛异常时捕获这个异常即可。当捕获到异常后,就换一个哈希函数再次切割即可。

上面这种通过哈希对数据进行切割分类的方法,就叫做“哈希切割”

当然,上面这道题就不在这里写代码实现了,里面涉及到了大量的文件操作,并不太好写,而且也不好创造出能使map失效的情景。在这里,大家需要理解的是哈希切割的作用和在类似的情景下应该有的处理方案和处理思路。

五、布隆过滤器

1. 布隆过滤器的概念

布隆过滤器是由布隆在1970年提出的一种紧凑型的、比较巧妙的概率型数据结构。特点是高效地插入和查询,可以用来告诉用户“某样东西一定不存在或者可能存在”。它是用多个哈希函数,将一个数据映射到位图结构。这种方式不仅可以提升查询效率,也可以节省大量的内存空间。

说简单点,布隆过滤器其实就是一个结合了哈希与位图的数据结构通过哈希思想进行映射和用位图来标识数据的状态

在某些情况下,有大量的数据,仅仅只需要你判断某个数据是否存在,如果使用哈希桶或者红黑树,虽然可以得出结果,但有点大材小用,并且如果数据太多且重复的数据很少,此时使用哈希桶或红黑树就需要大量的空间,无疑不太划算。面对这种情况,就可以使用位图。

但位图也有缺陷。那就是位图需要数据的范围相对集中,和要求整形。这也是直接寻址法的缺陷。因此,我们再在位图的基础上加上哈希映射,就可以将数据限定在某个范围内且可以适用于不是整形的数据。

2. 布隆过滤器的适用场景

大家知道,哈希映射其实是会出现“哈希冲突”的,这就意味着,当我们在位图中搜索某个数据时,如果该bit位是0,这个数据一定不存在。但如果该bit位是1,就意味该数据可能存在也可能不存在,因为这个bit位上的1可能是其他数据映射进来的。

因此,准确来说,布隆过滤器并不适合所有判断数据是否存在的场景,而是适合于对判断数据是否存在有一定容错率的场景。例如我们玩游戏的时候起名,有些游戏是不允许重名的,此时就可以使用布隆过滤器。因为一个id所对应的bit位如果是0,则表示该id一定未被使用。但如果是1,则表示可能被使用也可能未被使用。此时用户并不知道该id是否被使用,所以哪怕该id未被使用,但在布隆过滤器中被标定为已使用,出现误判,然后告诉用户这个id已被使用,用户也无从查证只能接受并更换id。在这种允许出现一定程度的误判的场景下,就可以使用布隆过滤器。

大家要知道,在网络中获取数据时,数据一般都是放在远端服务器的数据库中的,从本地到远端服务器查找数据再返回数据其实是比较耗时的。而用布隆过滤器充当媒介,就可以很方便的判断一个数据是否存在,存在就进入数据库中查找,不存在则直接返回,无需进入数据库。

通过这种方法,就可以提升从服务器中查找数据的效率。因此,这种数据结构被叫做“布隆过滤器”,原因就是这个数据结构可以用来对数据筛选过滤。

3. 布隆过滤器的优化

布隆过滤器因为使用了哈希映射,在数据足够多的情况下就会出现“哈希冲突”。那有没有什么办法能降低出现“哈希冲突”的概率呢?很简单,进行多次映射就可以了。因为一个数据如果只映射一次,它的冲突概率是很大的。但是,如果映射多次,例如进行两次映射,当要判断一个数据是否存在时,就分别检查它所映射的两个位置是否为1,都会1时才存在,只要有1个为0,就表示该数据不存在。但是,这种方法也仅仅只能降低哈希冲突的概率,不能消除。

4. 布隆过滤器的实现

布隆过滤器的实现其实是很简单,开辟一个位图,然后利用哈希函数进行多次映射并修改对应bit位的状态即可。

那么针对不同的数据个数,一般要开多少bit位呢?针对这个问题,有人进行过研究,下图就是他的研究中的结果:

这张图就对哈希函数个数和插入元素的个数在不同布隆过滤器长度下的误报率进行了研究。最终得出了如下公式:

该公式得出了子啊布隆过滤器不同长度和不同插入元素个数下的最佳哈希函数个数。

4.1 结构设置

这里就只设置3个哈希函数,将公式变形为:m = k*n / 0.7。由此得出:m ≈ 4.2*n,向前进位得出m = 5*n。

由此,按照这个结果开辟bit位:

namespace BloomFilter
{template<size_t N, //要存储的数据个数class K = string,//要存储的数据类型,默认传string便于测试class HashFunc1 = BKDRHash,//三个hash函数用于将数据转为整形class HashFunc2 = APHash,class HashFunc3 = DJBHash>class bloomfilter{private:std::bitset<N * 5> _bf;//开辟数据个数5倍的bit位};
}

4.2 set()函数

void set(const K& key)
{size_t hash1 = HashFunc1()(key) % (N * 5);//得出了结果要模N*5,因为bit位的个数就是N*5size_t hash2 = HashFunc2()(key) % (N * 5);size_t hash3 = HashFunc3()(key) % (N * 5);_bf.set(hash1);//将该位置的bit为设置为1_bf.set(hash2);_bf.set(hash3);
}

4.3 test()函数设置

bool test(const K& key)
{size_t hash1 = HashFunc1()(key) % (N * 5);if (!_bf.test(hash1)){return false;}size_t hash2 = HashFunc2()(key) % (N * 5);if (!_bf.test(hash2)){return false;}size_t hash3 = HashFunc3()(key) % (N * 5);if (!_bf.test(hash3)){return false;}return true;//到这里,说明三个位都是1,此时数据可能存在,也可能不存在
}

4.4 hash函数提供

这里就只提供了三个hash函数,如果有需要,大家可以自行搜索其他哈希函数。当然,为了方便测试,这里传的是三个将string转整形的哈希函数。如果有其他类型需要,可以自己写或在此基础上改造

struct BKDRHash
{size_t operator()(const string& s){size_t hash = 0;for (const auto& ch : s){hash = hash * 131 + ch;}return hash;}
};struct APHash
{size_t operator()(const string& s){size_t hash = 0;int i = 0;for (auto ch : s){if ((i & 1) == 0)hash ^= ((hash << 7) ^ (ch++) ^ (hash >> 3));elsehash ^= (~((hash << 11) ^ (ch++) ^ (hash >> 5)));++i;}return hash;}
};struct DJBHash
{size_t operator()(const string& s){size_t hash = 5381;for (auto ch : s){hash += (hash << 5) + ch;}return hash;}
};

5. 布隆过滤器测试

初步模拟实现了布隆过滤器,就可以来测试一下这个程序了。

 通过上面的测试程序,就可以看到,在当前数据量较少的情况下,并没有出现哈希冲突,并且也可以正常运行。

然后我们再写一个程序,测试在有大量数据的情况下的相似率:

void TestBloomFilter2()
{srand(time(0));const size_t N = 10000;BloomFilter::bloomfilter<N> bf;vector<string> v1;string ur1 = "https://blog.csdn.net/Masquerena114514?spm=1000.2115.3001.5343";for (size_t i = 0; i < N; ++i){v1.push_back(ur1 + to_string(i));}for (auto& str : v1){bf.set(str);}//v2和v1是相似字符串集vector<string> v2;string ur2 = "https://blog.csdn.net/Masquerena114514?spm=1000.2115.3001.5343";for (size_t i = 0; i < N; ++i){	ur2 += to_string(99999 + i);v2.push_back(ur2);}size_t n2 = 0;for (auto& str : v2){if (bf.test(str)){++n2;}}cout << "相似字符串误判率:" << (double)n2 / (double)N << endl;//v3与v1是不相似字符串vector<string> v3;for (size_t i = 0; i < N; ++i){string ur3 = "zhihu.com";ur3 += to_string(rand());v3.push_back(ur3);}size_t n3 = 0;for (auto& str : v3){if (bf.test(str)){++n3;}}cout << "不相似字符串误判率:" << (double)n3 / (double)N << endl;
}

当前是1w个数据,运行该程序:

可以看到,当有1w个数据时,就已经出现了哈希冲突,误判率大致在9%左右。这里就不再进行测试了,如果大家有兴趣,可以自行修改数据,例如多传几个哈希函数,或者减少传入的数据个数,来看不同情况下的误判率大致为多少。