> 文章列表 > 【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

文章目录

  • 1. 需求 / 现存问题
  • 2. 总述
  • 3. 实现
    • 3.1 概述
    • 3.2 Master的数据结构
    • 3.3 容错性
      • 3.3.1 worker节点故障
      • 3.3.2 master节点故障
      • 3.3.3 故障环境下的语义
    • 3.4 位置(Locality)
    • 3.5 任务粒度
    • 3.6 备份任务
  • 4. 对MapReduce的扩展
    • 4.1 划分函数
    • 4.2 排序保证
    • 4.3 聚合器函数
    • 4.4 输入输出格式
    • 4.5 副作用(side-effects)
    • 4.6 跳过坏(bad)记录
    • 4.7 本地执行
    • 4.8 状态信息
    • 4.9 计数器

1. 需求 / 现存问题

输入数据通常很大,为了在合理的时间内完成计算,必须将计算分布到数百或数千台机器上。

如何并行化计算、分发数据和处理故障等问题使得原本简单的计算变得晦涩难懂,需要大量复杂的代码来处理这些问题。

2. 总述

提出了一种新的抽象化方法,使我们能够表达我们试图执行的简单计算,同时隐藏了并行化、容错、数据分发和负载平衡等混乱的细节。

  • 大多数的计算都涉及对输入中的每个逻辑“记录”应用一个map操作,以计算一组中间键/值对;
  • 然后对所有共享相同键的值应用reduce操作,以适当地组合导出的数据。
  • 使用基于函数式编程模型的用户指定map和reduce操作的方法,使我们能够轻松地并行化大型计算,并将重新执行作为容错的主要机制

这篇文章最终提供了简单而强大的接口,用户可以实现这个接口来应用于自己的业务。

3. 实现

MapReduce接受一组输入键值对,并输出一组键值对。将其表示为两个函数:Map和Reduce,均由用户编写。

  • Map:接收一个输入键/值对,并生成一组中间键/值对。MapReduce库将所有与相同中间键I相关联的中间值组合在一起,并将它们传递给Reduce函数。
  • Reduce:接受一个中间键I和该键对应的一组值,并将这些值合并在一起以形成可能更小的一组值。通常,每次Reduce调用只会生成零个或一个输出值。中间值通过迭代器提供给用户的Reduce函数,这使得我们能够处理太大而无法放入内存的值列表。

举个例子——倒排索引:

  • Map函数:解析每个文档,并发出一个<单词序列,文档ID>对。
  • Reduce函数:接受给定单词的所有对,对相应的文档ID进行排序,并发出一个<word, list(文档ID)>对。

所有输出对的集合形成一个简单的倒排索引。很容易扩大这个计算来跟踪单词的位置。

3.1 概述

  • Map调用:
    • Map调用分布在多台机器上。
    • 将输入数据自动划分为M个块,
    • 输入的块可以由不同的机器并行处理。
  • Reduce调用:是通过使用分区函数(例如,hash(key) mod R)将中间键空间划分为R块来分配的。分区的数量®和分区函数由用户指定。

MapReduce操作的总体流程如图:
【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

  • (1)用户程序中的MapReduce库首先将输入文件分割成M块,通常为16MB到64MB(用户可以通过一个可选参数进行控制)。然后在一组机器上启动程序的许多副本。
  • (2)程序的一个副本是特殊的——主副本(master)。其余的worker是由master分配任务的worker。有M个map任务和R个reduce任务需要分配。master会挑选空闲的worker,给每个worker分配一个map任务或者reduce任务。
  • (3)被分配一个map任务的worker读取相应的输入块的内容。它从输入数据中解析键/值对,并将每个键/值对传递给用户定义的Map函数。Map函数产生的中间键/值对被缓冲在内存中。
  • (4)周期性地将缓冲对写入本地磁盘,通过分区函数将缓冲对划分为R个区域。将这些缓冲对在本地磁盘上的位置传递回master,master负责将这些位置转发给reduce worker。
  • (5)当master通知reduce worker这些位置时,它使用rpc(远程过程调用)从map worker的本地磁盘读取缓冲数据。当reduce worker读取了所有中间数据后,它会根据中间键对数据进行排序,这样 所有相同键的occurrences(这里的occurrences,博主认为可以是映射为相同的key的任何形式的数据)都会被分组到一起。并且这里的排序过程是有必要的,因为通常有许多不同的键映射到同一个reduce任务。如果中间数据的数量太大而无法装入内存,则使用外部排序。
  • (6)reduce worker遍历排序的中间数据,对于遇到的每个唯一的中间键,它将键和相应的中间值集传递给用户的reduce函数。Reduce函数的输出被追加到这个Reduce分区的最终输出文件中。
  • (7)当所有map任务和reduce任务完成后,master将唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码。

成功完成后,MapReduce执行的输出存储在R文件中以供使用(每个reduce任务会有一个输出文件,文件名由用户指定)。通常,用户不需要将这些R输出文件合并到一个文件中——他们通常将这些文件作为输入传递给另一个MapReduce调用,或者从另一个分布式应用程序中使用它们(该应用程序能够处理划分为多个文件的输入)。

3.2 Master的数据结构

master存储:

  • 每个map/reduce任务的状态(闲置/进行中/已完成)
  • (非空闲的)worker机器的标识

master是一个管道,负责将中间文件区域的位置从map任务转播到reduce任务。 因此,对于每个已完成的map任务,master会存储map任务产生的R个中间文件区域的位置和大小。当map任务完成时,master将接收到对此位置和大小信息的更新,并将该信息推送给所有正在进行中的reduce任务的worker。

3.3 容错性

3.3.1 worker节点故障

master定期向worker发送ping。 如果在一定时间内没有收到worker的响应,master将该worker标记为failed。由该worker完成的map任务都会被重置回初始空闲状态,并可以被调度到其他工作节点上。类似地,任何在failed worker上进行中的map/reduce任务也会被重置为空闲状态,并且变得可以重新调度。

已完成的Map任务会在节点故障时进行重新执行,因为它们的输出结果存储在故障节点的本地磁盘上,因此无法访问。已完成的Reduce任务则不需要重新执行,因为其输出结果存储在全局文件系统中。

当一个Map任务首先由Worker A执行,然后由Worker B执行(因为Worker A故障),执行Reduce任务的所有Worker都会收到重新执行的通知。任何尚未从Worker A读取数据的Reduce任务将从Worker B读取数据。

3.3.2 master节点故障

在 MapReduce 中,可以定期对 master 数据结构进行检查点的方式进行备份。如果 master 任务出现故障,则可以从最后一个检查点的状态开始启动一个新的任务。然而,由于只有一个 master,其出现故障的概率很小,因此我们当前的实现方式是在 master 出现故障时中止 MapReduce 计算。客户端可以检查此条件并在需要时重试 MapReduce 操作。

3.3.3 故障环境下的语义

当用户提供的Map和Reduce操作是它们输入值的确定性函数时,MapReduce的分布式实现会产生与整个程序非故障顺序执行所产生的相同输出。

MapReduce是依赖Map和Reduce任务输出的原子性提交来保证这个特性的。每个正在进行的任务将其输出写入私有临时文件。一个Reduce任务生成一个私有临时文件,而一个Map任务生成R个这样的文件(每个Reduce任务一个文件)。当Map任务完成时,Worker向Master发送一条消息,该消息内容包含R个临时文件的名称。

  • 如果Master接收到已经完成的Map任务的完成消息,则会忽略该消息。
  • 否则,master会将R个文件的名称记录在Master数据结构中。

Reduce任务的输出如何被写入到最终输出文件中?(在MapReduce中如何确保输出结果的正确性,即在多个Reduce任务之间如何避免数据冲突?)

每个Reduce任务都会生成一个临时输出文件,当一个Reduce任务完成后,Reduce Worker会使用底层文件系统提供的原子重命名操作将临时输出文件重命名为最终输出文件,从而确保输出结果的一致性和正确性。

如果同一Reduce任务在多台机器上执行,则会对同一个最终输出文件执行多次重命名操作,但由于底层文件系统提供的原子重命名操作是具有原子性的,因此最终文件系统状态仅包含一个Reduce任务执行生成的数据。这样,即使有多个Reduce任务在同时执行,也可以保证最终输出结果的正确性。

假设有两个Reduce任务 R1 和 R2,它们都需要写入一个文件f。如果没有原子重命名操作,它们可能会在同一时间尝试重命名自己的临时输出文件到最终输出文件f,这会导致文件f包含混合的输出结果。但是,由于有原子重命名操作的保证,只有一个Reduce任务能够成功地重命名其临时输出文件到最终输出文件f,从而保证最终文件系统状态只包含一个Reduce任务执行生成的数据。

MapReduce框架在处理任务时的语义问题。

在大多数情况下,MapReduce中的map和reduce操作是确定性的,也就是说对于相同的输入数据,它们总是会生成相同的输出结果。因此,MapReduce在这种情况下能够保证输出结果与串行执行的结果是相同的,这使得程序员很容易理解程序的行为。

但是,当map和reduce操作是非确定性的,即对于相同的输入数据,它们可能会生成不同的输出结果时,MapReduce提供的语义就不如前面的强了,但仍然是合理的。在这种情况下,特定的reduce任务R1的输出与非确定性程序的串行执行所产生的R1的输出是相等的。但是,另一个reduce任务R2的输出则可能对应于非确定性程序的另一个串行执行所产生的R2的输出

具体解释一下:当MapReduce操作中存在非确定性操作时,会导致语义变弱。

考虑一个Map任务M和两个Reduce任务R1和R2。用e(R1)表示由R1提交的执行(只有一个这样的执行)。由于Map任务M的输出是非确定性的,可能会有多个不同的执行结果。因此,执行R1时可能会读取由M的一个执行产生的输出,而执行R2时可能会读取由M的另一个执行产生的输出。这样,由于输出是不确定的,即可能有多个版本,就会导致MapReduce操作的语义变得更弱,即结果的正确性无法保证。

3.4 位置(Locality)

主要说明:如何在一个计算环境中节省网络带宽资源?

具在MapReduce计算模型中,利用Google File System (GFS) 本地磁盘存储输入数据来减少网络通信。

首先,GFS将每个文件分成64 MB的块,并在不同的机器上存储多个副本(通常是3个副本)。MapReduce的Master会考虑输入文件的位置信息,并尝试在存储相应输入数据副本的机器上调度Map任务。如果失败,则会尝试在离输入数据副本最近的机器上调度Map任务(例如,在与存储数据的机器相同的网络交换机上的worker机器上)。这样,在对集群中的大部分Worker运行大型MapReduce操作时,大多数输入数据都是本地读取的,不占用网络带宽。

3.5 任务粒度

这一小节,讨论如何划分MapReduce任务的大小。

文章将Map阶段细分为M个部分,将Reduce阶段细分为R个部分。

理想情况下,M和R应该远大于worker机器的数量。 让每个worker执行许多不同的任务,这样可以提高动态负载平衡 和 某个worker故障时的恢复速度(即,将故障worker已经完成的Map任务分散到其他所有的worker上)。

但是,现实情况中,对于M和R的大小有实际限制。 因为master需要作O(M+R)个调度决策,并保存O(M*R)的状态。(内存中的常数因子很小:状态中的O(M*R)部分大约是每个map任务/ reduce任务对的状态只需要大约1字节的数据)。

此外,因为reduce阶段的每个任务的输出都在单独的输出文件中,所以用户通常对R也有限制。作者建议M的大小应该是每个任务大约16MB至64MB的输入数据大小,并且R应该是我们预计使用的工作机器数量的小倍数。作者提到,通常使用M = 200,000和R = 5,000,使用2,000个工作机器来执行MapReduce计算。

3.6 备份任务

MapReduce 操作中常见的导致总时间延长的原因之一是“straggler”:一台机器在计算中最后几个 map 或 reduce 任务中需要异常长的时间来完成。

straggler 可以出现的原因可能有:

  • 磁盘有问题的机器可能会遇到频繁的可纠正错误,将读取性能从30 MB/s减慢到1 MB/s。
  • 群集调度系统可能已经在机器上安排了其他任务,导致机器由于竞争 CPU、内存、本地磁盘或网络带宽而执行 MapReduce 代码更慢。
  • 机器初始化代码中的一个错误,导致处理器缓存被禁用:受影响的机器上的计算速度降低了一百倍以上

作者提供了一种解决“straggler”问题的通用机制。——在MapReduce操作即将完成时,主节点会为剩余未完成的任务安排备份执行。只要主执行或备份执行中的任何一个执行完毕,任务就被标记为已完成。

作者团队已经调整了这个机制,使它通常增加操作使用的计算资源不超过几个百分点。我们发现这显著减少了完成大型 MapReduce 操作所需的时间。

4. 对MapReduce的扩展

本节介绍除map和reduce函数之外的一些有用的扩展。

4.1 划分函数

主要解决:MapReduce中如何将数据划分到reduce任务中。

MapReduce的用户可以指定他们想要的reduce任务/输出文件数量(R)。使用中间键上的分区函数将数据分配到这些任务中。

MapReduce提供了一个默认的分区函数,它使用哈希(例如,“hash(key) mod R”)来进行分区。

但是,在某些情况下,按键值进行哈希并不足以满足特定需求(例如,在处理URL时,我们可能需要将所有来自同一主机的条目放在同一个输出文件中)。为了支持这种情况,MapReduce库的用户可以提供一个特殊的分区函数

4.2 排序保证

本小节核心:MapReduce保证在给定分区内中间键/值对按递增键顺序处理,并介绍了排序的优点。

MapReduce保证在给定分区内中间键/值对按递增键顺序处理。

这种排序保证使得每个分区生成排序的输出文件变得容易,因为可以简单地将中间结果写入到一个临时文件中,并在reduce任务完成后将其合并成一个有序的输出文件。

这对于需要支持通过键进行高效随机访问查找或者输出数据的用户非常有用,因为它们可以直接使用已经排好序的输出文件进行查找和操作,而不需要再次进行排序。

4.3 聚合器函数

本小节核心:主要介绍了MapReduce中如何处理中间键存在重复情况。

在某些情况下,每个map任务产生的中间键存在显著的重复(例如,单词计数任务,每个map都会产生各自的key/value,但是不同map直接是存在大量重复的)。这可能会导致网络拥塞和性能下降。

为了解决这个问题,MapReduce允许用户指定一个可选的Combiner函数,对数据进行部分合并,以减少数据在网络上传输的数量。Combiner函数会在执行Map任务的每个计算机上执行,并且通常使用相同的代码来实现Combiner和Reduce函数。

Combiner函数和Reduce函数之间唯一的区别是MapReduce库如何处理函数的输出:

  • Reduce函数的输出被写入最终输出文件;
  • 而Combiner函数的输出被写入一个中间文件,然后发送到Reduce任务。

4.4 输入输出格式

本小节核心:MapReduce支持用户自定义形式的输入/输出类型。

用户可以通过提供一个简单的读取器接口的实现来添加对新输入类型的支持。读取器不一定需要提供从文件中读取的数据。例如,可以定义一个从数据库或映射在内存中的数据结构中读取记录的读取器。

例如,“文本”模式输入将每行视为一个键/值对:键是文件中的偏移量,值是行的内容。另一种常见的支持格式是按键排序的键/值对序列。每个输入类型实现都知道如何将自身拆分为有意义的范围,以便作为单独的映射任务进行处理(例如,文本模式的范围拆分确保范围拆分仅在行边界处发生)。

类似地,MapReduce支持一组输出类型,以便以不同的格式生成数据,用户通过代码添加对新输出类型的支持。

4.5 副作用(side-effects)

本小节核心:主要介绍了MapReduce中如何处理辅助文件作为附加输出以及如何保证它们具有原子性和幂等性。

具体来说,MapReduce允许用户从他们的map和/或reduce操作符中产生辅助文件作为附加输出。然而,为了保证这些副作用具有原子性和幂等性,我们依赖应用程序编写者来实现它们。通常情况下,应用程序会将数据写入一个临时文件,并在完全生成后将该文件原子重命名。

另外,MapReduce不支持由单个任务产生的多个输出文件的原子两阶段提交。因此,如果任务需要产生具有跨文件一致性要求的多个输出文件,则应该是确定性的。这意味着,对于相同的输入数据和相同的执行环境,任务应该总是产生相同的输出文件。

博主对于”MapReduce不支持由单个任务产生的多个输出文件的原子两阶段提交“这句话的理解,如下:

这意味着,如果一个任务需要产生多个输出文件,并且这些文件之间有一致性要求,那么这些文件必须是确定性的,即对于相同的输入数据和相同的执行环境,任务应该总是产生相同的输出文件。

举个例子来说,假设我们有一个MapReduce任务需要从一个大型日志文件中提取出所有错误信息,并将它们写入两个不同的输出文件:一个包含所有严重错误信息,另一个包含所有警告信息。由于这两个输出文件之间存在一致性要求(例如,在某些情况下,警告信息可能需要引用严重错误信息),因此我们需要确保它们在提交时具有原子性。

然而,由于MapReduce不支持原子两阶段提交多个输出文件,我们不能将这两个输出文件作为一个事务进行原子性提交。相反,我们必须确保任务是确定性的,并且对于相同的输入数据和相同的执行环境总是产生相同的输出文件。例如,在每次运行任务之前清空输出目录并使用固定名称来命名每个输出文件。

4.6 跳过坏(bad)记录

本小节核心:介绍了MapReduce框架中如何处理数据中的错误和异常情况,并提供了一种可选的执行模式来跳过导致程序崩溃的记录。。

一些情况下用户代码中可能会出现一些bug,这些bug可能会导致Map或Reduce函数在某些特定的记录上出现崩溃,从而阻止整个MapReduce操作的完成。通常的做法是修复这些bug,但有些情况下并不可行,比如说出现了源代码不可用的第三方库的bug,或者只需要忽略少量的记录,例如在处理大型数据集的统计分析时。MapReduce提供了一种可选的执行模式,可以检测到导致崩溃的记录,并跳过这些记录以便向前推进。

**MapReduce框架中的错误处理机制的实现方式。**每个worker进程都安装了一个信号处理程序,用于捕获分段违规和总线错误。在调用用户的Map或Reduce操作之前,MapReduce库会将参数的序列号存储在全局变量中。如果用户代码生成了信号,信号处理程序会向MapReduce主进程发送一个“最后的尝试”UDP数据包,其中包含序列号。当主进程看到同一个记录发生了多个故障时,就会在下一次重新执行相应的Map或Reduce任务时指示跳过该记录。这种机制可以避免由于一些崩溃导致整个MapReduce任务无法完成。

4.7 本地执行

本小节核心:为了方便调试和测试,MapReduce库中提供了一个替代实现,只在一台本地机器上顺序执行 MapReduce 操作的所有工作,而不是在分布式系统上并行执行。

开发了一个MapReduce库的替代实现,它在本地机器上按顺序执行MapReduce操作的所有工作。提供了控制选项,使用户可以将计算限制在特定的Map任务中。用户使用特殊标志调用程序,然后可以轻松地使用任何有用的调试或测试工具(例如gdb)。

4.8 状态信息

本小节核心:MapReduce框架通过内部HTTP服务器提供状态页面,展示任务完成情况、处理速率、中间数据等,以及失败任务的信息。

master节点运行一个内部HTTP服务器,并导出一组用于人类消费的状态页面。状态页面显示计算的进度,例如已完成多少任务,有多少正在进行,输入字节,中间数据字节,输出字节,处理速率等等。这些页面还包含了每个任务生成的标准错误和标准输出文件的链接。用户可以使用这些数据来预测计算需要多长时间,以及是否应该添加更多资源来加速计算。这些页面还可用于确定计算速度远低于预期的原因。

此外,顶层状态页面显示了哪些工作节点失败以及它们在失败时正在处理哪些Map和Reduce任务。这些信息对于尝试诊断用户代码中的错误很有用。

4.9 计数器

本小节核心:MapReduce提供了计数器功能来统计各种事件的发生次数。

要使用此功能,用户代码创建一个命名的计数器对象,然后在Map和/或Reduce函数中适当地递增计数器。

例如:
【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

从各个工作机器的计数器值定期传播到主节点(附加在ping响应上)。主节点聚合成功的map和reduce任务的计数器值,并在MapReduce操作完成时将其返回给用户代码。当前计数器值也会显示在主状态页面上,以便人们可以观察实时计算的进度。在聚合计数器值时,主节点消除了相同map或reduce任务的重复执行的影响,以避免重复计数。(重复执行可能是由于备份任务和由于故障重新执行任务导致的。)

MapReduce库自动维护一些计数器值,例如处理的输入键/值对数和产生的输出键/值对数。

用户发现计数器功能对于检查MapReduce操作的行为非常有用。例如,在某些MapReduce操作中,用户代码可能希望确保产生的输出对数恰好等于处理的输入对数,或者处理的德语文档的比例在可容忍的总文档数量的一定比例内。

后续追更……