Spark Streaming

内容预览:
  • 3、sparkStreaming特性 1、易用性 可以像编写离线批处理一样去编写流式...~
  • 5、Dstream Discretized Stream是Spark Streaming的基础抽象,代表持续...~
  • 7、DStream操作实战 需要引入对应的jar包       <dep...~

Spark Streaming

1、课程目标

  • 1、掌握sparkStreaming底层原理和架构

  • 2、掌握DStream的原理

  • 3、掌握sparkStreaming与flume整合

  • 4、掌握sparkStreaming与kafka整合

2、sparkStreaming

  • 它是一个可扩展,高吞吐具有容错性的流式计算。

3、sparkStreaming特性

  • 1、易用性

    • 可以像编写离线批处理一样去编写流式程序

    • 可以使用java/python/R

  • 2、容错性

    • 保证数据恰好只被处理一次

  • 3、融合spark体系

4、sparkStreaming原理

  • Spark Streaming 是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。

5、Dstream

  • Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流.

6、Dstream操作

  • transformation

    • 它是一个转换,与RDD中的transformation算子操作类似,它会生成一个新的Dstream

  • outputOperations

    • 类似于rdd中的action操作,触发任务的运行,得到结果数据。

7、DStream操作实战

需要引入对应的jar包

      

 <dependency>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>

  • 1.SparkStreaming接受socket数据,实现单词计数WordCount

package cn.itcast.stream


import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.

//todo:利用sparkStreaming接受socket数据,实现单词计数
object SparkStreamingSocket {
def main(args: Array[String]): Unit
= {
//1、创建sparkConf 设置master的地址local[N] ,n必须大于1,其中1个线程负责去接受数据,另一线程负责处理接受到的数据
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[2]")
// 2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingContext,需要sparkContext和以多久时间间隔为一个批次
val ssc = new StreamingContext(sc,Seconds(5))
//4、通过streaming接受socket数据
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100",9999)
//5、切分每一行
val words: DStream[String] = stream.flatMap(_.split(" "))
//6、每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)

//8、打印
result.print()

//9、开启流式计算
ssc.start()
//一直会阻塞,等待退出
ssc.awaitTermination()
}
}

  • 2.SparkStreaming接受socket数据,实现单词计数累加 

package cn.itcast.stream


import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.

//todo:利用sparkStreaming接受socket数据,实现所有批次单词计数结果累加
object SparkStreamingSocketTotal {

//定义一个方法
//currentValues:他表示在当前批次每个单词出现的所有的1 (hadoop,1) (hadoop,1)(hadoop,1)
//historyValues:他表示在之前所有批次中每个单词出现的总次数 (hadoop,100)
def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
val newValue: Int
= currentValues.sum+historyValues.getOrElse(0)
Some(newValue)
}

def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketTotal").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//设置checkpoint目录
ssc.checkpoint("./ck")
//4、接受socket数据
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100",9999)
//5、切分每一行
val words: DStream[String] = stream.flatMap(_.split(" "))
//6、把每一个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

//8、打印结果数据
result.print()

//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}

  • 3.SparkStreaming开窗函数reduceByKeyAndWindow,实现单词计数

package cn.itcast.stream


import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.

//todo:利用sparkStreaming开窗函数 reducebyKeyAndWindow实现单词计数
object SparkStreamingSocketWindow {

//定义一个方法
//currentValues:他表示在当前批次每个单词出现的所有的1 (hadoop,1) (hadoop,1)(hadoop,1)
//historyValues:他表示在之前所有批次中每个单词出现的总次数 (hadoop,100)
def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
val newValue: Int
= currentValues.sum+historyValues.getOrElse(0)
Some(newValue)
}

def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindow").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//设置checkpoint目录
ssc.checkpoint("./ck")
//4、接受socket数据
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100",9999)
//5、切分每一行
val words: DStream[String] = stream.flatMap(_.split(" "))
//6、把每一个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现的次数累加
//reduceByKeyAndWindow该方法需要三个参数
//reduceFunc:需要一个函数
//windowDuration:表示窗口的长度
//slideDuration:表示窗口滑动时间间隔,即每隔多久计算一次
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(5),Seconds(10))

//8、打印结果数据
result.print()

//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}

* 4.SparkStreaming开窗函数统计一定时间内的热门词汇

package cn.itcast.stream


import org.apache.spark.import org.apache.spark.rdd.RDDimport org.apache.spark.streaming.import org.apache.spark.streaming.dstream.

//todo:利用sparkStreaming开窗函数统计单位时间内热门词汇----出现频率比较高的词汇object SparkStreamingSocketWindowHotWords { //定义一个方法 //currentValues:他表示在当前批次每个单词出现的所有的1 (hadoop,1) (hadoop,1)(hadoop,1) //historyValues:他表示在之前所有批次中每个单词出现的总次数 (hadoop,100) def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {


val newValue: Int
= currentValues.sum+historyValues.getOrElse(0)
Some(newValue)
}

def main(args: Array[String]): Unit
= {


//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindowHotWords").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//设置checkpoint目录
ssc.checkpoint("./ck")
//4、接受socket数据
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100",9999)
//5、切分每一行
val words: DStream[String] = stream.flatMap(_.split(" "))
//6、把每一个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现的次数累加
//reduceByKeyAndWindow该方法需要三个参数
//reduceFunc:需要一个函数
//windowDuration:表示窗口的长度
//slideDuration:表示窗口滑动时间间隔,即每隔多久计算一次
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(5))

//8、按照单词出现的次数降序排列
val sortedDstream: DStream[(String, Int)] = result.transform(rdd => {
//将rdd中数据按照单词出现的次数降序排列
val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
//取出前3位
val sortHotWords: Array[(String, Int)] = sortedRDD.take(3)
//打印前3位结果数据
sortHotWords.foreach(x => println(x))
sortedRDD
})

//9、打印排序后的结果数据
sortedDstream.print()

//10、开启流式计算
ssc.start()
ssc.awaitTermination()
}}

8、SparkStreaming整合Flume

  • Poll方式

      package cn.itcast.dstream.flume


import java.net.InetSocketAddress

import org.apache.spark.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.flume.

//todo:利用sparkStreaming对接flume数据,实现单词计算------Poll拉模式
object SparkStreamingFlume_Poll {
def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlume_Poll").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//定义一个flume地址集合,可以同时接受多个flume的数据
val address=Seq(new InetSocketAddress("192.168.200.100",9999),new InetSocketAddress("192.168.200.101",9999))

//4、获取flume中数据
val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,address,StorageLevel.MEMORY_AND_DISK_SER_2)
//5、从Dstream中获取flume中的数据 {"header":xxxxx "body":xxxxxx}
val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
//6、切分每一行,每个单词计为1
val wordAndOne: DStream[(String, Int)] = lineDstream.flatMap(_.split(" ")).map((_,1))
//7、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//8、打印输出
result.print()

//开启计算
ssc.start()
ssc.awaitTermination()

}
}

  • push方式

package cn.itcast.dstream.flume


import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.flume.

//todo:利用sparkStreaming对接flume数据,实现单词计数------Push推模式
object SparkStreamingFlume_Push {

def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlume_Push").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//4、获取flume中的数据
val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"192.168.11.123",9999)
//5、从Dstream中获取flume中的数据 {"header":xxxxx "body":xxxxxx}
val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
//6、切分每一行,每个单词计为1
val wordAndOne: DStream[(String, Int)] = lineDstream.flatMap(_.split(" ")).map((_,1))
//7、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//8、打印输出
result.print()

//开启计算
ssc.start()
ssc.awaitTermination()
}
}

9、Spark Streaming整合kafka实战

  • KafkaUtils.createDstream方式(基于kafka高级Api—–偏移量由zk保存)

package cn.itcast.kafka


import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

//todo:sparkStreaming整合kafka---基于receiver(高级api)
object SparkStreamingKafkaReceiver {
def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName(
"SparkStreamingKafkaReceiver")
.setMaster(
"local[4]") //线程数要大于receiver个数
.set("spark.streaming.receiver.writeAheadLog.enable","true")
//表示开启WAL预写日志,保证数据源的可靠性
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingContext
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint(
"./spark_receiver")
//4、准备zk地址
val zkQuorum="node1:2181,node2:2181,node3:2181"
//5、准备groupId
val groupId="spark_receiver"
//6、定义topic 当前这个value并不是topic对应的分区数,而是针对于每一个分区使用多少个线程去消费
val topics=Map("spark_01" ->2)
//7、KafkaUtils.createStream 去接受kafka中topic数据
//(String, String) 前面一个string表示topic名称,后面一个string表示topic中的数据
//使用多个reveiver接受器去接受kafka中topic数据
val dstreamSeq: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)]
= KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
}
)

//利用streamingcontext调用union,获取得到所有receiver接受器的数据
val totalDstream: DStream[(String, String)] = ssc.union(dstreamSeq)

//8、获取kafka中topic的数据
val topicData: DStream[String] = totalDstream.map(_._2)
//9、切分每一行
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//10、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//11、打印结果数据
result.print()


//12、开启流式计算
ssc.start()
ssc.awaitTermination()

}
}

  • KafkaUtils.createDirectStream方式(基于kafka低级Api—–偏移量由客户端程序保存)

package cn.itcast.kafka


import kafka.serializer.StringDecoder
import org.apache.spark.
import org.apache.spark.streaming.
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.kafka.KafkaUtils

//todo:sparkSteaming整合kafka----采用direct(低级Api)
object SparkStreamingKafkaDirect {
def main(args: Array[String]): Unit
= {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafkaDirect").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel(
"WARN")
//3、创建streamingcontext
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint(
"./spark_direct") //它会保存topic的偏移量
//4、准备kafka参数
val kafkaParams=Map("metadata.broker.list"->"node1:9092,node2:9092,node3:9092","group.id" ->"spark_direct")
//5、准备topic
val topics=Set("spark_01")
//6、获取kafka中的数据
val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//7、获取topic中的数据
val data: DStream[String] = dstream.map(_._2)
//8、切分每一行,每个单词计为1,把相同单词出现的次数累加
val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//9、打印结果数据
result.print()

//10、开启流式计算
ssc.start()
ssc.awaitTermination()

}
}

以上就是:Spark Streaming 的全部内容。

本站部分内容来源于互联网和用户投稿,如有侵权请联系我们删除,谢谢。
Email:[email protected]


0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论