Apache Storm内部原理分析

本文算是个人对Storm应用和学习的一个总结,由于不太懂Clojure语言,所以无法更多地从源码分析,但是参考了官网、好多朋友的文章,以及《Storm Applied: Strategies for real-time event processing》这本书,以及结合自己使用Storm的经历,希望对于想深入一点了解Storm原理的朋友能有所帮助,有不足之处欢迎拍砖交流。

Storm集群架构

Storm集群采用主从架构方式,主节点是Nimbus,从节点是Supervisor,有关调度相关的信息存储到ZooKeeper集群中,架构如下图所示:
storm-cluster
具体描述,如下所示:

  • Nimbus

Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。

  • Supervisor

Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。

  • ZooKeeper

用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。

Stream Groupings

Storm中最重要的抽象,应该就是Stream grouping了,它能够控制Spot/Bolt对应的Task以什么样的方式来分发Tuple,将Tuple发射到目的Spot/Bolt对应的Task,如下图所示:
storm-topology-tasks
目前,Storm Streaming Grouping支持如下几种类型:

  • Shuffle Grouping:随机分组,跨多个Bolt的Task,能够随机使得每个Bolt的Task接收到大致相同数目的Tuple,但是Tuple不重复
  • Fields Grouping:根据指定的Field进行分组 ,同一个Field的值一定会被发射到同一个Task上
  • Partial Key Grouping:与Fields grouping 类似,根据指定的Field的一部分进行分组分发,能够很好地实现Load balance,将Tuple发送给下游的Bolt对应的Task,特别是在存在数据倾斜的场景,使用 Partial Key grouping能够更好地提高资源利用率
  • All Grouping:所有Bolt的Task都接收同一个Tuple(这里有复制的含义)
  • Global Grouping:所有的流都指向一个Bolt的同一个Task(也就是Task ID最小的)
  • None Grouping:不需要关心Stream如何分组,等价于Shuffle grouping
  • Direct Grouping:由Tupe的生产者来决定发送给下游的哪一个Bolt的Task ,这个要在实际开发编写Bolt代码的逻辑中进行精确控制
  • Local or Shuffle Grouping:如果目标Bolt有1个或多个Task都在同一个Worker进程对应的JVM实例中,则Tuple只发送给这些Task

另外,Storm还提供了用户自定义Streaming Grouping接口,如果上述Streaming Grouping都无法满足实际业务需求,也可以自己实现,只需要实现backtype.storm.grouping.CustomStreamGrouping接口,该接口定义了如下方法:

List<Integer> chooseTasks(int taskId, List<Object> values)

上面几种Streaming Group的内置实现中,最常用的应该是Shuffle Grouping、Fields Grouping、Direct Grouping这三种,使用其它的也能满足特定的应用需求。

Acker原理

首先,我们理解一下Tuple Tree的概念,要计算英文句子中每个字母出现的次数,形成的Tuple Tree如下图所示:
storm-tuple-tree
对上述这个例子,也就是说,运行时每一个英文句子都会对应一个Tuple Tree,一个Tuple Tree可能很大,也可能很小,与具体的业务需求有关。
另外,Acker也是一个Bolt组件,只不过我们实现处理自己业务逻辑时,不需要关心Acker Bolt的实现,在提交实现的Topology到Storm集群后,会在初始化Topology时系统自动为我们的Topology增加Acker这个Bolt组件,它的主要功能是负责跟踪我们自己实现的Topology中各个Spout/Bolt所处理的Tuple之间的关系(或者可以说是,跟踪Tuple Tree的处理进度)。
下面,我们描述一下Acker的机制,如下所示:

  1. Spout的一个Task创建一个Tuple时,即在Spout的nextTuple()方法中实现从特定数据源读取数据的处理逻辑中,会与Acker进行通信,向Acker发送消息,Acker保存该Tuple对应信息:{:spout-task task-id :val ack-val)}
  2. Bolt在emit一个新的子Tuple时,会保存子Tuple与父Tuple的关系
  3. 在Bolt中进行ack时,会计算出父Tuple与由该父Tuple新生成的所有子Tuple的一个异或值,将该值发送给Acker(计算异或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 … ^ child-tuple-idN))。可见,这里Bolt并没有把所有生成的子Tuple发送给Acker,这要比发送一个异或值大得多了,只发送一个异或值大大降低了Bolt与Acker之间网络通信的开销
  4. Acker收到Bolt发送的异或值,与当前保存的task-id对应的初始ack-val做异或,tuple-id与ack-val相同,异或结果为0,但是子Tuple的child-tuple-id等并不互相相同,只有等所有的子Tuple的child-tuple-id都执行ack回来,最后ack-val就为0,表示整个Tuple树处理成功。无论成功与失败,最后都要从Acker维护的队列中移除。
  5. 最后,Acker会向产生该原始父Tuple的Spout对应的Task发送通知,成功或者失败,回调Spout的ack或fail方法。如果我们在实现Spout时,重写了ack和fail方法,处理回调就会执行这里的逻辑。

Storm设计:组件抽象

我们编写的处理业务逻辑的Topology提交到Storm集群后,就会发生任务的调度和资源的分配,从而也会基于Storm的设计,出现各种各样的组件。我们先看一下,Topology提交到Storm集群后的运行时部署分布图,如下图所示:
storm-topology-assignment
通过上图我们可以看出,一个Topology的Spout/Bolt对应的多个Task可能分布在多个Supervisor的多个Worker内部。而每个Worker内部又存在多个Executor,根据实际对Topology的配置在运行时进行计算并分配。
从运行Topology的Supervisor节点,到最终的Task运行时对象,我们大概需要了解Storm抽象出来的一些概念,由于相对容易,我简单说明一下:

  • Topology:Storm对一个分布式计算应用程序的抽象,目的是通过一个实现Topology能够完整地完成一件事情(从业务角度来看)。一个Topology是由一组静态程序组件(Spout/Bolt)、组件关系Streaming Groups这两部分组成。
  • Spout:描述了数据是如何从外部系统(或者组件内部直接产生)进入到Storm集群,并由该Spout所属的Topology来处理,通常是从一个数据源读取数据,也可以做一些简单的处理(为了不影响数据连续地、实时地、快速地进入到系统,通常不建议把复杂处理逻辑放在这里去做)。
  • Bolt:描述了与业务相关的处理逻辑。

上面都是一些表达静态事物(组件)的概念,我们编写完成一个Topology之后,上面的组件都以静态的方式存在。下面,我们看一下提交Topology运行以后,会产生那些动态的组件(概念):

  • Task:Spout/Bolt在运行时所表现出来的实体,都称为Task,一个Spout/Bolt在运行时可能对应一个或多个Spout Task/Bolt Task,与实际在编写Topology时进行配置有关。
  • Worker:运行时Task所在的一级容器,Executor运行于Worker中,一个Worker对应于Supervisor上创建的一个JVM实例
  • Executor:运行时Task所在的直接容器,在Executor中执行Task的处理逻辑;一个或多个Executor实例可以运行在同一个Worker进程中,一个或多个Task可以运行于同一个Executor中;在Worker进程并行的基础上,Executor可以并行,进而Task也能够基于Executor实现并行计算

Topology并行度计算

有关Topology的并行度的计算,官网上有一篇文章介绍(后面参考链接中已附上),我们这里详细解释一下,对于理解Storm UI上面的一些统计数据也会有很大帮助。在编写代码设置并行度的时候,并行度只是一个提示信息,Storm会根据这个提示信息并结合其他一些参数配置(Task个数、Worker个数),去计算运行时的并行度,这个并行度实际上描述的是,组成一个Topology的多个Spout/Bolt的运行时表现实体Task的分布,所以我们可能会想关注从一个Topology的角度去看,这些设置了并行度的Spout/Bolt对应的运行时Task,在集群的多个Worker进程之间,以及Executor内部是如何分布的。
下面是例子给出的Topology的设计,如下图所示:
storm-example-topology
对该例子Topology配置了2个Worker,对应的代码示例如下所示:

conf.setNumWorkers(2); // 该Topology运行在Supervisor节点的2个Worker进程中

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 设置并行度为2,则Task个数为2*1

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout"); // 设置并行度为2,设置Task个数为4 ,则Task个数为4

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt"); // 设置并行度为6,则Task个数为6*1

那么,下面我们看Storm是如何计算一个Topology运行时的并行度,并分配到2个Worker中的:

  • 计算Task总数:2*1+4+6*1=12(总计创建12个Task实例)
  • 计算运行时Topology并行度:10/2=5(每个Worker对应5个Executor)
  • 将12个Task分配到2个Worker中的5*2个Executor中:应该是每个Worker上5个Executor,将6个Task分配到5个Executor中
  • 每个Worker中分配6个Task,应该是分配3个Yellow Task、2个Green Task、1个Blue Task
  • Storm内部优化:会把同类型的Task尽量放到同一个Executor中运行
  • 分配过程:从Task个数最少的开始,1个Blue Task只能放到一个Executor,总计1个Executor被占用;2个Green Task可以放到同一个Executor中,总计2个Executor被占用;最后看剩下的3个Yellow Task能否分配到5-2=3个Executor中,显然每个Yellow Task对应一个Executor

从直观上看,其实分配Task到多个Executor中的分配结果有很多种,都能满足尽量让同类型Task在相同的Executor中,有关Storm的计算实现可以参考源码。
上述例子Topology在运行时,多个Task分配到集群中运行分布的结果,如下图所示:
storm-example-topology-task-assignment-result

Storm内部原理

一个Topology提交到Storm集群上运行,具体的处理流程非常微妙,有点复杂。首先,我们通过要点的方式来概要地说明:

  • 每个Executor可能存在一个Incoming Queue和一个Outgoing Queue,这两个队列都是使用的LMAX Disruptor Queue(可以通过相关资料来了解)
  • 两个LMAX Disruptor Queue的上游和下游,都会有相关线程去存储/取出Tuple
  • 每个Executor可能存在一个Send Thread,用来将处理完成生成的新的Tuple放到属于该Executor的Outgoing Queue队列
  • 每个Executor一定存在一个Main Thread,用来处理衔接Spout Task/Bolt Task与前后的Incoming Queue、Outgoing Queue
  • 每个Worker进程内部可能存在一个Receive Thread,用来接收由上游Worker中的Transfer Thread发送过来的Tuple,在一个Worker内部Receive Thread是被多个Executor共享的
  • 每个Worker进程内部可能存在一个Outgoing Queue,用来存放需要跨Worker传输的Tuple(其内部的Transfer Thread会从该队列中读取Tuple进行传输)
  • 每个Worker进程内部可能存在一个Transfer Thread,用来将需要在Worker之间传输的Tuple发送到下游的Worker内

上面,很多地方我使用了“可能”,实际上大部分情况下是这样的,注意了解即可。下面,我们根据Spout Task/Bolt Task运行时分布的不同情况,分别阐述如下:

Spout Task在Executor内部运行

Spout Task和Bolt Task运行时在Executor中运行有一点不同,如果Spout Task所在的同一个Executor中没有Bolt Task,则该Executor中只有一个Outgoing Queue用来存放将要传输到Bolt Task的队列,因为Spout Task需要从一个给定的数据源连续不断地读入数据而形成流。在一个Executor中,Spout Task及其相关组件的执行流程,如下图所示:
storm-spout-task-in-executor
上图所描述的数据流处理流程,如下所示:

  1. Spout Task从外部数据源读取消息或事件,在实现的Topology中的Spout中调用nextTuple()方法,并以Tuple对象的格式emit()读取到的数据
  2. Main Thread处理输入的Tuple,然后放入到属于该Executor的Outgoing Queue中
  3. 属于该Executor的Send Thread从Outgoing Queue中读取Tuple,并传输到下游的一个或多个Bolt Task去处理

Bolt Task在Executor内部运行

前面说过,Bolt Task运行时在Executor中与Spout Task有一点不同,一个Bolt Task所在的Executor中有Incoming Queue和Outgoing Queue这两个队列,Incoming Queue用来存放数据流处理方向上,该Bolt Task上游的组件(可能是一个或多个Spout Task/Bolt Task)发射过来的Tuple数据,Outgoing Queue用来存放将要传输到下游Bolt Task的队列。如果该Bolt Task是数据流处理方向上最后一个组件,而且对应execute()方法没有再进行emit()创建的Tupe数据,那么该Bolt Task就没有Outgoing Queue这个队列。在一个Executor中,一个Bolt Task用来衔接上游(Spout Task/Bolt Task)和下游(Bolt Task)的组件,在该Bolt Task所在的Executor内其相关组件的执行流程,如下图所示:
storm-bolt-task-in-executor
上图所描述的数据流处理流程,如下所示:

  1. Spout Task/Bolt Task将Tupe传输到下游该Bolt Task所在的Executor的Incoming Queue中
  2. Main Thread从该Executor的Incoming Queue中取出Tuple,并将Tupe发送给Bolt Task去处理
  3. Bolt Task执行execute()方法中的逻辑处理该Tuple数据,并生成新的Tuple,然后调用emit()方法将Tuple发送给下一个Bolt Task处理(这里,实际上是Main Thread将新生成的Tuple放入到该Executor的Outgoing Queue中)
  4. 属于该Executor的Send Thread从Outgoing Queue中读取Tuple,并传输到下游的一个或多个Bolt Task去处理

同一Worker内2个Spout Task/Bolt Task之间传输tuple

在同一个Worker JVM实例内部,可能创建多个Executor实例,那么我们了解一下,一个Tuple是如何在两个Task之间传输的,可能存在4种情况,在同一个Executor中的情况有如下2种:

  • 1个Spout Task和1个Bolt Task在同一个Executor中
  • 2个Bolt Task在同一个Executor中

我们后面会对类似这种情况详细说明,下面给出的是,2个不同的Executor中Task运行的情况,分别如下图所示:

  • 1个Spout Task和1个Bolt Task在不同的2个Executor中

storm-transfer-tuples-between-spout-and-bolt-task-in-same-worker-different-executor

  • 2个Bolt Task在不同的2个Executor中

storm-transfer-tuples-between-2-bolt-tasks-in-same-worker-different-executor
通过前面了解一个Spout Task和一个Bolt Task运行的过程,对上面两种情况便很好理解,不再累述。

不同Worker内2个Executor之间传输tuple

如果是在不同的Worker进程内,也就是在两个隔离的JVM实例上,无论是否在同一个Supervisor节点上,Tuple的传输的逻辑是统一的。这里,以一个Spout Task和一个Bolt Task分别运行在两个Worker进程内部为例,如下图所示:
storm-transfer-tuples-between-spout-and-bolt-task-in-different-worker
处理流程和前面的类似,只不过如果两个Worker进程分别在两个Supervisor节点上,这里Transfer Thread传输Tuple走的是网络,而不是本地。

Tuple在Task之间路由过程

下面,我们关心每一个Tuple是如何在各个Bolt的各个Task之间传输,如何将一个Tuple路由(Routing)到下游Bolt的多个Task呢?
这里,我们应该了解一下,作为在Task之间传输的消息的表示形式,定义TaskMessage类的代码,如下所示:

package backtype.storm.messaging;

import java.nio.ByteBuffer;

public class TaskMessage {
    private int _task;
    private byte[] _message;

    public TaskMessage(int task, byte[] message) {
        _task = task;
        _message = message;
    }

    public int task() {
        return _task;
    }

    public byte[] message() {
        return _message;
    }

    public ByteBuffer serialize() {
        ByteBuffer bb = ByteBuffer.allocate(_message.length + 2);
        bb.putShort((short) _task);
        bb.put(_message);
        return bb;
    }

    public void deserialize(ByteBuffer packet) {
        if (packet == null)
            return;
        _task = packet.getShort();
        _message = new byte[packet.limit() - 2];
        packet.get(_message);
    }
}

可见,每一个Task都给定一个Topology内部唯一的编号,就能够将任意一个Tuple正确地路由到下游需要处理该Tuple的Bolt Task。
假设,存在一个Topology,包含3个Bolt,分别为Bolt1、Bolt2、Bolt3,他们之间的关系也是按照编号的顺序设置的,其中Bolt1有个2个Task,Bolt2有2个Task,Bolt3有2个Task,这里我们只关心Bolt1 Task到Bolt2 Task之间的数据流。具体的路由过程,如下图所示:
storm-routing-tuples
上图中,Bolt2的两个Task分布到两个Worker进程之内,所以,当上游的Bolt1的2个Task处理完输入的Tuple并生成新的Tuple后,会有根据Bolt2的Task的编号,分别进行如下处理:

  • Bolt2 Task4分布在第一个Worker进程内,则Bolt1生成的新的Tupe直接由该Executor的Send Thread,放到第一个Worker内部的另一个Executor的Incoming Queue
  • Bolt2 Task5分布在第二个Worker进程内,则Bolt1生成的新的Tupe被Executor的Send Thread放到该Executor所在的第一个Worker的Outgoing Queue中,由第一个Worker的Transfer Thread发送到另一个Worker内(最终路由给Bolt2 Task5去处理)

通过上面处理流程可以看出,每个Executor应该维护Task与所在的Executor之间的关系,这样才能正确地将Tuple传输到目的Bolt Task进行处理。

参考资料

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(10): “Apache Storm内部原理分析

  1. 博主写的很好。关于博主说的Acker机制的第二点中的对当前ack的tuple与其所有子tuple先做一个异或运算,再将异或值发给Acker进行异或操作的优势,我想补充一下,不知道我的理解对不对。
    当前storm在ack一个tuple的时候,是通过将当前tuple与其所有子tuple进行异或运算产生的结果值发送给Acker,然后将该异或值与Acker中已存在的值进行异或,接着更新该Acker中保存的值,直到tuple tree中的所有tuple都被ack了,此时Acker中的对该tuple tree保留下来的异或值一定是0,如果不是,那么可以确定当前spout产生的tuple没有被full processed,需要考虑重发。
    如果storm在ack一个tuple的时候,只是将需要ack的tuple的ID直接与Acker中的值进行异或,那么在每次emit一个新的子Tuple的时候都需要与Acker中的值进行异或操作,同时在每次对tuple进行ack操作的时候,也需要将该tuple对应的ID值与Acker中的值进行异或。只有这样才能保证tuple tree是否被full processed.
    所以,可以看出,方法一比方法二少了一步操作,大大节省了网络开销。因此storm采用了这种巧妙的方法。我想博主要表达的也是这个意思吧?

  2. 写的不错,受益很多。有个问题问下,

    1 个 Spout Task 和 1 个 Bolt Task 在同一个 Executor 中
    2 个 Bolt Task 在同一个 Executor 中

    一个executor只能运行一个component的一个或多个task吧?

    “An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).”
    http://storm.apache.org/releases/0.9.6/Understanding-the-parallelism-of-a-Storm-topology.html

  3. 博主,你好,请教您一个问题,我在对storm进行压测的时候发现,当message大小为5kb时,会出现spout序列化OOM的问题,。测试环境是storm 1.1.1,同一节点,不同worker,spout和bolt的并行度为1.当spout和bolt在同一worker中时,则没有这个问题,另外当调小message大小或者topology.executor.send.buffer.size(值为8)时,OOM的问题也会解决,但是如果将topology.executor.send.buffer.size调下,storm的性能下降的很厉害(相当于是限流了吧)。dump spout heap,发现未发送的消息有差不多33万,占了1.6G的内存,worker内存大小是2G。现在的问题是不太清楚问题的根本原因,以及解决方法。我的猜测是因为5kb的数据是string,导致kryo序列化变慢,导致消息在spout端堆积,导致OOM?希望博主能给一点建议。以下是OOM信息:
    java.lang.OutOfMemoryError: GC overhead limit exceeded at com.esotericsoftware.kryo.io.Output.toBytes(Output.java:129) at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:45) at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__5183.invoke(worker.clj:193) at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4870.invoke(executor.clj:309) at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) at org.apache.storm.disruptor$consume_loop_STAR_$fn__4482.invoke(disruptor.clj:83) at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:748)

    最后感谢博主花这么多时间看完我的问题,谢谢!

    • 1、Spout和Bolt在不在同一个Worker中,关系不大,你要给每个Worker的内存足够。
      2、看问题的现象是,Spout发的太快,以至于后面的Bolt处理不过来,可以尝试调大Bolt的并行度,调小Spout的并行度。
      3、设置下topology.max.spout.pending的值,默认是1,可以调成比如200,根据实际测试情况观察,调大或调小。
      4、将Acker数量调大点:topology.acker.executors,默认为1,可以调成2或3试试,不建议调的太大。
      5、其它一些参数,可以参考这里:https://github.com/apache/storm/blob/master/conf/defaults.yaml

      • 嗯,感谢博主,问题的症结确实是spout发的太快,导致netty这边出了问题。在dump堆信息后,发现OOM的原因是消息大部分堆积在了netty端,我觉得可能是瞬时spout速度太快,netty来不及发,结果netty中存了很多TaskMessage,放在了一个叫做writeRequestQueue的队列中。我给worker分配的是2G内存,在OOM的时候发现netty的这个writeRequestQueue占用了1.9个多G。另外我尝试设置了MaxSpoutPending和增加Bolt并行度,发现没有起什么作用,因此应该不是Bolt消费太慢导致。Worker之前有设置内存为4G,但是问题还是一样。随后我对spout发射消息做了均匀化处理,发现比没有均匀化时性能提升了2倍之多。所以我想是不是Storm这边的netty使用不当?但是道理上来说,设置了MAX.SPOUT.PENDING,spout应该会停止发射,那么应该不会对netty这边造成冲击,但是按照现在的情况来看,什么原因导致netty会堆积大量的内存呢?

Yanjun进行回复 取消回复

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>