Spark系列: 流计算Spark Streaming
shilinlee opened this issue · 0 comments
流计算概念
- 流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息。
- 流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。
- 对于一个流计算系统来说,它应达到如下需求:
- 高性能:处理大数据的基本要求,如每秒处理几十万条数据。
- 海量式:支持TB级甚至是PB级的数据规模。
- 实时性:必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。
- 分布式:支持大数据的基本架构,必须能够平滑扩展。
- 易用性:能够快速进行开发和部署。
- 可靠性:能可靠地处理流数据。
- 流计算处理过程包括数据实时采集、数据实时计算和实时查询服务。
Spark Streaming简介
Spark Streaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。
Spark Streaming设计
Spark Streaming是Spark的核心组件之一,为Spark提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。
Spark Streaming的基本原理是将实时输入数据流以**时间片(秒级)**为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。
Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段的DStream,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终转变为对相应的RDD的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的RDD)经flatMap操作,生成了存储单词的RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。
DStream操作
概述
Spark Streaming工作原理
在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行task。
在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)。Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。
Spark Streaming程序基本步骤
- 通过创建输入DStream来定义输入源
- 通过对DStream应用转换操作和输出操作来定义流计算。
- 用streamingContext.start()来开始接收数据和处理流程。
- 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
- 可以通过streamingContext.stop()来手动结束流计算进程。
创建StreamingContext对象
如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口。因此,在定义输入之前,我们首先介绍如何创建StreamingContext对象。我们可以从一个SparkConf对象创建一个StreamingContext对象。
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> sc = SparkContext.getOrCreate()
>>> ssc = StreamingContext(sc, 1) // 1表示每隔1秒钟就自动执行一次流计算
输入源类型
基本输入源
示例程序:文件流(DStream)
示例程序:套接字流(DStream)
示例程序:RDD队列流(DStream)
高级数据源
示例程序:Kafka
示例程序:Flume
转换操作
DStream转换操作包括无状态转换和有状态转换。
- 无状态转换:每个批次的处理不依赖于之前批次的数据。
- 有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。
DStream无状态转换操作
- map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
- flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
- filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
- repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
- union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
- count():统计源DStream中每个RDD的元素数量;
- reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
- countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
- reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
- join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
- cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
- transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。
DStream有状态转换操作
对于DStream有状态转换操作而言,当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化(updateStateByKey)的转换。
滑动窗口转换操作
滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。
下面给给出一些窗口转换操作的含义:
- window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的DStream;
- countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
- reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
- reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
- countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。
updateStateByKey操作
当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。
下面我们就给出一个具体实例。以“套接字流”为例子来介绍,我们统计单词词频采用的是无状态转换操作,也就是说,每个批次的单词发送给NetworkWordCount程序处理时,NetworkWordCount只对本批次内的单词进行词频统计,不会考虑之前到达的批次的单词,所以,不同批次的单词词频都是独立统计的。
对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
while True:
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream("127.0.0.1", 4444))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
$ nc -lk 4444
hadoop
spark
hadoop
spark
hadoop
spark