shilinlee/blog

Spark系列: 流计算Spark Streaming

shilinlee opened this issue · 0 comments

流计算概念

  • 流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息。
  • 流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。
  • 对于一个流计算系统来说,它应达到如下需求:
    • 高性能:处理大数据的基本要求,如每秒处理几十万条数据。
    • 海量式:支持TB级甚至是PB级的数据规模。
    • 实时性:必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。
    • 分布式:支持大数据的基本架构,必须能够平滑扩展。
    • 易用性:能够快速进行开发和部署。
    • 可靠性:能可靠地处理流数据。
  • 流计算处理过程包括数据实时采集、数据实时计算和实时查询服务。

image

Spark Streaming简介

Spark Streaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。

Spark Streaming设计

Spark Streaming是Spark的核心组件之一,为Spark提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。

image

Spark Streaming的基本原理是将实时输入数据流以**时间片(秒级)**为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。

image

Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段的DStream,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终转变为对相应的RDD的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的RDD)经flatMap操作,生成了存储单词的RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。

image

DStream操作

概述

Spark Streaming工作原理

在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行task。

在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)。Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。

Spark Streaming程序基本步骤

  • 通过创建输入DStream来定义输入源
  • 通过对DStream应用转换操作和输出操作来定义流计算。
  • 用streamingContext.start()来开始接收数据和处理流程。
  • 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
  • 可以通过streamingContext.stop()来手动结束流计算进程。

创建StreamingContext对象

如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口。因此,在定义输入之前,我们首先介绍如何创建StreamingContext对象。我们可以从一个SparkConf对象创建一个StreamingContext对象。

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> sc = SparkContext.getOrCreate()
>>> ssc = StreamingContext(sc, 1)  // 1表示每隔1秒钟就自动执行一次流计算

输入源类型

基本输入源

示例程序:文件流(DStream)

示例程序:套接字流(DStream)

示例程序:RDD队列流(DStream)

高级数据源

示例程序:Kafka

示例程序:Flume

转换操作

DStream转换操作包括无状态转换有状态转换

  • 无状态转换:每个批次的处理不依赖于之前批次的数据。
  • 有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。

DStream无状态转换操作

  • map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
  • flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
  • filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
  • repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
  • union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
  • count():统计源DStream中每个RDD的元素数量;
  • reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
  • countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
  • reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
  • join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
  • cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
  • transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

DStream有状态转换操作

对于DStream有状态转换操作而言,当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换追踪状态变化(updateStateByKey)的转换。

滑动窗口转换操作

滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。

image

下面给给出一些窗口转换操作的含义:

  • window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的DStream;
  • countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
  • reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

updateStateByKey操作

当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。

下面我们就给出一个具体实例。以“套接字流”为例子来介绍,我们统计单词词频采用的是无状态转换操作,也就是说,每个批次的单词发送给NetworkWordCount程序处理时,NetworkWordCount只对本批次内的单词进行词频统计,不会考虑之前到达的批次的单词,所以,不同批次的单词词频都是独立统计的。

对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
while True:
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
 
    lines = ssc.socketTextStream("127.0.0.1", 4444))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
 
    running_counts.pprint()
 
    ssc.start()
    ssc.awaitTermination()
$ nc -lk 4444
hadoop
spark
hadoop
spark
hadoop
spark