08月19, 2017

Yarn、Flume、Storm简介

支持批处理的MapReduce,支持迭代图算法的Dreme

Yarn

YARN简介

YARN的基本思想是将JobTracker的两个主要功能(资源管理和作业调度/监控)分离,主要方法是创建一个全局的ResourceManager(RM)和若干个针对应用程序的ApplicationMaster(AM)。这里的应用程序是指传统的MapReduce作业或作业的DAG(有向无环图)。

  • YARN 分层结构的本质是 ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础 NodeManager(YARN 的每节点代理)。ResourceManager 还与 ApplicationMaster 一起分配资源,与 NodeManager 一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。
  • ApplicationMaster 管理一个在 YARN 内运行的应用程序的每个实例。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从 YARN 角度讲,ApplicationMaster 是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。
  • NodeManager 管理一个 YARN 集群中的每个节点。NodeManager 提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1 通过插槽管理 Map 和 Reduce 任务的执行,而 NodeManager 管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。YARN 继续使用 HDFS 层。它的主要 NameNode 用于元数据服务,而 DataNode 用于分散在一个集群中的复制存储服务。
  • 要使用一个 YARN 集群,首先需要来自包含一个应用程序的客户的请求。ResourceManager 协商一个容器的必要资源,启动一个 ApplicationMaster 来表示已提交的应用程序。通过使用一个资源请求协议,ApplicationMaster 协商每个节点上供应用程序使用的资源容器。执行应用程序时,ApplicationMaster 监视容器直到完成。当应用程序完成时,ApplicationMaster 从 ResourceManager 注销其容器,执行周期就完成了。

YARN的核心思想

将JobTracker和TaskTacker进行分离,它由下面几大构成组件: a. 一个全局的资源管理器 ResourceManager b. ResourceManager的每个节点代理 NodeManager c. 表示每个应用的 ApplicationMaster d. 每一个ApplicationMaster拥有多个Container在NodeManager上运行

Flume

系统功能

  1. 日志收集 Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
  2. 数据处理 Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力 Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。

工作方式

  • Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
  • Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

Storm

设计特征

  • 适用场景广:Storm 可以用来处理消息和更新数据库(消息的流处理),对一个数据量进行持续的查询并将结果返回给客户端(连续计算),对于耗费资源的查询进行并行化处理(分布式方法调用),Storm 提供的计算原语可以满足诸如以上所述的大量场景。
  • 可伸缩性强:Storm 的可伸缩性可以让Storm 每秒处理的消息量达到很高,如100 万。实现计算任务的扩展,只需要在集群中添加机器,然后提高计算任务的并行度设置。Storm 网站上给出了一个具有伸缩性的例子,一个Storm应用在一个包含10 个节点的集群上每秒处理1 000 000 个消息,其中包括每秒100 多次的数据库调用。Storm 使用Apache ZooKeeper 来协调集群中各种配置的同步,这样Storm 集群可以很容易地进行扩展。
  • 保证数据不丢失:实时计算系统的关键就是保证数据被正确处理,丢失数据的系统使用场景会很窄,而Storm 可以保证每一条消息都会被处理,这是Storm 区别于S4(Yahoo 开发的实时计算系统)系统的关键特征。
  • 健壮性强:不像Hadoop 集群很难进行管理,它需要管理人员掌握很多Hadoop 的配置、维护、调优的知识。而Storm 集群很容易进行管理,容易管理是Storm 的设计目标之一。
  • 高容错:Storm 可以对消息的处理过程进行容错处理,如果一条消息在处理过程中失败,那么Storm 会重新安排出错的处理逻辑。Storm 可以保证一个处理逻辑永远运行。
  • 语言无关性:Storm 应用不应该只能使用一种编程平台,Storm 虽然是使用Clojure 语言开发实现,但是,Storm 的处理逻辑和消息处理组件都可以使用任何语言来进行定义,这就是说任何语言的开发者都可以使用Storm。

关键概念

(1)计算拓扑(Topologies) 在 Storm 中,一个实时计算应用程序的逻辑被封装在一个称为Topology 的对象中,也称为计算拓扑。Topology 有点类似于Hadoop 中的MapReduce Job,但是它们之间的关键区别在于,一个MapReduce Job 最终总是会结束的,然而一个Storm 的Topology 会一直运行。在逻辑上,一个Topology 是由一些Spout(消息的发送者)和Bolt(消息的处理者)组成图状结构,而链接Spouts 和Bolts 的则是Stream Groupings。 (2)消息流(Streams) 消息流是 Storm 中最关键的抽象,一个消息流就是一个没有边界的tuple序列,tuple 是一种Storm 中使用的数据结构,可以看作是没有方法的Java 对象。这些tuple 序列会被一种分布式的方式并行地在集群上进行创建和处理。对消息流的定义主要就是对消息流里面的tuple 进行定义,为了更好地使用tuple,需要给tuple 里的每个字段取一个名字,并且不同的tuple 字段对应的类型要相同,即两个tuple 的第一个字段类型相同,第二个字段类型相同,但是第一个字段和第二个字段的类型可以不同。默认情况下,tuple 的字段类型可以为integer、long、short、byte、string、double、float、boolean 和byte array 等基本类型,也可以自定义类型,只需要实现相应的序列化接口。每一个消息流在定义的时候需要被分配一个id,最常见的消息流是单向的消息流,在Storm 中OutputFieldsDeclarer 定义了一些方法,让你可以定义一个Stream 而不用指定这个id。在这种情况下,这个Stream 会有个默认的id: 1。 (3)消息源(Spouts) Spouts 是Storm 集群中一个计算任务(Topology)中消息流的生产者,Spouts一般是从别的数据源(例如,数据库或者文件系统)加载数据,然后向Topology中发射消息。在一个Topology 中存在两种Spouts,一种是可靠的Spouts,一种是非可靠的Spouts,可靠的Spouts 在一个tuple 没有成功处理的时候会重新发射该tuple,以保证消息被正确地处理。不可靠的Spouts 在发射一个tuple 之后,不会再重新发射该tuple,即使该tuple 处理失败。每个Spouts 都可以发射多个消息流,要实现这样的效果,可以使用OutFieldsDeclarer.declareStream 来定义多个Stream,然后使用SpoutOutputCollector 来发射指定的Stream。 在Storm 的编程接口中,Spout 类最重要的方法是nextTuple()方法,使用该方法可以发射一个消息tuple 到Topology 中,或者简单地直接返回,如果没有消息要发射。需要注意的是,nextTuple 方法的实现不能阻塞Spout,因为Storm在同一线程上调用Spout 的所有方法。Spout 类的另外两个重要的方法是ack()和fail(),一个tuple 被成功处理完成后,ack()方法被调用,否则就调用fail()方法。注意,只有对于可靠的Spout,才会调用ack()和fail()方法。 (4)消息处理者(Bolts) 所有消息处理的逻辑都在Bolt 中完成,在Bolt 中可以完成如过滤、分类、聚集、计算、查询数据库等操作。Bolt 可以做简单的消息处理操作,例如,Bolt 可以不做任何操作,只是将接收到的消息转发给其他的Bolt。Bolt 也可以做复杂的消息流的处理,从而需要很多个Bolt。在实际使用中,一条消息往往需要经过多个处理步骤,例如,计算一个班级中成绩在前十名的同学,首先需要对所有同学的成绩进行排序,然后在排序过的成绩中选出前十名的 成绩的同学。所以在一个Topology 中,往往有很多个Bolt,从而形成了复杂的流处理网络。 Bolts 不仅可以接收消息,也可以像Spout 一样发射多条消息流,可以使用OutputFieldsDeclarer.declareStream 定义Stream,使用OutputCollector.emit 来选择要发射的Stream。在编程接口上,Bolt 类中最终需要的方法是execute()方法,该方法的参数就是输入Tuple,Bolt 使用OutputCollector 发送消息tuple,Bolt 对于每个处理过的消息tuple 都必须调用OutputCollector 的ack()方法,通知Storm 这个消息被处理完成,最终会通知到发送该消息的源,即Spout。消息在Bolt 中的处理过程一般是这样,Bolt 将接收到的消息tuple 进行处理,然后发送0 个或多个消息tuple,之后调用OutputCollector 的ack()方法通知消息的发送者。 (5)Stream Groupings(消息分组策略) 定义一个 Topology 的其中一步是定义每个Bolt 接收什么样的流作为输入。Stream Grouping 就是用来定义一个Stream 应该如何分配给Bolts 上面的多个Tasks。Storm 里面有6 种类型的Stream Grouping。 ● Shuffle Grouping:随机分组,随机派发Stream 里面的tuple,保证每个Bolt 接收到的tuple 数目相同。 ● Fields Grouping:按字段分组,比如按userid 来分组,具有同样userid 的tuple 会被分到相同的Bolts,而不同的userid 则会被分配到不同的Bolts。 ● All Grouping:广播发送,对于每一个tuple,所有的Bolts 都会收到。 ● Global Grouping: 全局分组,这个tuple 被分配到Storm 中一个Bolt 的其中一个Task。再具体一点就是分配给id 值最低的那个Task。 ● Non Grouping:不分组,这个分组的意思是Stream 不关心到底谁会收到它的tuple。目前这种分组和Shuffle Grouping 是一样的效果,有一点不同的是Storm 会把这个Bolt 放到此Bolt 的订阅者同一个线程里面去执行。 ● Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个Task 处理这个消息。只有被声明为Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple 必须使用emitDirect 方法来发送。消息处理者可以通过TopologyContext 来获取处理它的消息的taskid(OutputCollector.emit 方法也会返回taskid)。 (6)可靠性(Reliability) Storm 可以保证每个消息tuple 会被Topology 完整地处理,Storm 会追踪每个从Spout 发送出的消息tuple 在后续处理过程中产生的消息树(Bolt 接收到的消息完成处理后又可以产生0 个或多个消息,这样反复进行下去,就会形成一棵消息树),Storm 会确保这棵消息树被成功地执行。Storm 对每个消息都设置了一个超时时间,如果在设定的时间内,Storm 没有检测到某个从Spout 发送的tuple 是否执行成功,Storm 会假设该tuple 执行失败,因此会重新发送该tuple。这样就保证了每条消息都被正确地完整地执行。 Storm 保证消息的可靠性是通过在发送一个tuple 和处理完一个tuple 的时候都需要像Storm 一样返回确认信息来实现的,这一切是由OutputCollector 来完成的。通过它的emit 方法来通知一个新的tuple 产生,通过它的ack 方法通知一个tuple 处理完成。 (7)任务(Tasks) 在 Storm 集群上,每个Spout 和Bolt 都是由很多个Task 组成的,每个Task对应一个线程,流分组策略就是定义如何从一堆Task 发送tuple 到另一堆Task。在实现自己的Topology 时可以调用TopologyBuilder.setSpout() 和TopBuilder.setBolt()方法来设置并行度,也就是有多少个Task。 (8)工作进程(Worker) 一个 Topology 可能会在一个或者多个工作进程里面执行,每个工作进程执行整个Topology 的一部分。比如,对于并行度是300 的Topology 来说,如果我们使用50 个工作进程来执行,那么每个工作进程会处理其中的6 个Tasks(其实就是每个工作进程里面分配6 个线程)。Storm 会尽量均匀地把工作分配给所有的工作进程。 (9)配置 在 Storm 里面可以通过配置大量的参数来调整Nimbus、Supervisor 以及正在运行的Topology 的行为,一些配置是系统级别的,一些配置是Topology 级别的。所有有默认值的配置的默认配置是配置在default.xml 里面的,用户可以通过定义一个storm.xml 在classpath 里来覆盖这些默认配置。并且也可以使用Storm Submitter 在代码里面设置一些Topology 相关的配置信息。当然,这些配置的优先级是default.xml<storm.xml<TOPOLOGY-SPECIFIC 配置。

本文链接:https://blog.jnliok.com/post/study-yarn-flume-storm.html

-- EOF --

Comments