正文
架构图如下:
使用方式:
(1) 导入 Kafka 的 Spark Streaming 整合包
(2) 创建 DStream
需要注意的几点:
1) Kafka 的 topic 和 partition 并不和 SS 生成的 RDD 的 partition 相对应,所以上面代码中 topicMap 里增加 threads 只能增加使用一个 receiver 消费这个 topic 的线程数,它并不能增加 Spark 处理数据的并行数,因为每个 input DStream 在一个 worker 机器上只创建一个接受单个数据流的 receiver。
2) 可以为不同 topic 和 group 创建多个 DStream 来使用多个 receiver 并行的接受数据。例如:一个单独的 Kafka input DStream 接受两个 topic 的数据可以分为两个 Kafka input DStream,每个只接受一个 topic 的数据,这样可以并行的接受速度从而提高整体吞吐量。
3) 如果开启了 wal 来保证数据不丢失话,需要设置 checkpoint 目录,并且像上面代码一样指定数据序列化到 hdfs 上的方式(比如:StorageLevel.MEMORY_AND_DISK_SER)
4) 建议每个批处理时间间隔周期接受到的数据最好不要超过接受 Executor 的内存 (Storage) 的一半。
要描述清楚 Receiver-based Approach ,我们需要了解其接收流程,分析其内存使用,以及相关参数配置对内存的影响。
当执行 SS 的 start 方法后,SS 会标记 StreamingContext 为 Active 状态,并且单独起个线程通过 ReceiverTracker 将从 ReceiverInputDStreams 中获取的 receivers 以并行集合的方式分发到 worker 节点,并运行他们。worker 节点会启动 ReceiverSupervisor。接着按如下步骤处理:
1) ReceiverSupervisor 会启动对应的 Receiver(这里是 KafkaReceiver)
2) KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用 ReceiverSupervisor.pushSingle 方法填充数据,注意,这里是一条一条填充的。
3) ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。
到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator 中。
BlockGenerator 会复杂些,重要的数据存储结构有四个:
1) 维护了一个缓存 currentBuffer ,这是一个变长的数组的 ArrayBuffer。currentBuffer 并不会被复用,而是每个 spark.streaming.blockInterval 都会新建一个空的变长数据替换老的数据作为新的 currentBuffer,然后把老的对象直接封装成 Block 放入到 blocksForPushing 的队列里,BlockGenerator 会负责保证 currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate,是单个 Receiver 每秒钟允许添加的条数。这个是 Spark 内存控制的第一步,填充 currentBuffer 是阻塞的,消费 Kafka 的线程直接做填充。
2) 维护了一个 blocksForPushing 的阻塞队列,size 默认为 10 个 (1.6.3 版本),可通过 spark.streaming.blockQueueSize 进行配置。该队列主要用来实现生产 - 消费模式,每个元素其实是一个 currentBuffer 形成的 block。
3) blockIntervalTimer 是一个定时器。其实是一个生产者,负责将当前 currentBuffer 的数据放到 blocksForPushing 中,并新建一个 currentBuffer。通过参数 spark.streaming.blockInterval 设置,默认为 200ms。放的方式很简单,直接把 currentBuffer 做为 Block 的数据源。这就是为什么 currentBuffer 不会被复用。
4) blockPushingThread 也是一个定时器,负责将 Block 从 blocksForPushing 取出来,
然后交给 BlockManagerBasedBlockHandler.storeBlock。10 毫秒会取一次,不可配置。到这一步,才真的将数据放到了 Spark 的 BlockManager 中。
下面我们会详细分析每一个存储对象对内存的使用情况:
首先自然要说下 currentBuffer,它缓存的数据会被定时器每隔 spark.streaming.blockInterval(默认 200ms)的时间拿走,这个缓存用的是 Spark 的运行时内存(我们使用的是静态内存管理模式,默认应该是 heap *0.2,如果是统一内存管理模式的话应该是 heap*0.25),而不是 storage 内存。如果 200ms 期间你从 Kafka 接受的数据足够大,则这部分内存很容易 OOM 或者进行大量的 GC,导致 receiver 所在的 Executor 极容易挂掉或者处理速度也很慢。如果你在 SparkUI 发现 Receiver 挂掉了,考虑有没有可能是这个问题。