2
0
Fork 0
mirror of https://github.com/Vonng/ddia.git synced 2026-06-21 00:47:05 +08:00
ddia/ch10.md
MuAlex 9de8dbd1bf Ch10 translate
finish the Ch10 manual translate
2018-05-11 23:48:01 +08:00

111 KiB
Raw Blame History

10. 批处理

带有太强个人色彩的系统无法成功。当最初的设计完成并且相对稳定时,不同的人们以自己的方式进行测试,真正的考验才开始。

——高德纳


[TOC]

在本书的前两部分中,我们讨论了很多关于请求和查询以及相应的响应或结果。许多现有数据系统中都采用这种数据处理方式:你发送请求指令,一段时间后(我们期望)系统会给出一个结果。数据库缓存搜索索引Web服务器以及其他一些系统都以这种方式工作。

像这样的线上系统无论是浏览器请求页面还是调用远程API的服务我们通常认为请求是由用户触发的并且正在等待响应。他们不应该等太久所以我们非常关注系统的响应时间参阅“描述性能”)。

Web和越来越多的基于HTTP/REST的API使交互的请求/响应风格变得如此普遍,以至于很容易将其视为理所当然。但我们应该记住,这不是构建系统的唯一方式,其他方法也有其优点。我们来看看三种不同类型的系统:

服务(线上系统)

  • 服务等待客户的请求或指令到达。每收到一个,服务会试图尽快处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性通常非常重要(如果客户端无法访问服务,用户可能会收到错误消息)。

批处理系统(线下系统)

  • 一个批处理系统有大量的输入数据,跑一个作业(job)来处理它,并生成一些输出数据,这往往需要一段时间(从几分钟到几天),所以通常不会有用户等待作业完成。相反,批量作业通常会定期运行(例如,每天一次)。批处理作业的主要性能衡量标准通常是吞吐量(处理特定大小的输入所需的时间)。本章中讨论的就是批处理。

流处理系统(近实时系统)

  • 流处理介于线上和线下(批处理)之间,所以有时候被称为近实时或近线(nearline)处理。像批处理系统一样流处理消费输入并产生输出并不需要响应请求。但是流式作业在事件发生后不久就会对事件进行操作而批处理作业则需等待固定的一组输入数据。这种差异使流处理系统比起批处理系统具有更低的延迟。由于流处理基于批处理我们将在第11章讨论它。

正如我们将在本章中看到的那样批处理是构建可靠可扩展和可维护应用程序的重要组成部分。例如2004年发布的批处理算法Map-Reduce可能有点过于热门被称为“造就Google大规模可扩展性的算法”被称为“造就Google大规模可扩展性的算法”[2]。随后在各种开源数据系统中得到应用包括HadoopCouchDB和MongoDB。

与多年前为数据仓库开发的并行处理系统【3,4】相比MapReduce是一个相当低级别的编程模型但它在现有硬件水平基础上迈出了处理大数据重要的一步。虽然MapReduce的重要性正在下降【5】但它仍然值得理解因为它提供了一个清晰的画面来阐述批处理为什么以及如何有用。

实际上批处理是一种非常古老的计算方式。早在可编程数字计算机诞生之前打孔卡制表机例如1890年美国人口普查【6】中使用的霍尔里斯机实现了半机械化的批处理形式从大量输入中汇总计算。 Map-Reduce与1940年代和1950年代广泛用于商业数据处理的机电IBM卡片分类机器有着惊人的相似之处[7]。正如我们所说,历史总是在不断重演。

在本章中我们将了解MapReduce和其他一些批处理算法和框架并探索它们在现代数据系统中的作用。但首先我们将看看使用标准Unix工具的数据处理。即使你已经熟悉了它们Unix的哲学也值得一读Unix的思想和经验教训可以转移到大规模异构的分布式数据系统中。

使用Unix工具的批处理

我们从一个简单的例子开始。假设您有一台Web服务器每次处理请求时都会在日志文件中附加一行。例如使用nginx默认访问日志格式日志的一行可能如下所示

216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1" 
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"

(实际上这只是一行,分成多行只是为了便于阅读)。这一行中有很多信息。为了理解它,你需要看看日志格式的定义,如下所示:

 $remote_addr - $remote_user [$time_local] "$request"
 $status $body_bytes_sent "$http_referer" "$http_user_agent"

日志的这一行表明在2015年2月27日17:55:11 UTC服务器从客户端IP地址216.58.210.78接收到对文件/css/typography.css的请求。用户没有被认证,所以$remote_user被设置为连字符(- 。响应状态是200即请求成功响应的大小是3377字节。网页浏览器是Chrome 40并且加载了该文件因为在URL http://martin.kleppmann.com/的页面中引用。

分析简单日志

很多工具可以利用这些日志文件产生关于你网站流量的漂亮的报告但为了练手让我们使用基本的Unix功能创建自己的工具。 例如,假设你想在你的网站上找到五个最受欢迎的网页。 你可以在Unix shell中这样做1

cat /var/log/nginx/access.log | #1
	awk '{print $7}' | #2
	sort             | #3
	uniq -c          | #4
	sort -r -n       | #5
	head -n 5          #6
  1. 读取日志文件
  2. 将每一行按空格分割成不同的字段每行只输出第七个字段恰好是请求的URL。在我们的例子中是/css/typography.css
  3. 按字母顺序排列请求的URL列表。如果某个URL被请求过n次那么排序后文件在一行中会重复出现n次该URL。
  4. uniq命令通过检查两个相邻的行是否相同来过滤掉输入中的重复行。 -c则表示输出一个计数器对于每个不同的URL它会报告输入中出现该URL的次数。
  5. 第二种排序按每行起始处的数字(-n排序这是URL的请求次数。然后逆序-r返回结果大的数字在前。
  6. 最后,只输出前五行(-n 5并丢弃其余的。该系列命令的输出如下所示
    4189 /favicon.ico
    3631 /2013/05/24/improving-security-of-ssh-private-keys.html
    2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
    1369 /
     915 /css/typography.css

如果你不熟悉Unix工具上面的命令行可能看起来有点吃力但是它非常强大。它能在几秒钟内处理几GB的日志文件并且您可以根据需要轻松修改命令。例如如果要从报告中省略CSS文件可以将awk参数更改为'$7 !~ /\.css$/ {print $7}',如果想统计最多的客户端IP地址,可以把awk参数改为'{print $1}'等等。

我们不会在这里详细探索Unix工具但是它非常值得学习。令人惊讶的是使用awksedgrepsortuniq和xargs的组合可以在几分钟内完成许多数据分析并且它们的性能相当的好[8]。

命令链与自定义程序

抛开Unix命令链你可以写一个简单的程序来做同样的事情。例如在Ruby中它可能看起来像这样

counts = Hash.new(0)         # 1
File.open('/var/log/nginx/access.log') do |file| 
    file.each do |line|
        url = line.split[6]  # 2
        counts[url] += 1     # 3
    end
end

top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" }                   # 5
  1. counts是一个存储计数器次数的哈希表,初始情况下每个网址对应的计数为零。
  2. 我们读取日志的每一行把获取第七个空格分隔的字段这里的数组索引是6因为Ruby的数组从0开始对应的URL。
  3. 将日志当前行中URL的计数加一。
  4. 按计数器值(降序)对哈希表进行排序,并取前五位。
  5. 打印出前五个条目。

这个程序并不像Unix管道那样简练但是它的可读性很强对于这两种方式每个人有不同的偏好。但是除了表面上的语法差异两者执行流程也有很大差异文件越大则越明显。

排序 VS 内存中的聚合

Ruby脚本在内存中保存着一个URL的哈希表其中每个URL映射到它被统计的次数。 Unix流水线没有这样一个哈希表而是依赖于对URL列表的排序在这个URL列表中同一个URL的只是简单的重复出现。

哪种方法更好这取决于你有多少个不同的URL。对于大多数中小型网站您可能可以为所有不同网址提供一个计数器假设我们使用1GB内存。在此例中作业的工作集作业需要随机访问的内存大小仅取决于不同URL的数量如果单个URL有一百万个日志条目则哈希中所需的空间表仍然只有一个URL加上一个计数器的大小。如果这个工作集足够小那么内存哈希表工作正常甚至在性能较差的笔记本电脑上也可以。

另一方面如果作业的工作集大于可用内存则排序方法的优点是可以高效地使用磁盘。这与我们在第74页的“[SSTables和LSM树]”中讨论过的原理是一样的数据块可以在内存中排序并分段写入磁盘然后多个有序的分段可以合并为一个更大的有序文件。归并排序具有在磁盘上运行良好的顺序访问模式。请记住在顺序I/O中进行优化是第3章中反复出现的主题,这里我们再次提到。)

Linux GNU 核心工具(Coreutils)中的排序自动把大于可用内存的数据集在磁盘进行分割,并且通过CPU的多核并行排序【9】。这意味着之前简单的Unix命令链很容易扩展到大数据集并且不会耗尽内存。瓶颈更可能是从磁盘读取输入文件的速度。

Unix哲学

我们可以非常容易地使用前一个例子中的命令链来分析日志文件这并非巧合这实际上是Unix的关键设计思想之一而且它今天依然令人惊讶。让我们更深入地进行研究这样我们可以从Unix中借鉴一些想法【10】。

Unix管道的发明者道格·麦克罗伊Doug McIlroy在1964年首先做出了描述【11】“我们需要一种像花园软管一样的连接程序的方法 - 可以把数据通过拧口传输到不同的片段中。这也是I/O的实现方式”。通过管道连接程序的想法成为了Unix的哲学的一部分Unix哲学成为在开发人员和用户之间流行的设计准则。1978年, Unix哲学得到具体描述【12,13】:

  1. 让每个程序都做好一件事。如果有新的任务,重新建立一个新的程序,而不是在原有程序上增加“功能”使其复杂化。
  2. 每个程序的输出应该成为另一个程序的输入,而不是未知程序。不要将输出与无关的信息混淆。避免使用严格的柱状或二进制输入格式。不要坚持交互式输入。
  3. 设计和构建软件,甚至是操作系统时要尽早尝试,最好在几周内完成。毫不犹犹豫扔掉笨拙的部分并重建它们。
  4. 优先使用工具而不是拙劣的帮助来减轻编程任务,即使您必须花费额外经历开发工具,并且其中一些工具可能今后再也不会用到。

这种方式 - 自动化,快速原型设计,迭代式迭代,对实验友好,将大型项目分解成可管理的模块 - 听起来非常像今天的敏捷和DevOps概念。令人惊奇的是这个理念四十年来变化不大。

排序工具是一个“做好一件事”很好的例子。它可以说比大多数编程语言的标准库不会分割到磁盘并且不使用多线程即使这么做有好处中的实现更好。然而排序工具几乎不会单独使用。它只能与其他Unix工具uniq)结合使用。

像bash这样的Unix shell可以让我们轻松地将这些小程序组合成令人惊讶的强大数据处理作业。尽管这些程序有很多是由不同人群编写的但它们可以灵活地组合在一起。 Unix如何实现这种可组合性

统一的接口

如果您希望一个程序的输出成为另一个程序的输入,那意味着这些程序必须使用相同的数据格式 - 换句话说是一个兼容的接口。如果您希望能够将任何程序的输入和其他程序的输出连接起来那意味着所有程序必须使用相同的I/O接口。

在Unix中该接口是一个文件更准确地说是一个文件描述符。一个文件只是一个有序的字节序列。因为这是一个非常简单的接口所以可以用来表示许多不同的东西文件系统上的真实文件到另一个进程Unix套接字stdinstdout的通信通道设备驱动程序比如/dev/audio/dev/lp0表示TCP连接的套接字等等。这看起来似乎理所当然但实际上这些非常不同的事物可以共享一个统一接口是很牛逼的这使它们可以很容易地连接在一起2

按照惯例许多但不是全部Unix程序将这个字节序列视为ASCII文本。我们的日志分析案例使用了这个事实awksortuniq和head都将它们的输入文件视为由\n换行符ASCII 0x0A字符分隔的记录列表。 \n的选择是有争议的 - ASCII记录分隔符0x1E可能是一个更好的选择因为它是为了这个目的而设计的【14】 但是无论如何,所有这些程序使用相同的记录分隔符允许它们进行交互。

每个记录(比如一行输入)的解析更加模糊。 Unix工具通常通过空格或制表符将行分割成字段但也使用CSV逗号分隔管道分隔和其他编码。即使像xargs这样一个相当简单的工具也有六个命令行选项,用于指定如何解析输入。

ASCII文本的统一接口大多数情况下好用但它不优雅我们的日志分析示例使用{print $ 7}来提取网址,这个命令的可读性很差。理想情况下,这可能是{print $ request_url}或类似的东西。我们稍后来看这个想法。

尽管几十年后仍然不够完美但统一的Unix接口还是非同凡响。与Unix工具一样软件的交互操作并不是很多你不能通过定制分析工具将你的邮件和在线购物记录通过管道导入电子表格并且发布到社交网络或者维基。像Unix工具一样流畅地将程序组合运行是一个例外而不是规范。

即使是具有相同数据模型的数据库,也往往不容易进行数据迁移。这种缺乏整合导致数据的巴尔干化3

逻辑和线路的分离

Unix工具的另一个特点是使用标准输入stdin和标准输出stdout。如果你运行一个程序而不指定任何其他的东西将会从键盘获得并且标准输出到屏幕上。但是您也可以从文件获得输入和/或将输出重定向到文件。管道允许您将一个进程的标准输出附加到另一个进程的标准输入上(通过较小的内存缓冲区,而不需要将整个中间数据写入磁盘)。

程序仍然可以直接读取和写入文件但如果程序不关注特定的文件路径只使用标准输入和标准输出则Unix的方法效果最好。这允许shell用户以任何他们想要的方式连接输入和输出;该程序不知道或不关心输入来自哪里以及输出到哪里。 可以说这是一种松耦合后期绑定【15】或控制反转【16】。将输入/输出的连接与程序逻辑分开,可以更容易将小模块组合成更大的系统。

您甚至可以编写自己的程序并将它们与操作系统提供的工具组合在一起。你的程序只需要从标准输入读取输入并将输出写入标准输出并且可以参与数据处理流水线。在日志分析案例中您可以编写一个将user-agent字符串转换为更易读的浏览器标识符或者将IP地址转换为国家代码的工具并将其插入管道。排序程序并不关心它和操作系统的一部分还是你写的程序进行通信。

但是使用stdin和stdout可以做什么是有限的。需要多个输入或输出的程序会很棘手。如果程序直接打开文件进行读写或者启动一个子程序或者打开网络连接则无法将程序的输出传输到网络连接中【17,18】4这种情况下I/O操作由程序本身连接。它仍然可以配置例如通过命令行选项但是减少了在shell中连接输入和输出的灵活性。

透明度和调试

Unix工具如此成功的部分原因是它们使得查看正在发生的事情变得非常容易

  • Unix命令的输入文件通常被视为不可变的。这意味着您可以随意运行命令尝试各种命令行选项而不会损坏输入文件。

  • 您可以在任何时候结束管道,将输出管道输送到less,然后查找它是否具有预期的数据。这种检查能力对调试非常有用。

  • 您可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使您可以重新启动后面的阶段,而无需重新运行整个管道。

因此与关系数据库的查询优化器相比即使Unix工具非常直接和简单仍然非常有用特别是对于调试来说。

然而Unix工具的最大局限在于它们只能在一台机器上运行 - 而Hadoop这样的工具就此应运而生。

MapReduce和分布式文件系统

MapReduce有点像Unix工具但分布在数千台机器上。像Unix工具一样这是一个相当直接的蛮力的但却是令人惊讶的有效工具。一个MapReduce作业可以和一个Unix进程类似它获取一个或多个输入并产生一个或多个输出。

和大多数Unix工具一样运行MapReduce作业通常不会修改输入除了生成输出外没有任何副作用。输出文件以顺序的方式写入一次一旦生成输出不会修改任何现有的文件

Unix工具使用stdinstdout作为输入和输出MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中该文件系统被称为HDFSHadoop分布式文件系统一个开源Google文件系统GFS的重新实现【19】。

除HDFS外还有各种其他分布式文件系统如GlusterFS和Quantcast File SystemQFS【20】。诸如Amazon S3Azure Blob存储和OpenStack Swift [21]等对象存储服务非常类似5。在本章中我们将主要使用HDFS作为示例但是原则适用于任何分布式文件系统。

与网络连接存储NAS和存储区域网络SAN架构的共享磁盘方法相比HDFS基于无共享原则参见第二部分的介绍。共享磁盘存储由集中式存储设备实现通常使用定制硬件和专用网络基础设施如光纤通道。另一方面无共享方法不需要特殊的硬件只需要通过传统数据中心网络连接的计算机。

HDFS包含在每台机器上运行的守护进程暴露一个允许其他节点访问存储在该机器上的文件的网络服务假设数据中心中的每台通用的计算机都有磁盘。名为NameNode的中央服务器会跟踪哪个文件块存储在哪台机器上。因此在概念上HDFS创建了一个大文件系统的守护进程可以使用所有机器磁盘上的内容。

为了容忍机器和磁盘故障文件块被复制到多台机器上。复制可能意味着在多个机器上存在相同数据的多个副本如第5章中所述或者像Reed-Solomon代码这样的擦除编码方案比完全复制花费更小的开销恢复丢失的数据【20 22】。这些技术与RAID相似可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。

HDFS发展迅猛在撰写本文时最大的HDFS部署运行在成千上万台机器上总存储容量达数百peta-bytes 【23】。使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于同等容量的专用存储设备【24】使得如此大的规模存储变得可行。

MapReduce作业执行

MapReduce是一个编程框架您可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考第391页上的“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式与下例非常相似

  1. 读取一组输入文件并将其分解成多个记录。在Web服务器日志案例中每条记录都是日志中的一行\n是记录分隔符)。
  2. 调用Mapper函数从每个输入记录中提取一个键和值。在前面的例子中mapper函数是awk'{print $ 7}':它提取URL($7)作为主键,并将值保留为空。
  3. 按键排序所有的键值对。在日志案例中,这由第一个排序命令完成。
  4. 调用reducer函数遍历排序后的键值对。如果同一个键出现多次排序使它们在列表中相邻所以很容易组合这些值而不必在内存中保留很多中间状态。在前面的例子中reducer是由uniq -c命令实现的该命令使用相同的密钥来统计相邻记录的数量。

这四个步骤可以由一个MapReduce作业执行。步骤2map和4reduce是您编写自定义数据处理代码的地方。步骤1将文件分解成记录由输入格式解析器处理。步骤3中的排序步骤隐含在MapReduce中 - 您不必编写它因为mapper的输出始终在给予reducer之前进行排序。

要创建MapReduce作业您需要实现两个回调函数mapper和reducer其行为如下另请参阅“MapReduce查询”第46页

Mapper

每个输入记录都会调用一次mapper其工作是从输入记录中提取键和值。对于每个输入它可以生成任意数量的键值对包括没有。它不会保留从一个输入记录到下一个中的状态因此每个记录都是独立处理的。

Reducer

MapReduce框架使用由mapper生成的键值对收集属于同一个键的所有值并使用迭代器调用reducer处理集合。 Reducer可以产生输出记录例如相同URL的出现次数

在Web服务器日志案例中第5步中出现了第二个排序命令它按请求数对URL进行排序。在MapReduce中如果您需要第二个排序阶段则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来mapper的作用是将数据放入一个适合排序的表单中而reducer的作用是处理已排序的数据。

分布式执行MapReduce

和Unix命令管道相比MapReduce可以在多台机器上并行执行计算而无需编写代码来明确并行处理。mapper和reducer一次只能处理一条记录;他们不需要知道他们的输入来自哪里或者输出到什么地方,所以框架可以处理数据在机器之间移动带来的复杂性。

在分布式计算中可以使用标准的Unix工具作为mapper和reducer【25】但是通常我们使用传统编程语言中的函数来实现,。在Hadoop MapReduce中mapper和reducer都是实现特定接口的Java类。在MongoDB和CouchDB中mapper和reducer都是JavaScript函数请参阅第46页的“MapReduce查询”

图10-1显示了Hadoop MapReduce作业中的数据流。它的并行化基于分区参见第6章输入通常是HDFS中的一个目录输入目录中的每个文件或文件块都被认为是一个单独的分区可以单独进行map任务图10-1中的m 1m 2和m 3标记

每个输入文件的大小通常是数百兆字节。 MapReduce调度器图中未显示试图在其中一台存储输入文件副本的机器上运行每个mapper只要该机器有足够的备用RAM和CPU资源来运行映射任务【26】。这个原则被称为将计算放在数据附近【27】节省通过网络复制输入文件开销减少网络负载和增加局部计算。

图10-1 具有三个Mapper和三个Reducer的MapReduce任务

在大多数情况下应该在map任务中运行的程序代码在分配运行的计算机上不存在所以MapReduce框架首先复制代码例如Java程序中的JAR文件到适当的机器。然后启动map任务并开始读取输入文件一次将一条记录传递给mapper的回调函数。mapper的输出由键值对组成。

Reduer的计算也会分割。虽然Map任务的数量由输入文件块的数量决定但Reducer的任务数量由作业执行者配置可以不同于mapper任务的数量。为了确保具有相同主键的所有键值对在相同的reducer处理框架使用主键的哈希值来确定reducer处理的键值对参见“通过主键哈希分区”第203页

键值对必须进行排序但数据集可能太大无法在单台机器上使用常规排序算法进行排序。替代方案是将排序分阶段进行。首先每个map任务都基于主键的哈希通过reducer将其输出分区。每一个分区都作为排序好的文件被写入mapper的磁盘上使用的技术与我们在第76页的“SSTables and LSM-Trees”中讨论的类似。

只要mapper读取输入文件并写入它排序后的输出文件MapReduce调度器就会通知reducer可以从mapper获取输出文件。Reducer连接到每个mapper并下载其对应分区的排序好的键值对文件。Reducer进行分区排序从mapper中拷贝对应分区到reducer一并称为洗牌(shuffle)【26】一个令人困惑的术语 - 不像扑克中的洗牌MapReduce中没有随机性

Reduce任务从mapper获取文件,将它们合并在一起并保证有序。因此如果不同的mapper使用相同的主键生成记录则它们将在合并的reducer输入中相邻。

Reducer通过一个迭代器对相同的主键进行逐步扫描在某些情况下可能不是全部在内存中。 Reducer可以使用任意逻辑来处理这些记录并且可以生成任意数量的输出记录。这些输出会写入到分布式文件系统上的文件通常存在运行reducer的机器的本地磁盘其他机器上则是这个文件的副本

MapReduce工作流程

单个MapReduce作业可以解决的问题范围有限。请参阅日志分析案例一个MapReduce作业可以确定每个URL的页面浏览次数但无法确定访问最多的URL因为这需要第二轮排序。

因此将MapReduce作业链到工作流中非常常见例如一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流没有特别的支持所以这个链接是通过目录名隐含完成的第一个作业必须通过配置将其输出写入到HDFS中的指定目录第二个作业必须是读取这个目录。从MapReduce框架的角度来看他们是两个独立的工作。

因此被链接的MapReduce作业和Unix命令的流水线它直接将一个进程的输出作为输入传递给另一个进程只使用一个小的内存缓冲区不太一样它更像是一个命令序列其中每个命令的输出会写入临时文件下一个命令从临时文件中读取。这种设计有利有弊我们将在第419页“中间状态的具体化”中讨论。

只有当作业成功完成时批处理作业的输出才被视为有效MapReduce丢弃失败作业产生的部分输出。因此工作流中的一项工作只有在先前的工作 - 即产生它所需的输入工作成功完成时才能开始。为了处理这些作业之间的依赖关系执行基于Hadoop开发了各种工作流调度器包括OozieAzkabanLuigiAirflow和Pinball [28]。

这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统[29]时由50到100个MapReduce作业组成的工作流非常常见而在大型组织中不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流非常重要。

Hadoop的各种高级工具如Pig [30]Hive [31]Cascading [32]Crunch [33]和FlumeJava [34]也设置了多个MapReduce阶段的工作流程并且将他们自动连接在一起。

Reduce端连接(join)与分组(group)

我们在第2章中讨论了数据模型和查询语言的连接但是我们还没有深入探讨连接如何实现现在我们来再次探讨。

在许多数据集中,一条记录与另一条记录有连接的情况很常见:关系模型中的外键,文档模型中的文档引用,图模型中的边。只要有一些代码需要访问该连接两边的记录包含引用的记录和被引用的记录连接就是必需的。正如第2章所讨论的反范式可以减少对连接的依赖但很难将其完全移除6

在数据库中如果执行的查询只涉及少量记录数据库通常会使用索引来快速定位请参阅第3章。如果查询涉及连接则可能需要多个索引查找。然而MapReduce没有索引的概念 - 至少不是通常意义上的索引。

当MapReduce作业被赋予一组文件作为输入时它读取这些文件的全部内容;数据库会把这个操作称为全表扫描。如果您只想读取少量的记录与索引查找相比全表扫描的成本非常高昂。但是在分析查询中请参阅第88页上的“处理事务或分析”通常需要计算大量记录的聚合。在这种情况下全表扫描可能比较合理特别是可以在多台机器上并行处理。

当我们在批处理的背景下讨论连接时,意味着解析数据集内某个连接的所有事件。 例如,我们假设一个作业同时为所有用户处理数据,而不仅仅是为一个特定用户查找数据(可以通过索引更有效地完成)。

示例:分析用户活动事件

图10-2给出了一个批处理作业中使用join的典型例子。 在左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件或点击流数据),右侧是用户数据库。 您可以将此示例看作是星型模式的一部分请参阅“星号和雪花分析的示意图”第93页事件日志记录可以看做一个表用户数据库是其中的一个维度(通过userId映射)。

图10-2 用户行为日志与用户档案的连接

分析任务可能需要将用户活动与用户信息相连接例如用户资料包含年龄或出生日期则系统可以确定不同年龄组中最受欢迎的页面。但是活动事件仅包含用户ID而不是完整的用户资料信息。在每一个活动事件中嵌入这个完整信息很可能非常浪费。因此活动事件需要和用户资料数据库连接。

最简单实现连接的方式是逐个遍历活动事件并为每个用户ID查询用户数据库在远程服务器上。虽然可行,但是很可能性能非常差:处理吞吐量受限于访问数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布情况,并行运行大量查询可能会轻易压垮数据库[35]。

为了在批处理过程中达到良好的吞吐量,计算必须(尽可能)在一台机器上进行。通过网络随机访问要处理的每个记录太慢。而且,查询远程数据库意味着批处理作业结果会变得不确定,因为远程数据库中的数据可能会改变。

所以更好的方法是获取用户数据库的副本例如使用ETL进程从数据库备份中提取数据请参阅第91页上的“数据仓库”并将其放入与用户活动事件的日志相同的分布式文件系统中。这样用户数据库存在于HDFS中的一组文件中用户活动记录在另一组文件中您可以使用MapReduce将所有相关记录集中在一起并高效地处理它们。

排序合并连接

回想一下mapper的作用是从每个输入记录中提取一个键和值。在图10-2中键是用户ID一组mapper负责活动事件提取用户ID作为键和活动事件作为值而另一组mapper将会检查用户数据库提取用户ID作为键,用户的出生日期作为值。这个过程如图10-3所示。

图10-3 Reduce端基于user ID进行归并排序连接如果输入数据集被分为多个文件则每个都可以被多个mapper并行处理

当MapReduce框架通过主键对mapper输出进行分区然后对键值对进行排序后所有用户ID相同的活动事件和用户记录在reducer输入中彼此相邻。 Map-Reduce作业甚至可以通过排序使reducer始终最先看到用户信息,然后紧跟着按照时间排序的活动事件 - 这种技术被称为次级排序【26】。

然后reducer可以很容易地执行真正的连接逻辑对于每个用户ID都会调用一次reduce函数并且由于次级排序第一个值应该是来自用户数据库的出生日期记录。 Reducer将出生日期存储在本地变量中然后使用相同的用户ID遍历活动事件输出“已观看网址”和“观看者年龄”这一对数据。随后的Map-Reduce作业可以计算每个URL查看者的年龄分布并按年龄组进行分簇。

由于reducer一次处理一个特定用户ID的所有记录因此每次只需要将一个用户记录保存在内存中而不需要通过网络发起任何请求。这个算法被称为归并排序连接(sort-merge join)因为mapper输出按主键排序然后reducer可以把两边排序好的记录列表合并在一起。

相关数据整合

在归并排序连接中mapper和排序过程确保将执行特定用户ID的连接操作的所有必需数据放在一起然后只需调用一次reducer。因为预先排列了所有需要的数据reducer可以是一个高吞吐量并且低内存消耗的相当简单单线程的代码。

看待这种体系结构的一种方法是mapper“发送消息”给reducer。当一个mapper发出一个键值对时主键决定了值应该传递到的目标地址。尽管主键只是一个任意的字符串不像IP地址和端口号那样的实际的网络地址它的行为就像一个地址所有具有相同主键的键值对将被发送到相同的目标对同一个reducer的调用

使用MapReduce编程模型将物理网络通信方面的计算从正确的计算机获取数据从应用程序逻辑处理数据中分离出来。这种分离与数据库的典型应用形成了鲜明的对比从数据库中获取数据的请求通常发生在应用程序代码中的某处[36]。由于MapReduce能够处理所有的网络通信因此它也避免了应用程序代码担心的部分故障问题例如另一个节点的崩溃MapReduce在不影响应用程序逻辑的前提下透明地重试失败的任务。

分组(GROUP BY)

除了连接(Join)“数据整合”的另一个常见用法是通过某个键如SQL中的GROUP BY子句对记录进行分组。相同的密钥的记录成为一个分组并且下一步通常在每个组内进行某种聚合例如

  • 计算每个组中记录的数量例如在统计页面视图的示例中在SQL中表示为COUNT*)聚合)
  • 选取特定字段(SUM(fieldname))进行累加
  • 根据某些排序函数选择前k个记录

使用MapReduce实现这种分组操作的最简单方法是设置mapper可以依据需要的主键进行分组生成的键值对。然后分区和排序过程将所有相同键的记录一起发送给与同一个reducer。因此在MapReduce上实现时分组和连接看起来非常相似。

分组的另一个常见用途是整理特定用户会话期间的所有活动事件,以便找出用户的一系列操作(称为会话化[37]。例如可以使用这种分析来确定显示网站新版本的用户是否比那些显示旧版本A / B测试的用户更有可能进行购买或计算某个营销活动是否值得。

如果您有多个Web服务器处理用户请求则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。您可以通过使用会话cookie用户ID或类似的标识符作为分组主键来实现会话化并将特定用户的所有活动事件进行整合同时将不同用户的事件分配到不同的分区。

处理偏斜

如果大量数据指向同一主键,则“将具有相同主键的所有记录放到相同位置”的模式将被破坏。例如,在社交网络中,大多数用户可能和几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为“关键对象”(linchpin objects)[38]或热键(hot keys)。

在单个reducer中收集与名人相关的所有活动例如回复他们发布的内容可能导致严重的偏斜也称为热点 - 也就是说一个reducer必须处理比其他更多的记录参见205页的“偏斜的工作负载和消除热点“。由于MapReduce作业只有在其所有mapper和reducer都完成时才完成所有后续作业必须等待最慢的reducer完成后才能启动。

如果连接(join)输入有热键则可以使用一些算法进行补偿。例如Pig中的偏斜连接方法首先运行一个抽样作业来确定哪些是热键[39]。执行实际连接时mapper随机发送到一系列主键相关的reducer中的一个传统的MapReduce会选择一个基于主键哈希确定的reducer。对于连接的其他输入与热键相关的记录需要被复制到所有处理该密钥的reducer[40]。

这种技术将处理热键的工作分散到多个reducer上以把其他连接输入复制到多个reducer的代价使其更好地并行化。Crunch中的分片连接(sharded join)方法与之类似但需要显式指定热键而不使用采样作业。这种技术也非常类似于我们在第205页的“偏斜的工作负载和消除热点”中讨论的技术使用随机化来缓解(alleviate)分区数据库中的热点。

Hive的偏斜连接优化采取了另一种方法。它需要在表元数据中明确指定热键并将与这些键相关的记录与其余文件分开存放。在该表上执行连接时它将使用map端连接请参阅下一节获取热键。

使用热键对记录进行分组和聚合时可以分两个阶段。第一个MapReduce阶段将记录发送到随机reducer以便每个reducer对热键对应的子集执行分组并为每个键输出更紧凑的聚合值。第二个Map-Reduce作业将来自所有第一阶段reducer的输出合并为每个键的单个值。

Map端连接

上一节描述的连接算法在reducer中执行实际的连接逻辑因此被称为reduce端连接。mapper负责准备输入数据从每个输入记录中提取键和值将键值对分配给reducer分区并按键排序。

reduce端连接的优点是不需要对输入数据做任何假设无论其属性和结构如何mapper都可以为连接准备数据。然而不利的一面是所有这些排序复制到reducer以及合并reducer输入可能开销非常大。取决于可用的内存缓冲区因为数据会经历MapReduce [37]的不同阶段,可能被多次写入磁盘。

另一方面如果您可以对输入数据进行某些假设则可以通过使用所谓的map端连接来加快连接速度。这种方法使用了一个简化的MapReduce作业其中没有reducer也没有排序。相反每个mapper只需从分布式文件系统读取一个输入文件块然后将一个输出文件写入文件系统。

广播哈希连接

执行map端连接最简单的方法适用于大数据集与小数据集连接的情况。特别是小数据集需要足够小以便可以将其全部加载到每个mapper的内存中。

例如在图10-2的情况下假设用户数据库足够小可以放入内存。当mapper启动时它可以首先将用户数据库从分布式文件系统读取到内存中的哈希表中。然后mapper可以扫描用户活动事件并简单地在哈希表查找中每个事件的用户ID7

这种方式仍然可以有多个map任务一个是大数据集用于连接的每个文件块在图10-2的例子中活动事件是大数据集然后每个mapper都将小的数据集全部加载到内存中和之前进行连接。

这种简单而有效的算法被称为广播哈希连接“广播”反映了这样一个事实即大数据集分区后的每个mapper都读取整个小数据集所以小数据集有效地“广播”到大的数据单词hash反映了它使用一个哈希表。 Pig又叫做“replicated join”Hive“MapJoin”Cascading和Crunch都支持这种连接方法。它也用于数据仓库查询引擎如Impala [41]。

除了将连接中的小数据集加载到内存哈希表中以外,另一种方法是将它存储在本地磁盘上的只读索引中[42]。索引中经常使用的部分将保留在操作系统的页面缓存中,因此这种方法可以提供与内存中哈希表几乎一样快的随机访问查找,也并不需要把数据放入内存。

分区哈希连接

如果map端连接的输入以相同的方式进行分区那么哈希连接方法可以独立应用于每个分区。在图10-2的情况中您可以根据用户ID的最后一位(假设是数字)进行分区因此大小数据集每边都有10个分区。例如mapper 3首先将所有具有以3结尾的ID的用户加载到哈希表中然后扫描ID为3的每个用户的所有活动事件。

如果分区正确无误您可以确定所有需要连接的记录都位于相同编号的分区中因此每个mapper只需读取一个分区就足够了。这个方法的优点是每个mapper都只需将较少量的数据加载到其哈希表中。

这种方法只适用于连接两边的输入具有相同数量的分区的情况并且分区选取的主键和哈希方式相同。如果输入是由之前执行过这个分组的MapReduce作业生成的那么这是一个可行的方案。

在Hive中, 分区哈希连接被叫做桶映射连接(bucketed map joins )。

Map端归并连接

如果输入数据集不仅以相同的方式进行分区而且还基于相同的键进行排序则可以应用另一种map端连接的变体。在这种情况下输入是否足够小可以放入内存并不重要因为mapper可以执行通常由reducer执行的合并操作按主键升序顺序递增读取两个输入文件来匹配有相同主键的记录。

如果可以进行map端归并连接则可能意味着先前的MapReduce作业将输入数据集进行了分区和排序。原则上这个连接可以在之前工作的reduce阶段进行。但是在单独只有map的作业中执行归并连接仍然是有可能的例如除了上述特定连接之外分区和排序好的数据集还有其他用途。

使用Map端连接的MapReduce工作流程

当下游作业消费MapReduce连接完成后的输出时map端或reduce端连接的选择会影响输出的结构。 reduce端连接的输出按连接主键进行分区和排序而map端连接的输出与大数据集相同的方式进行分区和排序因为每个map任务都基于连接的大数据集端的文件块进行启动无论是使用分区连接还是广播连接

如前所述map端连接也对输入数据集的大小排序和分区做出了更多的假设和限制。在优化连接策略时了解分布式文件系统中数据集的物理布局变得非常重要仅仅知道编码格式和数据存储目录的名称是不够的您还必须知道数据分区的数量和排序依据的主键。

在Hadoop生态系统中这种关于数据集分区的元数据经常在HCatalog和Hive Metastore中维护[37]。

批处理工作流的输出

我们已经谈了很多关于实现MapReduce工作流程的各种算法但是我们忽略了一个重要的问题所有处理完成的结果是什么我们为什么要把这些工作放在前面

在数据库查询中我们把事务处理OLTP和分析处理(OLAP)进行了区分请参阅第90页上的“事务处理或分析。我们看到OLTP查询通常使用索引按键查找少量记录以便将其呈现给用户例如在网页上。而另一方面分析查询通常会扫描大量记录执行分组和聚合输出通常是报告的形式显示某个随时间变化的指标的图表或以某种排序得出的前10项或把一些数量的数据按子类别分解。这种报告的用户通常是需要做出商业决策的分析师或经理。

批处理适用在哪里不是事务处理也不是分析但是与分析更接近因为批处理通常需要扫描输入数据集的大部分。然而MapReduce作业的工作流程与用于分析目的的SQL查询不同请参阅第414页的“比较Hadoop与分布式数据库”。批处理过程的输出通常不是报告而是一些其他类型的结构。

建立搜索索引

Google最初使用的MapReduce为其搜索引擎建立索引以5到10个MapReduce作业的工作流实现[1]。虽然Google后来不再使用MapReduce [43]构建搜索索引但是如果从建立搜索索引的角度来看可以帮助理解MapReduce。 即使在今天Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法。

我们在第88页的“全文搜索和模糊索引”中简要地看到了Lucene这样的全文搜索索引是如何工作的它是一个文件术语字典您可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表发布列表。这是一个非常简单的搜索索引视图 - 实际上,它需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这一原则是成立的。

如果需要对一组固定文档执行全文搜索则批处理是构建索引的一种非常有效的方法mapper根据需要对文档集合进行分区每个reducer构建其分区的索引并将索引文件写入分布式文件系统。构建这样的文档分区索引请参阅“分区和二级索引”第184页非常有利于并行处理。

由于按关键字查询搜索索引是只读操作,因此这些索引文件一旦创建就是不可变的。

如果索引的文档集合发生更改,则可以选择定期重新运行整个索引工作流程,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法可能会带来很高的计算成本,但是它的优点是索引更新过程很简单:文档变化,索引过期需更新。

或者可以增量建立索引。如第3章所述如果要添加删除或更新索引中的文档Lucene会写出新的某段(segment)文件并在后台异步合并和整理。我们将在第11章中看到更多这样的增量处理。

键值存储作为批处理输出

搜索索引只是批处理工作流程可能输出的一个示例。批量处理的另一个常见用途是构建机器学习系统,如分类器(例如,垃圾邮件过滤器,异常检测,图像识别)和推荐系统(例如,您可能认识的人,您可能感兴趣的产品或相关搜索)[29])。

这些批处理作业的输出通常是某种数据库例如可以通过用户ID查询并获取该用户的推荐好友的数据库或者可以通过产品ID查询的数据库并获取相关产品[45]。

这些数据库需要从处理用户请求的Web应用程序中查询这些请求通常与Hadoop基础架构分离。那么批处理过程的输出如何返回到Web应用程序可以查询的数据库中

最显然的选择可能是直接在mapper或reducer中使用客户端库作为您最喜欢的数据库并从批处理作业直接写入数据库服务器一次写入一条记录。这确实奏效假设您的防火墙规则允许从您的Hadoop环境直接访问您的生产数据库但由于以下几个原因这是一个坏主意

  • 正如前面讨论的连接一样,为每个记录提出一个网络请求比批处理任务的正常吞吐量要慢几个数量级。即使客户端库支持批处理,性能也可能很差。
  • MapReduce作业经常并行运行许多任务。如果所有mapper或reducer都同时以批处理速率写入相同的输出数据库那么该数据库可能很容易过载并且其查询性能可能受到影响。这可能会导致系统其他部分的运行问题[35]。
  • 通常情况下MapReduce为作业输出提供了一个完全的“全有或全无”的保证如果作业成功则结果就是每个任务只一次成功执行的输出如果某些任务失败则必须重试。如果整个作业失败则不会生成任何输出。然而从作业内部程序写入外部系统会产生不能被隐藏的外部可见性的副作用。因此您不得不担心部分完成的作业在其他系统可见以及Hadoop任务尝试和推测性执行的复杂性。

更好的解决方案是在批处理作业中创建一个全新的数据库并将其作为文件写入分布式文件系统中作业的输出目录中就像上一节的搜索索引一样。这些数据文件一旦写入就不可变可以批量加载到处理只读查询的服务器中。有很多基于键值存储的产品支持在MapReduce作业中构建数据库文件包括Voldemort [46]Terrapin [47]ElephantDB [48]和HBase批量加载[49]。

构建这些数据库文件是MapReduce的一个很好的用法使用mapper提取一个键然后使用该键进行排序已经成为构建索引所需的大部分工作。由于大多数这些键值存储是只读的文件只能由批处理作业一次写入而且是不可变的所以数据结构非常简单。例如它们不需要WAL请参阅第82页的「使B树可靠」 Write-Ahead Logging - 预写式日志)。

将数据加载到Voldemort时服务器将继续向旧数据文件提供请求同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成服务器会自动切换到查询新文件。如果在这个过程中出现任何问题它可以很容易地再次切换回旧的文件因为它们仍然存在并且是不变的[46]。

批量过程输出的哲学

本章前面讨论过的Unix哲学第394页的“Unix哲学”鼓励通过明确的数据流来进行实验程序读取输入并写入输出。在这个过程中输入保持不变任何之前的输出都被新输出完全替换这样没有其他副作用。这意味着您可以随心所欲对命令重新运行调整或调试而不会扰乱系统的状态。

MapReduce作业的输出处理遵循相同的思想。将输入视为不可变来避免副作用如写入外部数据库批处理作业不仅性能良好而且更容易维护

  • 如果在代码中引入了一个错误并且输出错误或损坏了则可以简单地回滚到代码的先前版本然后重新运行该作业输出将再次正确。或者甚至更简单您可以将旧的输出保存在不同的目录中然后切换回原来的目录。具有读写事务的数据库没有这个属性如果你部署了有Bug的代码将错误的数据写入数据库那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的思想被称为人为容错[50]。)
  • 由于易于回滚,功能开发可以比“错误意味着不可挽回的损害”的环境更快地进行。这种使不可逆性最小化的原则有利于敏捷软件的开发[51]。
  • 如果map或reduce任务失败MapReduce框架将自动重新调度并在同一个输入上再次运行。如果失败是由于代码中的一个错误造成那么它会一直崩溃并最终导致作业在几次尝试之后失败。但是如果故障是由于暂时的问题引起那么故障是可以容忍的。这种自动重试是安全的因为输入不可变而失败任务的输出会被MapReduce框架丢弃。
  • 同一组文件可用作各种不同作业的输入,其中包括计算指标并评估作业的输出是否达到预期的监控作业(例如,将其与前一次运行的输出进行比较并测量差异)。
  • 与Unix工具类似MapReduce作业将逻辑与串联配置输入和输出目录分开这就提供了关注点的分离并且带来代码重用的可能性一个团队可以专注于实现一项工作其他团队可以决定何时何地运行这项工作。

在这些领域对Unix运行良好的设计原则似乎也适用于Hadoop但Unix和Hadoop在某些方面也有所不同。例如因为大多数Unix工具都没有指定文件的类型所以必须做大量的输入解析本章开头的日志分析示例使用{print $ 7}来提取URL。在Hadoop上通过使用更多结构化的文件格式来消除一些低价值的语法转换Avro请参阅第122页上的“Avro”和Parquet请参阅第95页上的“面向列的存储”经常被使用因为它们提供高效的基于schema的编码并允许随着时间的推移模式的演变见第4章

比较Hadoop和分布式数据库

正如我们所看到的Hadoop有点像Unix的分布式版本其中HDFS对应文件系统而MapReduce是Unix进程的特殊实现在Unix中排序总是在map和reduce这两个阶段之间进行。我们看到了如何在这些基础上实现各种连接和分组操作。

当MapReduce论文[1]发表时它在某种意义上说并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前的所谓的大规模并行处理MPP数据库中实现了[3,40]。例如Gamma数据库机器Teradata和Tandem NonStop SQL是这方面的先驱[52]。

最大的区别是MPP数据库集中于在一组机器上并行执行分析SQL查询而MapReduce和分布式文件系统[19]的组合则更像是一个可以运行任意程序的通用操作系统。

存储的多样性

数据库要求您根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码格式来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。

说白了Hadoop开放了将数据不加区分地转储到HDFS的可能性之后才考虑如何进一步处理它[53]。相比之下在将数据以特定格式导入数据库之前MPP数据库通常需要对数据和查询模式进行仔细的前期建模。

纯粹来看,这种谨慎的建模和导入似乎是可取的,因为这意味着数据库用户的数据质量更好。然而,在实践中,似乎只是简单地使数据可用 - 即使它有些怪异,形式原始,难以使用 - 通常也比尝试在前期确定理想的数据模型更有价值[54 ]。

这个想法与数据仓库类似请参阅第91页上的“数据仓库”简单的将大型组织的各个部分的数据集中在一起是很有价值的因为它可以将之前完全不同的数据集关联在一起。 MPP数据库所要求的谨慎的模式设计减慢了数据集中收集速度;以原始形式收集数据以后再去关注设计schema使数据收集速度加快这种理念有时被称为“数据湖”或“企业数据中心”【55】

不加区别的数据倾泻转移了解析数据的责任不再是强迫数据集的生产者将其转化为标准化的格式数据的解析成为消费者的问题读时建模方法【56】;请参阅第39页上的“文档模型中的模式灵活性”。如果生产者和消费者是不同团队并且有不同优先级这可能是一个优势。甚至可能不存在一个理想的数据模型利用数据的目的不同带来处理方式的不同。以原始形式简单地转储数据可以进行多次这样的转换。这种方法被称为寿司原则“原始数据而不是加工过的数据更好”【57】。

因此Hadoop经常被用于实现ETL过程请参阅“数据仓库”第91页事务处理系统中的数据以某种原始形式转储到分布式文件系统中然后编写MapReduce作业来清理数据将其转换为关系形式并将其导入MPP数据仓库以进行分析。数据建模仍然会有但在一个单独的步骤中从数据收集中分离出来。这种解耦可以实现因为分布式文件系统支持以任何格式编码的数据。

处理模型的多样性

MPP数据库是单一的紧密集成的软件负责磁盘上的存储布局查询计划调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化因此整个系统可以在其设计的查询类型上取得非常好的性能。而且SQL语言提供丰富查询的优雅机制无需编写代码业务分析师使用的图形工具例如Tableau可访问直接该语言。

另一方面并非所有类型的处理都容易使用SQL查询表达。例如如果要构建机器学习和推荐系统或者使用相关性排名模型的全文搜索索引或者执行图像分析则很可能需要更通用的数据处理模型。这些类型的处理通常对特定的应用程序非常具体例如机器学习的特征工程机器翻译的自然语言模型欺诈预测的风险评估函数因此它们不可避免地需要编写代码而不仅仅是查询。

MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce那么你可以在它上面建立一个SQL查询执行引擎事实上这正是Hive项目所做的[31]。而且对于不适合用SQL查询表示的批处理也可以自己实现处理方式。

随后人们发现MapReduce对于某些类型的处理来说太过于限制表现得很差基于Hadoop开发了其他各种处理模型我们将在第419页的“Beyond MapReduce”中看到其中的一些。仅仅两种处理模型SQL和MapReduce是不够的我们需要更多不同的模型而且由于Hadoop平台的开放性实施一整套方法是可行的而这在整体MPP数据库的范围内是不可能的[58]。

至关重要的是这些不同的处理模型都可以在一个共享用途的集群上运行所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中不需要将数据导入到几个不同的专用系统中进行不同类型的处理系统足够灵活可以支持同一个群集内不同的工作负载。不需要移动数据使得从数据中取值变得容易得多并且在新的处理模型进行实验更加容易。

Hadoop生态系统包括随机访问的OLTP数据库如HBase请参阅第70页的“SSTables和LSM-Trees”和MPP类型的分析数据库如Impala [41]。 HBase和Impala都不使用MapReduce但都使用HDFS进行存储。访问和处理数据的方式完全不同但是它们可以共存并被集成到同一个系统中。

为频繁的故障而设计

在比较MapReduce和MPP数据库设计理念时另外两个关键点是故障处理;对内存和磁盘的使用。与在线系统相比,批处理对故障不太敏感,因为失败不会立即影响用户,并且任务可以再次运行。

如果一个节点在执行查询时崩溃大多数MPP数据库会中止整个查询让用户重新提交查询或自动重新运行[3]。查询通常运行几秒钟最多几分钟,这种处理方法可以接受,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据例如使用哈希连接(join))以避免从磁盘读取的成本。

另一方面MapReduce通过以单个任务的粒度重试工作可以容忍map或reduce任务的失败而不会影响作业的整体。它也非常渴望将数据写入磁盘一方面是为了容错另一方面考虑数据集可能太大放不进内存。

MapReduce方法更适用于较大的作业处理很多数据并运行很长时间的作业以至于在此过程中很可能遇到至少一个任务故障。在这种情况下由于单个任务失败而重新运行整个工作太浪费了。即使以单个任务的粒度进行恢复带来的开销使得无故障处理更慢如果任务失败率足够高仍然可以进行合理的权衡。

但是这些假设有多大可能性?在大多数集群中,机器故障确实发生,但是它们不是很频繁 - 可能很少,大多数工作都不会经历机器故障。为了容错真的值得引入这些额外开销吗?

要了解MapReduce保守使用内存和任务级恢复的原因需要明白最初设计MapReduce的背景。 Google拥有混合使用的数据中心在线生产服务和离线批处理作业在相同的机器上运行。每个任务都有一个使用容器执行的资源分配CPU核心RAM磁盘空间等。每个任务也具有优先级如果优先级较高的任务需要更多的资源则可以终止抢占同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的价格团队必须为他们使用的资源付费优先级更高的进程更贵[59]。

这种架构允许为非生产低优先级投入过度的计算资源因为系统知道如果必要的话它可以回收资源。与分离生产和非生产任务的系统相比它可以更好地利用机器和提高效率。但是如果MapReduce作业以低优先级运行它随时都有被抢占的风险因为优先级较高的进程需要其资源。批量工作有效地“拾取桌子下面的碎片”利用高优先级进程已经占据以外的资源。

在谷歌运行一个小时的MapReduce任务有大约5被终止的风险为更高优先级的进程腾出空间。这个比率比由于硬件问题机器重新启动或其他原因引起的故障率高出一个数量级[59]。按照这种抢占率如果一个作业有100个任务每个运行10分钟那么至少有一个任务在完成之前将被终止的风险大于50

这就是为什么MapReduce能够容忍频繁意外的任务终止的原因这不是因为硬件特别不可靠而是因为任意终止进程可以在计算集群中更好地利用资源。

在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配[58]但在编写本文时YARNMesos或Kubernetes不支持通用优先级抢占[60]。在任务不经常被终止的环境中MapReduce的设计决策没有太多意义。在下一节中我们将看看MapReduce的一些替代方案这些方案做出了不同的设计决策。

超越MapReduce

虽然MapReduce在2010年前后变得非常流行并受到大量炒作但它只是分布式系统众多编程模型的一种。根据数据量数据结构和处理类型可能其他工具更适合用于计算。

尽管如此我们在这一章花了很多时间讨论MapReduce因为它是一个有用的学习工具是分布式文件系统的一个相当清晰和简单的抽象。也就是说易于理解它在做什么而不是在易于使用。恰恰相反使用原始的MapReduce API来实现复杂的处理工作实际上是非常困难和费力的 - 例如,您需要从头开始实现任何连接算法[37]。

针对直接使用MapReduce的困难基于MapReduce衍生出很多更高级的编程模型PigHiveCascadingCrunch的抽象。如果您了解MapReduce的工作原理那么它们相当容易学习而且它们的高级架构使许多常见的批处理任务更容易实现。

但是MapReduce执行模型本身也存在一些问题这些问题并没有通过增加另一个抽象层次来解决而且在某些类型的处理中性能很差。一方面MapReduce非常强大您可以使用它来处理任意大量的数据,即使用户系统不可靠并且会有频繁的任务终止(虽然速度可能很慢)。另一方面,对于某种类型的处理来说其他的工具有时候也会更快。

在本章的其余部分中,我们将介绍另外一些批处理方法。在第十一章我们将转向流处理,这可以看作是加速批处理的另一种方法。

中间状态具体化

如前所述每个MapReduce作业都独立于其他任何作业。作业与其他地方的接口是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入则需要将第二个作业的输入目录配置为与第一个作业的输出目录并且外部工作流调度程序必须保证第一份作业已经完成的情况下才会启动下一个作业。

如果第一个作业的输出要在系统内广播,之前的设置非常合理。在这种情况下,您需要能够通过名称来引用它,并将其用作多个不同作业(包括由其他团队开发的作业)的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合,这样作业就不需要知道是谁提供输入或消耗其输出(请参阅“分离逻辑和线路”,395页

但是在很多情况下您知道一个作业的输出只能用作另一个作业的输入并且他们由同一个团队维护。在这种情况下分布式文件系统上的文件只是简单的中间状态一种将数据从一个作业传递到下一个作业的方式。在用于构建由50或100个MapReduce作业[29]组成的推荐系统的复杂工作流程中,存在很多这样的中间状态。

将这个中间状态写入文件的过程称为具体化。 我们在第101页的“聚合数据立方体和具体化视图”中已经遇到了这个术语。它意味着要着重于计算某个操作的结果并写出来而不是有需求时才进行计算。

相反本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输出连接起来。管道并没有完全实现中间状态而是只使用一个小的内存缓冲区将输出增量流向输入。

MapReduce的完全实现中间状态的方法与Unix管道相比存在不足

  • MapReduce作业只有在前面的作业生成其输入中的所有任务都完成时才能启动而由Unix管道连接的进程同时启动输出一旦生成就会被使用。不同机器上的偏差或不同的负荷意味着一份工作往往会有一些较慢的任务。必须等到所有前面的工作完成才能拖慢了整个工作流程的执行。
  • mapper通常是多余的它们只读取刚刚由reducer写入的相同文件并为下一个分区和排序阶段做好准备。在许多情况下mapper代码可能是之前的reducer的一部分如果reducer输出被分区和排序的方式与mapper输出相同那么reducer们可以直接链接在一起而不需要在中间插入一个mapper。
  • 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这对于这样的临时数据通常过于浪费。

数据流引擎

了解决MapReduce的这些问题人们开发了几种用于分布式批量计算的新的执行引擎其中最着名的是Spark [61,62]Tez [63,64]和Flink [65,66]。他们设计的方式有很多不同之处,但他们有一个共同点:他们把整个工作流作为一项工作来处理,而不是把它分解成独立的子作业。

由于它们通过几个处理阶段明确地对数据流建模所以这些系统被称为数据流引擎。像MapReduce一样它们通过反复调用用户定义的函数,每次在单个线程上处理一条记录。他们通过对输入进行分区来并行工作,并将一个功能的输出复制到网络上,成为另一个功能的输入。

与MapReduce不同这些功能不需要mapper和reducer的严格交替而是可以更灵活进行组合。我们把这些函数函数称为操作符数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入

  • 一个选项是通过键对记录进行重新分区和排序就像在MapReduce的洗牌阶段一样请参阅“分布式执行MapReduce”。此功能可以像在MapReduce中一样启用排序合并连接和分组。
  • 另一种可能是获取几个输入,并以相同的方式进行分区,但跳过排序。这节省了分区哈希连接的工作,这要求记录的分区重要,但顺序无所谓,因为哈希表本身就是无序。
  • 对于广播哈希连接,可以将一个运算符的相同输出发送到连接运算符的所有分区。

这种处理引擎的风格基于像Dryad [67]和Nephele [68]这样的研究系统与MapReduce模型相比它提供了几个优点

  • 排序等昂贵的工作只需要在实际需要的地方执行而不是在每个Map和Reduce阶段之间默认发生。
  • 没有不必要的map任务因为mapper的工作通常可以合并到前面的reducer中因为mapper不会更改数据集的分区
  • 由于工作流程中的所有连接和数据依赖性都是明确声明的,因此调度程序会清楚在哪里需要哪些数据,因此可以进行本地优化。例如,它可以尝试将使用某些数据的任务放在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而不必通过网络复制数据。
  • 通常将运算符之间的中间状态保存在内存中比写入本地磁盘更为高效这比写入HDFS需要更少的I/O必须将其复制到多个计算机的磁盘上。 MapReduce已经将这种优化用于mapper的输出但是数据流引擎将该思想推广到了所有的中间状态。
  • 运算符可以在输入准备就绪后立即开始执行;不需要等待整个前一阶段的完成。
  • 与MapReduce为每个任务启动一个新的JVM相比Java虚拟机JVM进程可以重用来运行新操作从而减少启动开销。

您可以使用数据流引擎来执行与MapReduce相同的计算并且由于此处提到的优化通常执行速度会明显更快。既然操作符是map和reduce的泛化相同的处理代码可以在任一执行引擎上运行PigHive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark而无需修改代码[64]。

Tez是一个相当瘦的库它依赖于YARN shuffle服务来实现节点间数据的实际复制[58]而Spark和Flink则是拥有自己的网络通信层调度器和面向用户的API的大型框架。我们随后会讨论这些API。

容错

完全实现中间状态到分布式文件系统的一个优点是它是持久的这使得MapReduce中的容错相当容易如果一个任务失败它可以在另一台机器上重新启动并从文件系统重新读取相同的输入。

SparkFlink和Tez避免将中间状态写入HDFS因此他们采取了不同的容错方案如果一台机器发生故障并且该机器上的中间状态丢失则会从其他仍然可用的数据重新计算优先使用之前的可以拿到的中间数据然后才是HDFS上的原始输入数据

为了实现重新计算,框架必须跟踪给定数据是如何计算的 - 使用哪个输入分区,以及哪个运算符被应用。 Spark使用弹性分布式数据集RDD抽象来追踪数据的祖先[61]而Flink为运算符状态设置检查点允许在执行过程中遇到错误时重新运算[66]。

在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,运算符是否始终生成相同的输出?如果一些丢失的数据已经发送给下游运算符,这个问题就很重要。如果运算符重新启动,重新计算的数据与原有的丢失数据不一致,下游运算很难解决新旧数据之间的矛盾。对于不确定性运算符来说,解决方案通常是同样停止下游运算,然后再运行新数据。

为了避免这种级联故障,最好让运算符具有确定性。但是请注意,非确定性行为很容易不经意间发生:例如,许多编程语言在迭代哈希表的元素时不能保证任何特定顺序,许多概率和统计算法明确依赖于使用随机数,以及任何依赖系统时钟或外部数据源也是不确定的。为了可靠地从故障中恢复我们需要消除这种不确定性,例如通过使用固定种子产生伪随机数。

通过重新计算数据从故障中恢复并不总是合适如果中间数据比源数据小得多或者如果计算量非常大CPU开销大那么将中间数据转化为文件可能比将其重新计算更便宜。

具体化的讨论

回到与Unix的类比我们看到MapReduce就像是将每个命令的输出写入临时文件而数据流引擎看起来更像是Unix管道。尤其是Flink,围绕流水线执行的思想而建立:也就是说,将运算符的输出递增地传递给其他运算符,并且在开始处理之前不等待输入完成。

排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能排在最前。任何需要排序的运算都需要至少暂时地累积(记录)状态。但是工作流程的许多其他部分可以以流水线方式执行。

当作业完成时,它的输出需要持久化到某个地方,以便用户可以找到并使用它 - 很可能它会再次写入分布式文件系统。因此在使用数据流引擎时HDFS上的文件数据集通常仍是作业的输入和最终输出。和MapReduce一样输入是不可变的输出被完全替换。和MapReduce相比的好处是您可以省去将所有中间状态写入文件系统的开销。

图与迭代处理

在第49页上的“类图形数据模型”中我们讨论了使用图形来建模数据并使用图形查询语言来遍历图形中的边和顶点。第2章的讨论集中在OLTP风格的使用上快速执行查询来查找少量符合特定条件的顶点。

图表在批处理环境中的应用也很有趣其目标是在整个图表上执行某种离线处理或分析。这种需求经常出现在机器学习应用程序如推荐引擎或排序系统中。例如最着名的图形分析算法之一是PageRank [69],它试图根据都有什么网页链接到这个网页来估算这个网页的流行度。它是被用于确定搜索引擎呈现结果顺序公式的一部分。

像SparkFlink和Tez这样的数据流引擎参见第419页“中间状态的实现化”通常将运算符作为有向无环图DAG排列在作业中。这与图形处理不一样在数据流引擎中从一个运算符到另一个运算符的数据流被构造成一个图而数据本身通常由关系型元组(relational-style tuples)构成。在图形处理中,数据本身就是图形的形式。另一个不幸的命名混淆!

许多图算法是一次遍历一个边,将一个顶点与相邻的顶点连接起来以便传播一些信息,并且重复直到满足一些条件为止 - 例如直到后面没有更多的边或者直到一些收敛条件。我们在图2-6中看到一个例子它通过重复地跟踪后续的边来指示哪个位置在哪个位置当中这种算法被称为传递闭包从而列出了包含在数据库中的北美所有位置。

可以在分布式文件系统包含顶点和边的列表的文件中存储图形但是这种“重复直到完成”的思路不能用普通的MapReduce来表示因为它只执行一次数据传递。因此这种算法经常以迭代方式实现

  1. 外部调度程序运行批处理来计算算法中的一步。
  2. 当批处理过程完成时,调度器检查它是否完成(基于完成条件 - 例如,没有更多后续的边,或者与上次迭代相比的变化低于某个阈值)。
  3. 如果尚未完成则调度程序返回到步骤1并运行另一轮批处理。

这种方法可行但是用MapReduce实现往往是非常低效的因为MapReduce没有考虑算法的迭代性它总是读取整个输入数据集并产生一个全新的输出数据集即使与上次迭代相比只有一小部分发生变化。

Pregel处理模型

作为图形批处理的优化批量同步并行BSP计算模型[70]已经流行起来。其中它由Apache Giraph [37]Spark的GraphX API和Flink的Gelly API [71]实现。它也被称为Pregel模型因为Google的Pregel论文使这种处理图的方法变得流行[72]。

回想一下在MapReduce中mapper在概念上“发送消息”给reducer的特定调用因为框架将所有具有相同主键的mapper输出集中在一起。 Pregel背后的思想与其类似一个顶点可以“发送消息”到另一个顶点通常这些消息沿着图的边发送。

在每次迭代中,每个顶点调用一个函数,将所有发送给它的消息传递给函数 - 就像调用reducer一样。与MapReduce的不同之处在于在Pregel模型中顶点在一次迭代到下一次迭代期间记忆它的状态所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息则不需要做任何工作。

这与参与者(Actor)模型有些相似请参阅第130页上的“分布式参与者框架”如果我们把每个点当做一个参与者,除了顶点状态和顶点之间的消息具有容错性和持久性,并且通信以固定的方式进行:在每一次迭代中,框架传递在前一次迭代中发送的所有消息。参与者通常没有这样的时间保证。

容错

顶点只能通过消息传递进行通信而不是直接相互查询有助于提高Pregel作业的性能因为消息可以批处理而且等待通信的时间也减少了。唯一的等待是在迭代之间由于Pregel模型保证所有在一次迭代中发送的消息都传递给下一个迭代所以在下一个迭代开始之前先前的迭代必须完全完成并且所有的消息必须在网络上复制。

即使底层网络可能导致消息丢失重复或任意延迟请参阅第267页上的“不可靠网络”Pregel实施可保证在接下来的迭代中消息在其目标顶点处理并且仅处理一次。像MapReduce一样该框架透明地从故障中恢复以简化基于Pregel模型的算法。

这种容错通过在迭代结束时定期检查所有顶点的状态来实现的,即将其全部状态写入持久存储。如果某个节点发生故障并且其内存中状态丢失,则最简单的解决方法是将整个图计算回滚到上一个检查点,然后重新启动计算。如果算法是确定性的并且记录了消息,那么也可以选择性地只恢复丢失的分区(就像我们之前在数据流引擎中所讨论那样)[72]。

并行执行

顶点不需要知道它在哪个物理机器上执行;当它发送消息到其他顶点时它只是发送到一个顶点ID。框架来决定图的分区即确定哪个顶点运行在哪个机器上以及如何通过网络路由消息以便它们正确运行。

由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式划分图形。理想情况下,如果顶点之间需要进行大量的通信,那么它将被分配在同一台机器上。然而,寻找这样一个优化的分区在实践中是困难的,图形顶点经常被任意分配,而不会尝试将相关的顶点分在一起。

因此,图算法通常会有很多跨机器通信,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销增大会显着减慢分布式图算法的速度。

出于这个原因,如果你的图可以放在一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理[73,74]。即使图形太大以至于无法放入内存也可以放在单个计算机的磁盘上使用GraphChi等框架进行单机处理是一个可行的选择[75]。如果图形太大而不适合单个机器像Pregel这样的分布式方案是不可避免的。人们正在对有效的并行化图算法进行探索。

高级API和语言

从MapReduce流行到现在分布式批处理的执行引擎已经成熟。到目前为止基础设施已经足够强大能够存储和处理超过10,000台机器群集上的PB级别的数据。由于在这种规模下物理操作批处理过程的问题已经或多或少得到了解决所以我们更加关注其他方面改进编程模型提高处理效率扩大这些技术可以解决的问题集。

如前所述HivePigCascading和Crunch等高级语言和API由于减少了手工编写MapReduce作业带来的工作量而变得非常流行。随着Tez的出现这些高级语言还可以移动到新的数据流执行引擎而无需重写作业代码。 Spark和Flink也包括他们自己的高级数据流API经常从FlumeJava中获得灵感[34]。

这些数据流API通常使用关系型构建块来表达一个计算基于某个字段的值连接数据集;按主键分组元组;通过一些条件过滤;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。

除了需要较少代码这个明显优势之外这些高级接口还允许交互式使用在这种交互式使用中您可以将分析代码增量地写到shell中并经常运行以观察它在做什么。这种开发风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学我们在第394页的“Unix哲学”中讨论过这个问题。

此外,这些高级接口不仅使我们使用系统的效率更高,而且提高了机器的工作执行效率。

向声明式查询语言转变

与拼接执行连接的代码相比,指定连接为关系运算符的优点是,框架可以分析连接输入的属性,并自动决定哪个上述连接算法最适合手头的任务。 HiveSpark和Flink都有基于开销的查询优化器可以做到这一点甚至可以改变连接顺序减少中间状态的数量最[66,77,78,79]。

连接算法的选择可以对批处理作业的性能产生很大的影响不必理解和记住本章中讨论的各种连接算法对用户来说更加友好。如果以声明的方式指定连接则可以做到这点应用程序简单地说明哪些连接是必需的查询优化器决定如何最好地执行连接。我们以前在第42页的“数据的查询语言”中提到了这个想法。

但是在其他方面MapReduce和之后的数据流引擎与SQL的完全声明性查询模型有很大不同。 MapReduce是围绕函数回调的思想构建的对于每个记录或者一组记录调用一个用户定义的函数mapper或reducer并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以在现有大型系统上进行解析自然语言分析图像分析以及运行数字或统计算法等。

我们一直使用能否轻松运行任意代码来区分MapReduce和MPP数据库参见“比较Hadoop和分布式数据库”一节第414页。虽然数据库具有编写用户定义函数的功能但是它们通常使用起来很麻烦而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统例如Maven,npm,Rubygems做不到很好的集成。

但是数据流引擎已经发现除了连接之外更多使用声明性特征有其他的优点。例如如果一个回调函数只包含一个简单的过滤条件或者只是从一条记录中选择了一些字段那么在每条记录调用函数会带来相当大的CPU开销。如果以声明方式表示这样简单的过滤和映射操作那么查询优化器可以利用面向列的存储布局请参阅第95页的“面向列的存储”并从磁盘只读取所需的列。 HiveSpark DataFrames和Impala也使用向量化执行请参阅第99页的“内存带宽和向量化处理”在对CPU缓存很友好的内部循环中迭代数据并避免函数调用。Spark生成JVM字节码[79]Impala使用LLVM为这些内部循环生成原生代码[41]。

通过将声明式与高级API结合起来查询优化器可以在执行期间利用这些声明式方法批处理框架看起来更像MPP数据库并且在性能上可以媲美。同时通过运行任意代码和以读取任意格式数据保持扩展性它们保持了灵活性的优势。

不同领域专业化

尽管能够运行任意代码的可扩展性是有用的但是通常在标准处理模式不断重复发生所以值得构建通用模块的可重用实现。传统上MPP数据库满足了商业智能分析和业务报告的需求但这只是批处理适用的众多领域之一。

另一个越来越重要的领域是统计和数值算法它们是机器学习应用如分类和推荐系统所需要的。可重用的实现正在出现例如Mahout在MapReduceSpark和Flink之上实现了用于机器学习的各种算法而MADlib在关系型MPP数据库Apache HAWQ中也实现了类似的功能[54]。

空间算法例如k-近邻算法[80]也很有用,它在一些多维空间中搜索与目标接近的项 - 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串[81]。

分布式中的批处理引擎执行被越来越多的算法使用。随着批处理系统拥有内置功能和高级声明式运算并且MPP数据库变得更加可编程和灵活两者开始看起来更相似最终它们都只是存储和处理数据的系统。

本章小结

在本章中我们探讨了批处理话题。我们首先查看了诸如awkgrep和sort之类的Unix工具然后我们看到了这些工具的设计理念是如何运用到MapReduce和更新的数据流引擎中的。其中一些设计原则有: 输入是不可变的,输出是为了成为另一个(还未知的)程序的输入,而复杂的问题是通过编写“做好一件事”的小工具来解决的。

在Unix世界中允许一个程序与另一个程序组合的统一接口是文件和管道;在MapReduce中接口是一个分布式文件系统。我们看到数据流引擎添加了自己的管道式数据传输机制以避免将中间状态持久化到分布式文件系统但作业的初始输入和最终输出通常仍然是HDFS。

分布式批处理框架需要解决的两个主要问题是:

  • 分区

在MapReduce中mapper根据输入文件块进行分区。mapper的输出被重新分区排序合并到可配置数量的reducer分区中。这个过程的目的是整合相关数据 - 例如,所有主键相同的记录都放在同一个地方。

后MapReduce数据流引擎尽量避免排序除非无法避免但它们采取了大致类似的分区方法。

  • 容错

MapReduce经常写入磁盘可以从单个失败的任务中轻松地恢复而无需重新启动整个作业但在无故障的情况下减慢了执行速度。数据流引擎的中间状态较少持久化而是保留在内存中这意味着如果节点发生故障重新计算需要更多的数据。确定性运算符减少了需要重新计算的数据量。

我们讨论了几种MapReduce的连接算法其中大多数也是在MPP数据库和数据流引擎中使用的。他们还很好地阐述了分区算法如何工作

  • 排序合并连接

每个正在连接的输入都通过一个mapper提取主键。通过分区排序和合并具有相同主键的所有记录最终都会进入同一个reducer的函数。这个函数可以输出连接后的记录。

  • 广播哈希连接

连接的两个输入中有一个数量很少所以它不需要分区并且可以被完全加载到一个哈希表中。因此您可以为大的输入的每个分区启动一个mapper将小输入的哈希表加载到每个mapper中然后一次扫描大输入的一条记录并在哈希表中进行查询。

  • 分区哈希连接

如果连接的两个输入以相同的方式分区(使用相同的主键,相同的哈希函数和相同数量的分区),则可以独立地为每个分区使用哈希表。

分布式批处理引擎有一个故意限制的编程模型回调函数比如mapper和reducer被认为是无状态的除了指定的输出外没有外部可见的副作用。这个限制允许框架通过抽象抽象隐藏一些困难的分布式系统问题对崩溃和网络问题任务可以安全地重试任何失败任务的输出都被丢弃。如果某个分区的多个任务成功则只有其中一个真正使其输出可见。

得益于这个框架,您在批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,尽管可能不得不重新尝试各种任务。与处理用户请求并且写入数据库(这里可能给请求带来副作用)的在线服务相比,这种机制更为可靠。

批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入 - 换句话说,输出来源于输入。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,某个时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,和什么时候任务最终完成。

在下一章中,我们将转向流处理,其中的输入是未知的 - 也就是说,你还有一个作业,但是它的输入是永无止境的数据流。在这种情况下,作业永远不会完成,因为在任何时候都可能有更多的任务进来。我们将看到流和批处理在某些方面类似,但是关于无边界的流的假设同样也给怎样构建系统带来了很多变化。

参考文献

  1. Jeffrey Dean and Sanjay Ghemawat: “MapReduce: Simplified Data Processing on Large Clusters,” at 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004.

  2. Joel Spolsky: “The Perils of JavaSchools,” joelonsoftware.com, December 25, 2005.

  3. Shivnath Babu and Herodotos Herodotou: “Massively Parallel Databases and MapReduce Systems,” Foundations and Trends in Databases, volume 5, number 1, pages 1104, November 2013. doi:10.1561/1900000036

  4. David J. DeWitt and Michael Stonebraker: “MapReduce: A Major Step Backwards,” originally published at databasecolumn.vertica.com, January 17, 2008.

  5. Henry Robinson: “The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google,” the-paper-trail.org, June 25, 2014.

  6. The Hollerith Machine,” United States Census Bureau, census.gov.

  7. IBM 82, 83, and 84 Sorters Reference Manual,” Edition A24-1034-1, International Business Machines Corporation, July 1962.

  8. Adam Drake: “Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster,” aadrake.com, January 25, 2014.

  9. GNU Coreutils 8.23 Documentation,” Free Software Foundation, Inc., 2014.

  10. Martin Kleppmann: “Kafka, Samza, and the Unix Philosophy of Distributed Data,” martin.kleppmann.com, August 5, 2015.

  11. Doug McIlroy: Internal Bell Labs memo, October 1964. Cited in: Dennis M. Richie: “Advice from Doug McIlroy,” cm.bell-labs.com.

  12. M. D. McIlroy, E. N. Pinson, and B. A. Tague: “UNIX Time-Sharing System: Foreword,” The Bell System Technical Journal, volume 57, number 6, pages 18991904, July 1978.

  13. Eric S. Raymond: The Art of UNIX Programming. Addison-Wesley, 2003. ISBN: 978-0-13-142901-7

  14. Ronald Duncan: “Text File Formats ASCII Delimited Text Not CSV or TAB Delimited Text,” ronaldduncan.wordpress.com, October 31, 2009.

  15. Alan Kay: “Is 'Software Engineering' an Oxymoron?,” tinlizzie.org.

  16. Martin Fowler: “InversionOfControl,” martinfowler.com, June 26, 2005.

  17. Daniel J. Bernstein: “Two File Descriptors for Sockets,” cr.yp.to.

  18. Rob Pike and Dennis M. Ritchie: “The Styx Architecture for Distributed Systems,” Bell Labs Technical Journal, volume 4, number 2, pages 146152, April 1999.

  19. Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: “The Google File System,” at 19th ACM Symposium on Operating Systems Principles (SOSP), October 2003. doi:10.1145/945445.945450

  20. Michael Ovsiannikov, Silvius Rus, Damian Reeves, et al.: “The Quantcast File System,” Proceedings of the VLDB Endowment, volume 6, number 11, pages 10921101, August 2013. doi:10.14778/2536222.2536234

  21. OpenStack Swift 2.6.1 Developer Documentation,” OpenStack Foundation, docs.openstack.org, March 2016.

  22. Zhe Zhang, Andrew Wang, Kai Zheng, et al.: “Introduction to HDFS Erasure Coding in Apache Hadoop,” blog.cloudera.com, September 23, 2015.

  23. Peter Cnudde: “Hadoop Turns 10,” yahoohadoop.tumblr.com, February 5, 2016.

  24. Eric Baldeschwieler: “Thinking About the HDFS vs. Other Storage Technologies,” hortonworks.com, July 25, 2012.

  25. Brendan Gregg: “Manta: Unix Meets Map Reduce,” dtrace.org, June 25, 2013.

  26. Tom White: Hadoop: The Definitive Guide, 4th edition. O'Reilly Media, 2015. ISBN: 978-1-491-90163-2

  27. Jim N. Gray: “Distributed Computing Economics,” Microsoft Research Tech Report MSR-TR-2003-24, March 2003.

  28. Márton Trencséni: “Luigi vs Airflow vs Pinball,” bytepawn.com, February 6, 2016.

  29. Roshan Sumbaly, Jay Kreps, and Sam Shah: “The 'Big Data' Ecosystem at LinkedIn,” at ACM International Conference on Management of Data (SIGMOD), July 2013. doi:10.1145/2463676.2463707

  30. Alan F. Gates, Olga Natkovich, Shubham Chopra, et al.: “Building a High-Level Dataflow System on Top of Map-Reduce: The Pig Experience,” at 35th International Conference on Very Large Data Bases (VLDB), August 2009.

  31. Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.: “Hive A Petabyte Scale Data Warehouse Using Hadoop,” at 26th IEEE International Conference on Data Engineering (ICDE), March 2010. doi:10.1109/ICDE.2010.5447738

  32. Cascading 3.0 User Guide,” Concurrent, Inc., docs.cascading.org, January 2016.

  33. Apache Crunch User Guide,” Apache Software Foundation, crunch.apache.org.

  34. Craig Chambers, Ashish Raniwala, Frances Perry, et al.: “FlumeJava: Easy, Efficient Data-Parallel Pipelines,” at 31st ACM SIGPLAN Conference on Programming Language Design and Implementation (PLDI), June 2010. doi:10.1145/1806596.1806638

  35. Jay Kreps: “Why Local State is a Fundamental Primitive in Stream Processing,” oreilly.com, July 31, 2014.

  36. Martin Kleppmann: “Rethinking Caching in Web Apps,” martin.kleppmann.com, October 1, 2012.

  37. Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira: Hadoop Application Architectures. O'Reilly Media, 2015. ISBN: 978-1-491-90004-8

  38. Philippe Ajoux, Nathan Bronson, Sanjeev Kumar, et al.: “Challenges to Adopting Stronger Consistency at Scale,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.

  39. Sriranjan Manjunath: “Skewed Join,” wiki.apache.org, 2009.

  40. David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, and S. Seshadri: “Practical Skew Handling in Parallel Joins,” at 18th International Conference on Very Large Data Bases (VLDB), August 1992.

  41. Marcel Kornacker, Alexander Behm, Victor Bittorf, et al.: “Impala: A Modern, Open-Source SQL Engine for Hadoop,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  42. Matthieu Monsch: “Open-Sourcing PalDB, a Lightweight Companion for Storing Side Data,” engineering.linkedin.com, October 26, 2015.

  43. Daniel Peng and Frank Dabek: “Large-Scale Incremental Processing Using Distributed Transactions and Notifications,” at 9th USENIX conference on Operating Systems Design and Implementation (OSDI), October 2010.

  44. "Cloudera Search User Guide," Cloudera, Inc., September 2015.

  45. Lili Wu, Sam Shah, Sean Choi, et al.: “The Browsemaps: Collaborative Filtering at LinkedIn,” at 6th Workshop on Recommender Systems and the Social Web (RSWeb), October 2014.

  46. Roshan Sumbaly, Jay Kreps, Lei Gao, et al.: “Serving Large-Scale Batch Computed Data with Project Voldemort,” at 10th USENIX Conference on File and Storage Technologies (FAST), February 2012.

  47. Varun Sharma: “Open-Sourcing Terrapin: A Serving System for Batch Generated Data,” engineering.pinterest.com, September 14, 2015.

  48. Nathan Marz: “ElephantDB,” slideshare.net, May 30, 2011.

  49. Jean-Daniel (JD) Cryans: “How-to: Use HBase Bulk Loading, and Why,” blog.cloudera.com, September 27, 2013.

  50. Nathan Marz: “How to Beat the CAP Theorem,” nathanmarz.com, October 13, 2011.

  51. Molly Bartlett Dishman and Martin Fowler: “Agile Architecture,” at O'Reilly Software Architecture Conference, March 2015.

  52. David J. DeWitt and Jim N. Gray: “Parallel Database Systems: The Future of High Performance Database Systems,” Communications of the ACM, volume 35, number 6, pages 8598, June 1992. doi:10.1145/129888.129894

  53. Jay Kreps: “But the multi-tenancy thing is actually really really hard,” tweetstorm, twitter.com, October 31, 2014.

  54. Jeffrey Cohen, Brian Dolan, Mark Dunlap, et al.: “MAD Skills: New Analysis Practices for Big Data,” Proceedings of the VLDB Endowment, volume 2, number 2, pages 14811492, August 2009. doi:10.14778/1687553.1687576

  55. Ignacio Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino: “Data Wrangling: The Challenging Journey from the Wild to the Lake,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  56. Paige Roberts: “To Schema on Read or to Schema on Write, That Is the Hadoop Data Lake Question,” adaptivesystemsinc.com, July 2, 2015.

  57. Bobby Johnson and Joseph Adler: “The Sushi Principle: Raw Data Is Better,” at Strata+Hadoop World, February 2015.

  58. Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, et al.: “Apache Hadoop YARN: Yet Another Resource Negotiator,” at 4th ACM Symposium on Cloud Computing (SoCC), October 2013. doi:10.1145/2523616.2523633

  59. Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.: “Large-Scale Cluster Management at Google with Borg,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741964

  60. Malte Schwarzkopf: “The Evolution of Cluster Scheduler Architectures,” firmament.io, March 9, 2016.

  61. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.: “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing,” at 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI), April 2012.

  62. Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia: Learning Spark. O'Reilly Media, 2015. ISBN: 978-1-449-35904-1

  63. Bikas Saha and Hitesh Shah: “Apache Tez: Accelerating Hadoop Query Processing,” at Hadoop Summit, June 2014.

  64. Bikas Saha, Hitesh Shah, Siddharth Seth, et al.: “Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742790

  65. Kostas Tzoumas: “Apache Flink: API, Runtime, and Project Roadmap,” slideshare.net, January 14, 2015.

  66. Alexander Alexandrov, Rico Bergmann, Stephan Ewen, et al.: “The Stratosphere Platform for Big Data Analytics,” The VLDB Journal, volume 23, number 6, pages 939964, May 2014. doi:10.1007/s00778-014-0357-y

  67. Michael Isard, Mihai Budiu, Yuan Yu, et al.: “Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks,” at European Conference on Computer Systems (EuroSys), March 2007. doi:10.1145/1272996.1273005

  68. Daniel Warneke and Odej Kao: “Nephele: Efficient Parallel Data Processing in the Cloud,” at 2nd Workshop on Many-Task Computing on Grids and Supercomputers (MTAGS), November 2009. doi:10.1145/1646468.1646476

  69. Lawrence Page, Sergey Brin, Rajeev Motwani, and Terry Winograd: “The PageRank

  70. Leslie G. Valiant: “A Bridging Model for Parallel Computation,” Communications of the ACM, volume 33, number 8, pages 103111, August 1990. doi:10.1145/79173.79181

  71. Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl: “Spinning Fast Iterative Data Flows,” Proceedings of the VLDB Endowment, volume 5, number 11, pages 1268-1279, July 2012. doi:10.14778/2350229.2350245

  72. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, et al.: “Pregel: A System for Large-Scale Graph Processing,” at ACM International Conference on Management of Data (SIGMOD), June 2010. doi:10.1145/1807167.1807184

  73. Frank McSherry, Michael Isard, and Derek G. Murray: “Scalability! But at What COST?,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.

  74. Ionel Gog, Malte Schwarzkopf, Natacha Crooks, et al.: “Musketeer: All for One, One for All in Data Processing Systems,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741968

  75. Aapo Kyrola, Guy Blelloch, and Carlos Guestrin: “GraphChi: Large-Scale Graph Computation on Just a PC,” at 10th USENIX Symposium on Operating Systems Design and Implementation (OSDI), October 2012.

  76. Andrew Lenharth, Donald Nguyen, and Keshav Pingali: “Parallel Graph Analytics,” Communications of the ACM, volume 59, number 5, pages 7887, May 2016. doi:10.1145/2901919

  77. Fabian Hüske: “Peeking into Apache Flink's Engine Room,” flink.apache.org, March 13, 2015.

  78. Mostafa Mokhtar: “Hive 0.14 Cost Based Optimizer (CBO) Technical Overview,” hortonworks.com, March 2, 2015.

  79. Michael Armbrust, Reynold S Xin, Cheng Lian, et al.: “Spark SQL: Relational Data Processing in Spark,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742797

  80. Daniel Blazevski: “Planting Quadtrees for Apache Flink,” insightdataengineering.com, March 25, 2016.

  81. Tom White: “Genome Analysis Toolkit: Now Using Apache Spark for Data Processing,” blog.cloudera.com, April 6, 2016.


上一章 目录 下一章
第三部分:派生数据 设计数据密集型应用 第十章:流处理

  1. 有些人认为cat这里并没有必要因为输入文件可以直接作为awk的参数。 但这种写法让线性管道更为显眼。 ↩︎

  2. 统一接口的另一个例子是URL和HTTP他们是Web的基石。 一个URL标识一个网站上的一个特定的东西资源你可以在一个网站上链接到任何网址。 浏览器用户因此可以通过链接在网站之间无缝跳转,即使服务器属于完全不相关的组织。 这个原则看起来平淡无奇,但它是网络取得今天成功的关键。 之前的系统并不是那么统一例如在广播公告系统BBS时代每个系统都有自己的电话号码和波特率配置。 从一个BBS到另一个BBS的引用必须以电话号码和调制解调器设置的形式; 用户将不得不挂断拨打其他BBS然后手动找到他们正在寻找的信息无法直接链接到另一个BBS中内容。 ↩︎

  3. 巴尔干化Balkanization 是一个常带有贬义的地缘政治学术语,其定义为:一个国家或政区分裂成多个互相敌对的国家或政区的过程。 ↩︎

  4. 除了使用一个单独的工具,如netcatcurl。 Unix开始试图将所有东西都表示为文件但是BSD套接字API偏离了这个惯例[17]。研究型操作系统Plan 9和Inferno在使用文件方面更加一致它们将TCP连接表示为/net/tcp中的文件[18]。 ↩︎

  5. 对于HDFS一个不同之处在于可以将计算任务安排在存储特定文件副本的计算机上运行而对象存储通常将存储和计算分开。如果网络带宽是瓶颈从本地磁盘读取有性能优势。但是请注意如果是删除功能局部存储将没有优势因为来自多台机器的数据必须进行合并以重建原始文件[20]。 ↩︎

  6. 我们在本书中讨论的连接通常是等值连接即最常见的连接类型其中记录与其他记录通过主键例如ID连接。有些数据库支持更通用的连接类型例如使用<而不是=,由于篇幅所限,我们不予讨论。 ↩︎

  7. 这个例子假定哈希表中的每个键只有一个记录这对用户数据库用户ID唯一标识一个用户可能是正确的。通常哈希表可能需要包含具有相同键的多个记录并且连接运算符将输出主键的所有匹配。 ↩︎