雪花算法生成的是Long类型的ID,一个Long类型占8个字节,每个字节占8比特,也就是说一 个Long类型占64个比特。 雪花ID组成结构:正数位(占1比特)+ 时间戳(占41比特)+ 机器 ID(占5比特)+ 数据中心(占5比特)+ 自增值(占12比特),总共64比特组成的一个Long 类型。 第一个bit位(1bit):Java中long的最高位是符号位代表正负,正数是0,负数是1, 一般生成ID都为正数,所以默认为0。 时间戳部分(41bit):毫秒级的时间,不建议存当前时 间戳,而是用(当前时间戳 – 固定开始时间戳)的差值,可以使产生的ID从更小的值开始;41 位的时间戳可以使用69年,(1L << 41) / (1000L 60 60 24 365) = 69年 工作机器 id(10bit):也被叫做workId,这个可以灵活配置,机房或者机器号组合都可以。 序列号部 分(12bit),自增值支持同一毫秒内同一个节点可以生成4096个ID
任何分布式系统都无法同时满足一致性(consistency),可用性(availibity),分区容错性 (partition tolerance)这三项,最多只可同时满足其中的两项。
涉及到多个数据库操作的事务即为分布式事务,目的是为保证分布式系统中的数据一致性.
二阶段提交2PC:第一步请求阶段通过协调者来统计表决结果,第二步执行表决后的结果,如果表 决的结果是提交,那就提交执行,否则不执行提交.缺点是同步阻塞,而且万一协调者挂了就无法保 证ACID.
三阶段提交3PC:在2PC的第一步拆分成了2步并且引入了超时机制,解决了2PC的痛点.第一步先 向参与者发出一个信号,看看大家是否都能提交,如果可以就返回yes,否则返回no.第二步 PreCommit阶段,预提交一下,如果参与者可以完成commit,就返回ack进确认,如果不能则放弃 提交本次事务.第三步doCommit阶段,进行真正的事务提交.
try,commit,cancel的缩写,try阶段进行检测,commit提交执行,只要try阶段成功了commit就 一定会被执行,cancel业务出现错误时执行,回滚事务,释放资源.
轮询(Round Robin)
轮询算法把每个请求轮流发送到每个服务器上。 该算法比较适合每个服务器的性能差不多的场景,如果有性能存在差异的情况下,那么性能较 差的服务器可能无法承担多大的负载。
加权轮询(Weighted Round Robbin)
加权轮询是在轮询的基础上,根据服务器的性能差异,为服务器赋予一定的权值。
最少连接(least Connections)
由于每个请求的连接时间不一样,使用轮询或者加权轮询算法的话,可能会让一台服务器当前 连接数过多,而另一台服务器的连接数过少,造成负载不均衡。最少连接算法就是将请求发送 给当前最少连接数的服务器上。
加权最小连接(Weighted Least Connection)
在最小连接的基础上,根据服务器的性能为每台服务器分配权重,然后根据权重计算出每台服 务器能处理的连接数。
随机算法(Random)
把请求随机发送到服务器上。和轮询算法类似,该算法比较适合服务器性能差不多的场景。
DNS 解析
使用 DNS 作为负载均衡器,会根据负载情况返回不同服务器的 IP 地址。大型网站基本使用了 这种方式最为第一级负载均衡手段,然后在内部在第二级负载均衡。
修改 MAC 地址
使用 LVS(Linux Virtual Server)这种链路层负载均衡器,根据负载情况修改请求的 MAC 地址。
修改 IP 地址
在网络层修改请求的目的 IP 地址。
HTTP 重定向
HTTP 重定向负载均衡服务器收到 HTTP 请求之后会返回服务器的地址,并将该地址写入 HTTP 重定向响应中返回给浏览器,浏览器收到后再次发送请求。
正向代理:发生在客户端,是由用户主动发起的。
比如翻墙,客户端通过主动访问代理服务 器,让代理服务器获得需要的外网数据,然后转发回客户端。
反向代理:发生在服务器端,用户不知道发生了代理。
如果不做任何处理的话,用户将出现频繁登录的现象,比如集群中存在 A、B 两台服务器,用 户在第一次访问网站时,Nginx 通过其负载均衡机制将用户请求转发到 A 服务器,这时 A 服 务器就会给用户创建一个 Session。当用户第二次发送请求时,Nginx 将其负载均衡到 B 服务 器,而这时候 B 服务器并不存在 Session,所以就会将用户踢到登录页面。这将大大降低用户 体验度,导致用户的流失,这种情况是项目绝不应该出现的。
- 粘性 Session 原理 粘性 Session 是指将用户锁定到某一个服务器上,比如上面说的例子,用户第一次请求 时,负载均衡器将用户的请求转发到了 A 服务器上,如果负载均衡器设置了粘性 Session 的话,那么用户以后的每次请求都会转发到 A 服务器上,相当于把用户和 A 服 务器粘到了一块,这就是粘性 Session 机制。
优点
简单,不需要对 Session 做任何处理。
缺点
缺乏容错性,如果当前访问的服务器发生故障,用户被转移到第二个服务器上时,他的 Session 信息都将失效。
适用场景
发生故障对客户产生的影响较小; 服务器发生故障是低概率事件。
- 服务器 Session 复制
原理
任何一个服务器上的 Session 发生改变,该节点会把这个 Session 的所有内容序列化,然后广 播给所有其它节点,不管其他服务器需不需要 Session,以此来保证 Session 同步。
优点
可容错,各个服务器间 Session 能够实时响应。
缺点
会对网络负荷造成一定压力,如果 Session 量大的话可能会造成网络堵塞,拖慢服务器性能。 实现方式 设置 Tomcat 的 server.xml 开启 tomcat 集群功能。 在应用里增加信息:通知应用当前处于集群环境中,支持分布式,即在 web.xml 中添加 选 项。
- Session 共享机制
使用分布式缓存方案比如 Memcached、Redis,但是要求 Memcached 或 Redis 必须是集群。
使用 Session 共享也分两种机制,两种情况如下:
3.1 粘性 Session 共享机制 和粘性 Session 一样,一个用户的 Session 会绑定到一个 Tomcat 上。Memcached 只是起 到备份作用。
3.2 非粘性 Session 共享机制
原理
Tomcat 本身不存储 Session,而是存入 Memcached 中。Memcached 集群构建主从复制架 构。
优点
可容错,Session 实时响应。 实现方式 用开源的 msm 插件解决 Tomcat 之间的 Session 共享: Memcached_Session_Manager(MSM)
- Session 持久化到数据库
原理
拿出一个数据库,专门用来存储 Session 信息。保证 Session 的持久化。
优点
服务器出现问题,Session 不会丢失。
缺点
如果网站的访问量很大,把 Session 存储到数据库中,会对数据库造成很大压力,还需要增加 额外的开销维护数据库。
- Terracotta 实现 Session 复制
原理
Terracotta 的基本原理是对于集群间共享的数据,当在一个节点发生变化的时候, Terracotta 只把变化的部分发送给 Terracotta 服务器,然后由服务器把它转发给真正需要这 个数据的节点。它是服务器 Session 复制的优化。
优点
这样对网络的压力就非常小,各个节点也不必浪费 CPU 时间和内存进行大量的序列化操作。 把这种集群间数据共享的机制应用在 Session 同步上,既避免了对数据库的依赖,又能达到负 载均衡和灾难恢复的效果。
前端。每次点击后都要等X秒才能点击。
数据库添加唯一索引 服务器返回表单页面时,会先生成一个token保存于session或redis,当表单提交时候携带 token,如果token一致,则执行后续,并将服务器中的token删除。
前端:在秒杀之前,按钮置灰,并且不给前端真正的请求地址。前端定时请求后端接口,如果到了秒杀时间,则返回给前端真正的地址,前端放开按钮,每次点击后都要等X秒才能点击。
服务器:服务器用nginx做集群、redis也做集群
限流:在秒杀之前,将秒杀数量的令牌存入到redis中,可以用list,每次来请求都去redis中取出令牌,如果获取到说明秒杀成功,然后去访问数据库下单,如果没有获取到,则说明商品卖完了。
消息中间件:如果秒杀数量比较多,例如上万十万,则秒杀成功之后,将成功的请求放入到 mq或者kafka中间件中,再从消息队列中获取请求。
服务降级:为了以防万一,还是要做服务熔断降级。
唯一id。每次操作,都根据操作和内容生成唯一的id,在执行之前先判断id是否存在,如果不 存在则执行后续操作,并且保存到数据库或者redis等。
服务端提供发送token的接口,业务调用接口前先获取token,然后调用业务接口请求时,把 token携带过去,务器判断token是否存在redis中,存在表示第一次请求,可以继续执行业务, 执行业务完成后,最后需要把redis中的token删除建去重表。
将业务中有唯一标识的字段保存到去重表,如果表中存在,则表示已经处理过了 版本控制。增加版本号,当版本号符合时,才能更新数据 状态控制。
例如订单有状态已支付 未支付 支付中 支付失败,当处于未支付的时候才允许修改为支付中等
一般来说,从业务逻辑上最好设计系统不需要这种顺序的保证,因为一旦引入顺序性保障,会导致系统复杂度的上升,效率会降低,对于热点数据会压力过大等问题。 首先使用一致性hash负载均衡策略,将同一个id的请求都分发到同一个机器上面去处理,比如 订单可以根据订单id。
如果处理的机器上面是多线程处理的,可以引入内存队列去处理,将相 同id的请求通过hash到同一个队列当中,一个队列只对应一个处理线程。
最好能将多个操作合并成一个操作。
BASE是 Basically Available (基本可用) Soft state(软状态) Eventually consistent(最终一 致性)这几个单词的缩写,是从CAP理论发展而来的,其核心思想是:即使无法做到强一致性,但每 个应用都可以根据自身特点,采取适当的方式来使系统达到最终一致性.
微服务是在SOA的基础上发展而来,从粒度上来说,微服务的粒度要比SOA更细.
微服务由于粒度更细,所以微服务架构的耦合度相对于SOA架构的耦合度更低.
微服务的服务规模相较于SOA一般要更大,所能承载的并发量也更高.
缓存雪崩我们可以简单的理解为:由于原有缓存失效,新缓存未到期间所有原本应该访问缓存的请求都去查询数据库了,而对数据库 CPU 和内存造成巨大压力,严重的会造成数据库宕机。从而形成一系列连锁反应,造成整个系统崩溃。一般有三种处理办法:
- 一般并发量不是特别多的时候,使用最多的解决方案是加锁排队。
- 给每一个缓存数据增加相应的缓存标记,记录缓存的是否失效,如果缓存标记失效,则更新数据缓存。
- 为 key 设置不同的缓存失效时间。
缓存穿透是指用户查询数据,在数据库没有,自然在缓存中也不会有。这样就导致用户查询的时候,在缓存中找不到,每次都要去数据库再查询一遍,然后返回空(相当于进行了两次无用的查询)。这样请求就绕过缓存直接查数据库,这也是经常提的缓存命中率问题。
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的 bitmap 中,一个一定不存在的数据会被这个 bitmap 拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。通过这个直接设置的默认值存放到缓存,这样第二次到缓冲中获取就有值了,而不会继续访问数据库。
缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统。这样就可以避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据!
缓存更新除了缓存服务器自带的缓存失效策略之外(Redis 默认的有 6 中策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:
- 定时去清理过期的缓存;
- 当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。
当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
就是一个大数据解决方案。它提供了一套分布式系统基础架构。 核心内容包含 hdfs 和mapreduce。hadoop2.0 以后引入 yarn. hdfs 是提供数据存储的,mapreduce 是方便数据计算的。
- hdfs 又对应 namenode 和 datanode. namenode 负责保存元数据的基本信息,datanode 直接存放数据本身;
- mapreduce 对应 jobtracker 和 tasktracker. jobtracker 负责分发任务,tasktracker 负责执行具体任务;
- 对应到 master/slave 架构,namenode 和 jobtracker 就应该对应到 master, datanode和 tasktracker 就应该对应到 slave.
Client(代表用 户) 通过与 NameNode 和 DataNode 交互访问 HDFS 中 的文件。 Client 提供了一个类似 POSIX 的文件系统接口供用户调用。
整个 Hadoop 集群中只有一个 NameNode。 它是整个系统的“ 总管”, 负责管理 HDFS 的目录树和相关的文件元数据信息。 这些信息是以“ fsimage”( HDFS 元数据镜像文件)和“ editlog”(HDFS 文件改动日志)两个文件形式存放在本地磁盘,当 HDFS 重启时重新构造出来的。此外, NameNode 还负责监控各个 DataNode 的健康状态, 一旦发现某个 DataNode 宕掉,则将该 DataNode 移出 HDFS 并重新备份其上面的数据。
Secondary NameNode 最重要的任务并不是为 NameNode 元数据进行热备份, 而是定期合并fsimage 和 edits 日志, 并传输给 NameNode。 这里需要注意的是,为了减小 NameNode 压力, NameNode 自己并不会合并 fsimage 和 edits, 并将文件存储到磁盘上, 而是交由Secondary NameNode 完成。
一般而言, 每个 Slave 节点上安装一个 DataNode, 它负责实际的数据存储, 并将数据信息定期汇报给 NameNode。 DataNode 以固定大小的 block 为基本单位组织文件内容, 默认情况下block 大小为 64MB。 当用户上传一个大的文件到 HDFS 上时, 该文件会被切分成若干个 block,分别存储到不同的 DataNode ; 同时,为了保证数据可靠, 会将同一个 block 以流水线方式写到若干个(默认是 3,该参数可配置)不同的 DataNode 上。 这种文件切割后存储的过程是对用户透明的。
同 HDFS 一样,Hadoop MapReduce 也采用了 Master/Slave(M/S)架构,具体如图所示。它主要由以下几个组件组成:Client、JobTracker、TaskTracker 和 Task。
用户编写的 MapReduce 程序通过 Client 提交到 JobTracker 端; 同时, 用户可通过 Client 提供的一些接口查看作业运行状态。 在 Hadoop 内部用“作业”(Job) 表示 MapReduce 程序。 一个 MapReduce 程序可对应若干个作业,而每个作业会被分解成若干个 Map/Reduce 任务(Task)。
JobTracker 主要负责资源监控和作业调度。JobTracker 监控所有 TaskTracker 与作业的健康状况,一旦发现失败情况后,其会将相应的任务转移到其他节点;同时 JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在 Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。
TaskTracker 会周期性地通过 Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker, 同时接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、 杀死任务等)。TaskTracker 使用“slot” 等量划分本节点上的资源量。“slot” 代表计算资源(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个TaskTracker 上的空闲 slot 分配给 Task 使用。 slot 分为 Map slot 和 Reduce slot 两种,分别供MapTask 和 Reduce Task 使用。 TaskTracker 通过 slot 数目(可配置参数)限定 Task 的并发度。
Task 分为 Map Task 和 Reduce Task 两种, 均由 TaskTracker 启动。 HDFS 以固定大小的 block 为基本单位存储数据, 而对于 MapReduce 而言, 其处理单位是 split。split 与 block 的对应关系如图所示。 split 是一个逻辑概念, 它只包含一些元数据信息, 比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。 但需要注意的是,split 的多少决定了 Map Task 的数目 ,因为每个 split 会交由一个 Map Task 处理。
Map Task 执行过程如图所示。 由该图可知,Map Task 先将对应的 split 迭代解析成一个个key/value 对,依次调用用户自定义的 map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个 partition,每个 partition 将被一个 Reduce Task 处理。
该过程分为三个阶段
- 从远程节点上读取 MapTask 中间结果(称为“Shuffle 阶段”);
- 按照 key 对 key/value 对进行排序(称为“ Sort 阶段”);
- 依次读取<key, value list>,调用用户自定义的 reduce() 函数处理,并将最终结果存到 HDFS 上(称为“ Reduce 阶段”)。
生命周期:
- 作业提交与初始化。
- 任务调度与监控。
- 任务运行环境准备。
- 任务执行。
- 作业完成
用户提交作业后, 首先由 JobClient 实例将作业相关信息, 比如将程序 jar 包、作业配置文件、 分片元信息文件等上传到分布式文件系统( 一般为 HDFS)上,其中,分片元信息文件记录了每个输入分片的逻辑位置信息。 然后 JobClient 通过 RPC 通知 JobTracker。JobTracker 收到新作业提交请求后, 由 作业调度模块对作业进行初始化:为作业创建一个JobInProgress 对象以跟踪作业运行状况, 而 JobInProgress 则会为每个 Task 创建一个TaskInProgress 对象以跟踪每个任务的运行状态, TaskInProgress 可能需要管理多个“ Task 运行尝试”( 称为“ Task Attempt”)。
前面提到,任务调度和监控的功能均由 JobTracker 完成。TaskTracker 周期性地通过Heartbeat 向 JobTracker 汇报本节点的资源使用 情况, 一旦出 现空闲资源, JobTracker 会按照一定的策略选择一个合适的任务使用该空闲资源, 这由任务调度器完成。 任务调度器是一个可插拔的独立模块, 且为双层架构, 即首先选择作业, 然后从该作业中选择任务, 其中,选择任务时需要重点考虑数据本地性。 此外,JobTracker 跟踪作业的整个运行过程,并为作业的成功运行提供全方位的保障。 首先, 当 TaskTracker 或者 Task 失败时, 转移计算任务 ; 其次, 当某个 Task 执行进度远落后于同一作业的其他 Task 时,为之启动一个相同Task, 并选取计算快的 Task 结果作为最终结果。
运行环境准备包括 JVM 启动和资源隔 离, 均由 TaskTracker 实现。 TaskTracker 为每个Task 启动一个独立的 JVM 以避免不同 Task 在运行过程中相互影响 ; 同时,TaskTracker 使用了操作系统进程实现资源隔离以防止 Task 滥用资源。
TaskTracker 为 Task 准备好运行环境后, 便会启动 Task。 在运行过程中, 每个 Task 的最新进度首先由 Task 通过 RPC 汇报给 TaskTracker, 再由 TaskTracker 汇报给 JobTracker。
待所有 Task 执行完毕后, 整个作业执行成功。
Spark 提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。
包含 Spark 的基本功能;尤其是定义 RDD 的 API、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的
提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。
对实时数据流进行处理和控制。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据
一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作
**Cluster Manager-制整个集群,监控 worker 在 standalone 模式中即为 Master 主节点,控制整个集群,监控 worker。在 YARN 模式中为资源管理器 Worker 节点-负责控制计算节点从节点,负责控制计算节点,启动 Executor 或者 Driver。 Driver: 运行 Application 的 main()函数 Executor:执行器,是为某个 Application 运行在 worker node 上的一个进程
Spark 应用程序从编写到提交、执行、输出的整个过程如图所示,图中描述的步骤如下:
- 用户使用 SparkContext 提供的 API(常用的有 textFile、sequenceFile、runJob、stop 等)编写 Driver application 程序。此外 SQLContext、HiveContext 及 StreamingContext 对SparkContext 进行封装,并提供了 SQL、Hive 及流式计算相关的 API。
- 使用SparkContext提交的用户应用程序,首先会使用BlockManager和BroadcastManager将任务的 Hadoop 配置进行广播。然后由 DAGScheduler 将任务转换为 RDD 并组织成 DAG,DAG 还将被划分为不同的 Stage。最后由 TaskScheduler 借助 ActorSystem 将任务提交给集群管理器(Cluster Manager)。
- 集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到Worker上,Worker创建 Executor 来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为 Spark的集群管理器。
RDD 可以看做是对各种数据计算模型的统一抽象,Spark 的计算过程主要是 RDD 的迭代计算过程。RDD 的迭代计算过程非常类似于管道。分区数量取决于 partition 数量的设定,每个分区的数据只会在一个 Task 中计算。所有分区可以在多个机器节点的 Executor 上并行执行。
- 构建 Spark Application 的运行环境,启动 SparkContext
- SparkContext 向资源管理器(可以是 Standalone,Mesos,Yarn)申请运行 Executor 资源,并启动 StandaloneExecutorbackend,
- Executor 向 SparkContext 申请 Task
- SparkContext 将应用程序分发给 Executor
- SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage、将 Taskset 发送给 Task Scheduler,最后由 Task Scheduler 将 Task 发送给 Executor 运行
- Task 在 Executor 上运行,运行完释放所有资源
- 创建 RDD 对象
- DAGScheduler 模块介入运算,计算 RDD 之间的依赖关系,RDD 之间的依赖关系就形成了DAG
- 每一个 Job 被分为多个 Stage。划分 Stage 的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个 Stage,避免多个 Stage 之间的消息传递开销
- 从 Hadoop 文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如 HDFS)创建。
- 从父 RDD 转换得到新 RDD。
- 通过 parallelize 或 makeRDD 将单机数据创建为分布式 RDD。
对于 RDD 可以有两种操作算子:转换(Transformation)与行动(Action)。
-
转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
-
行动(Action):Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark 系统。
Storm 是一个免费并开源的分布式实时计算系统。利用 Storm 可以很容易做到可靠地处理无限的数据流,像 Hadoop 批量处理大数据一样,Storm 可以实时处理数据。
Storm 集群的 Master 节点,负责分发用户代码,指派给具体的 Supervisor 节点上的 Worker 节点,去运行 Topology 对应的组件(Spout/Bolt)的 Task。
Storm 集群的从节点,负责管理运行在 Supervisor 节点上的每一个 Worker 进程的启动和终止。通过 Storm 的配置文件中的 supervisor.slots.ports 配置项,可以指定在一个 Supervisor 上最大允许多少个 Slot,每个 Slot 通过端口号来唯一标识,一个端口号对应一个 Worker 进程(如果该Worker 进程被启动)。
运行具体处理组件逻辑的进程。Worker 运行的任务类型只有两种,一种是 Spout 任务,一种是Bolt 任务。
worker中每一个spout/bolt的线程称为一个task. 在storm0.8 之后,task不再与物理线程对应,不同 spout/bolt 的 task 可能会共享一个物理线程,该线程称为 executor。
用来协调 Nimbus 和 Supervisor,如果 Supervisor 因故障出现问题而无法运行 Topology,Nimbus 会第一时间感知到,并重新分配 Topology 到其它可用的 Supervisor 上运行
strom 在运行中可分为 spout 与 bolt 两个组件,其中,数据源从 spout 开始,数据以 tuple 的方式发送到 bolt,多个 bolt 可以串连起来,一个 bolt 也可以接入多个 spot/bolt.运行时原理如下图:
Storm 中运行的一个实时应用程序的名称。将 Spout、 Bolt 整合起来的拓扑图。定义了 Spout 和Bolt 的结合关系、并发数量、配置等等。
在一个 topology 中获取源数据流的组件。通常情况下 spout 会从外部数据源中读取数据,然后转换为 topology 内部的源数据。
接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。
一次消息传递的基本单元,理解为一组消息就是一个 Tuple。
Tuple 的集合。表示数据的流向。
在 Storm 中,一个实时应用的计算任务被打包作为 Topology 发布,这同 Hadoop MapReduce任务相似。但是有一点不同的是:在 Hadoop 中,MapReduce 任务最终会执行完成后结束;而在Storm 中,Topology 任务一旦提交后永远不会结束,除非你显示去停止任务。计算任务Topology 是由不同的 Spouts 和 Bolts,通过数据流(Stream)连接起来的图。一个 Storm 在集群上运行一个 Topology 时,主要通过以下 3 个实体来完成 Topology 的执行工作:
1 个 worker 进程执行的是 1 个 topology 的子集(注:不会出现 1 个 worker 为多个 topology服务)。1 个 worker 进程会启动 1 个或多个 executor 线程来执行 1 个 topology 的component(spout 或 bolt)。因此,1 个运行中的 topology 就是由集群中多台物理机上的多个worker 进程组成的。
executor 是 1 个被 worker 进程启动的单独线程。每个 executor 只会运行 1 个 topology 的 1 个component(spout 或 bolt)的 task(注:task 可以是 1 个或多个,storm 默认是 1 个component 只生成 1 个 task,executor 线程里会在每次循环里顺序调用所有 task 实例)。
是最终运行 spout 或 bolt 中代码的单元(注:1 个 task 即为 spout 或 bolt 的 1 个实例,executor 线程在执行期间会调用该 task 的 nextTuple 或 execute 方法)。topology 启动后,1个 component(spout 或 bolt)的 task 数目是固定不变的,但该 component 使用的 executor 线程数可以动态调整(例如:1 个 executor 线程可以执行该 component 的 1 个或多个 task 实例)。这意味着,对于 1 个 component 存在这样的条件:#threads<=#tasks(即:线程数小于等于 task 数目)。默认情况下 task 的数目等于 executor 线程数目,即 1 个 executor 线程只运行 1 个 task。
Storm 中最重要的抽象,应该就是 Stream grouping 了,它能够控制 Spot/Bolt 对应的 Task 以什么样的方式来分发 Tuple,将 Tuple 发射到目的 Spot/Bolt 对应的 Task.
随机分组,尽量均匀分布到下游 Bolt 中将流分组定义为混排。这种混排分组意味着来自 Spout 的输入将混排,或随机分发给此 Bolt 中的任务。shuffle grouping 对各个 task 的 tuple 分配的比较均匀。
按字段分组,按数据中 field 值进行分组;相同 field 值的 Tuple 被发送到相同的 Task 这种grouping 机制保证相同 field 值的 tuple 会去同一个 task。
广播发送, 对于每一个 tuple 将会复制到每一个 bolt 中处理。
全局分组,Tuple 被分配到一个 Bolt 中的一个 Task,实现事务性的 Topology。Stream 中的所有的 tuple 都会发送给同一个 bolt 任务处理,所有的 tuple 将会发送给拥有最小 task_id 的 bolt任务处理。
不关注并行处理负载均衡策略时使用该方式,目前等同于 shuffle grouping,另外 storm 将会把bolt 任务和他的上游提供数据的任务安排在同一个线程下。
由 tuple 的发射单元直接决定 tuple 将发射给那个 bolt,一般情况下是由接收 tuple 的 bolt 决定接收哪个 bolt 发射的 Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理这个消息。 只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过TopologyContext 来获取处理它的消息的 taskid (OutputCollector.emit 方法也会返回taskid)。
YARN 是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。其中,ResourceManager 负责所有资源的监控、分配和管理; ApplicationMaster 负责每一个具体应用程序的调度和协调;NodeManager 负责每一个节点的维护。对于所有的 applications,RM 拥有绝对的控制权和对资源的分配权。而每个 AM 则会和 RM 协商资源,同时和 NodeManager 通信来执行和监控 task。
- ResourceManager 负责整个集群的资源管理和分配,是一个全局的资源管理系统。
- NodeManager 以心跳的方式向 ResourceManager 汇报资源使用情况(目前主要是 CPU 和内存的使用情况)。RM 只接受 NM 的资源回报信息,对于具体的资源处理则交给 NM 自己处理。
- YARN Scheduler 根据 application 的请求为其分配资源,不负责 application job 的监控、追踪、运行状态反馈、启动等工作。
- NodeManager 是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN集群每个节点都运行一个NodeManager。
- NodeManager 定时向 ResourceManager 汇报本节点资源(CPU、内存)的使用情况和Container 的运行状态。当 ResourceManager 宕机时 NodeManager 自动连接 RM 备用节点。
- NodeManager 接收并处理来自 ApplicationMaster 的 Container 启动、停止等各种请求。
用户提交的每个应用程序均包含一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。
- 负责与 RM 调度器协商以获取资源(用 Container 表示)。
- 将得到的任务进一步分配给内部的任务(资源的二次分配)。
- 与 NM 通信以启动/停止任务。
- 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
- 当前 YARN 自带了两个 ApplicationMaster 实现,一个是用于演示 AM 编写方法的实例程序
DistributedShell,它可以申请一定数目的 Container 以并行运行一个 Shell 命令或者 Shell脚本;另一个是运行 MapReduce 应用程序的 AM—MRAppMaster。
注:RM 只负责监控 AM,并在 AM 运行失败时候启动它。RM 不负责 AM 内部任务的容错,任务的容错由 AM 完成。
- client 向 RM 提交应用程序,其中包括启动该应用的 ApplicationMaster 的必须信息,例如ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序等。
- ResourceManager 启动一个 container 用于运行 ApplicationMaster。
- 启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳。
- ApplicationMaster 向 ResourceManager 发送请求,申请相应数目的 container。
- ResourceManager 返回 ApplicationMaster 的申请的 containers 信息。申请成功的container,由 ApplicationMaster 进行初始化。container 的启动信息初始化后,AM 与对应的 NodeManager 通信,要求 NM 启动 container。AM 与 NM 保持心跳,从而对 NM 上运行的任务进行监控和管理。
- container 运行期间,ApplicationMaster 对 container 进行监控。container 通过 RPC 协议向对应的 AM 汇报自己的进度和状态等信息。
- 应用运行期间,client 直接与 AM 通信获取应用的状态、进度更新等信息。
- 应用运行结束后,ApplicationMaster 向 ResourceManager 注销自己,并允许属于它的container 被收回。