Spark Streaming: Introduction and Usage (with Examples)

1. Introduction

Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant processing of live data streams.

Common data sources: Kafka, Flume, HDFS, TCP sockets, files, etc.
Processed data can be pushed out to: HDFS, databases, and live reports (dashboards).

Spark Streaming provides many high-level API operators, so data can be processed with complex algorithms without having to deal with the low-level details yourself.
Spark Streaming receives live input data streams and splits the data into batches; the Spark engine then processes each batch and produces the final result stream, also in batches. In effect, Spark Streaming is micro-batch processing. The diagram below illustrates this:

[Figure: live input data stream → Spark Streaming splits it into batches → Spark engine → batches of processed results]
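To make the micro-batch model concrete, below is a minimal, illustrative sketch (not part of the original examples; the host and port are placeholders): every batch interval produces an ordinary RDD, and foreachRDD hands that RDD to the Spark engine like any other Spark job.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MicroBatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("microBatchSketch")
    // Every 3 seconds of received data becomes one batch
    val ssc = new StreamingContext(conf, Seconds(3))
    val lines = ssc.socketTextStream("localhost", 7777)

    // Each batch of a DStream is an ordinary RDD handed to the Spark engine
    lines.foreachRDD { rdd =>
      println(s"records in this batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}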

2. Application Examples

Maven pom dependencies:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
   <version>2.0.0</version>
</dependency>

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>2.0.0</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.11</artifactId>
   <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.6</version>
</dependency>
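Note: the _2.11 suffix on the Spark artifact IDs is the Scala binary version. All Spark modules should use the same suffix and the same Spark version (2.4.5 here), and the suffix must match the Scala version the project is compiled with.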

Example 1: Word count from socket data (Scala version)

Read data from port 7777, compute a word count with Spark Streaming, and print the results to the console.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf with master set to local
    val conf:SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstream")

    // Instantiate the StreamingContext
    // Set the batch interval: each batch covers 3 seconds of received data
    val streamingContext = new StreamingContext(conf,Seconds(3))

    // Specify the input source: a TCP socket
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.136.20", 7777)

    val wordStream: DStream[String] = socketLineStream.flatMap(line=>line.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map(x=>(x,1))
    val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    // Print the results of each batch
    wordcountStream.print()

    // Start the streaming computation
    streamingContext.start()
    // Wait for the computation to terminate
    streamingContext.awaitTermination()
  }
}

Start a netcat listener on port 7777 and type test data:

nc -lk 7777
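Typing, for example, hello spark hello into the netcat session should make the Spark application print output roughly like the following for the next batch (the timestamp will differ):

-------------------------------------------
Time: 1596424923000 ms
-------------------------------------------
(hello,2)
(spark,1)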

Example 2: Word count from socket data (Java version)

Read data from port 7777, compute a word count with Spark Streaming, and print the results to the console.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkStreamJavaDemo1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkstream");

        // Batch interval: each batch covers 3 seconds of received data
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(3));
        // Specify the input source: a TCP socket
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.136.20", 7777);
        // Processing logic
        JavaDStream<String> fm = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split).iterator();
            }
        });
        JavaPairDStream<String, Integer> pair = fm.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> reduceByKey = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the results of each batch
        reduceByKey.print();

        // Start the streaming computation
        jsc.start();

        // Wait for the computation to terminate
        try {
            jsc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
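Since FlatMapFunction, PairFunction and Function2 are single-method interfaces, the anonymous classes above can also be written as Java 8 lambdas if preferred, e.g. lines.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator()) and pair.reduceByKey((v1, v2) -> v1 + v2).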

Example 3: Word count from files in a directory

Monitor the directory D:\sparkStreaming and compute a word count over newly added files; results are printed to the console.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamFileDataSourceDemo2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("fileDataSource")
    val streamingContext = new StreamingContext(conf,Seconds(5))

    // Input source: monitor the directory for newly added files
    val fileDStream: DStream[String] = streamingContext.textFileStream("D:\\sparkStreaming")

    val wordStream: DStream[String] = fileDStream.flatMap(line=>line.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()  
  }
}
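Note that textFileStream only picks up files created in (or moved into) the monitored directory after the streaming context starts; files already present are ignored, and each file should appear atomically so a half-written file is never read. A minimal sketch for generating test input (the paths below are only examples):

import java.nio.file.{Files, Paths, StandardCopyOption}

object PushTestFile {
  def main(args: Array[String]): Unit = {
    // Write the test file somewhere outside the monitored directory first ...
    val tmp = Paths.get("D:\\tmp\\words.txt")
    Files.write(tmp, "hello spark hello streaming".getBytes("UTF-8"))
    // ... then move it in, so Spark sees one complete, newly created file
    Files.move(tmp, Paths.get("D:\\sparkStreaming\\words.txt"), StandardCopyOption.ATOMIC_MOVE)
  }
}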

Example 4: Word count from Kafka data

Spark Streaming reads data from Kafka and maintains a running word count across batches.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kafkademo").setMaster("local[*]")

    val streamingContext = new StreamingContext(conf,Seconds(5))   // Batch interval set to 5 seconds
    // updateStateByKey needs a checkpoint directory to persist state between batches
    streamingContext.checkpoint("in/checkpoint")  // Directory where checkpoints are stored
    
    val kafkaParams: Map[String, String] = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1")
    )

    // Input source: a direct Kafka stream
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams)  // SparkKafkaDemo is the Kafka topic
    )
    val wordStream: DStream[String] = kafkaStream.flatMap(v=>v.value().toString.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))
    // Stateless version:
    //val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)
    // Stateful version: keep a running count across batches
    val sumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
      case (seq, buffer) => {
        // seq holds the new values for this key in the current batch;
        // buffer holds the count accumulated in previous batches
        println(seq,seq.sum,buffer.getOrElse(0))   // e.g. (CompactBuffer(1),1,5): 5 already counted, 1 new value, so 6 is emitted
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }
    }
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Create a console producer and send messages to test:

kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
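The pattern match inside updateStateByKey can also be written as an explicitly typed update function, which makes the roles of the two arguments clearer. The sketch below is a drop-in replacement for the stateful step in the example above (same mapStream, same behaviour):

// values: the new 1s collected for a key in the current batch
// state : the running total from previous batches (None the first time a key is seen)
def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] =
  Some(values.sum + state.getOrElse(0))

val sumStream: DStream[(String, Int)] = mapStream.updateStateByKey(updateCount)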

Example 5: Custom receiver

A custom receiver needs to extend the Receiver class.

import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
// Custom receiver
class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  var socket:java.net.Socket = null

  def receive():Unit={
    socket = new java.net.Socket(host,port)
    val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))

    // In Scala an assignment evaluates to Unit, so the Java-style
    // while ((line = reader.readLine()) != null) idiom does not work; read a line first, then test it
    var line: String = reader.readLine()
    while (line != null) {
      // Simple receiver logic: if the message is "end", stop receiving; otherwise store the line
      if (line.equals("end")) {
        return
      } else {
        this.store(line)
      }
      line = reader.readLine()
    }
  }

  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()

  }

  override def onStop(): Unit = {
    if (socket!=null){
      socket.close()
      socket=null
    }
  }
}

object MyReceiverDemo{
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("myreceiverdemo").setMaster("local[*]")
    val streamingContext: StreamingContext = new StreamingContext(conf,Seconds(5))

    // Plug in the custom receiver
    val receiverStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.136.20",7777))

    val lineStream: DStream[String] = receiverStream.flatMap(line=>line.split("\\s+"))

    val wordStream: DStream[(String, Int)] = lineStream.map((_,1))

    val sumStream: DStream[(String, Int)] = wordStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
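As in Example 1, the custom receiver can be tested by starting a netcat listener and typing lines:

nc -lk 7777

Each whitespace-separated word is counted per 5-second batch; sending the single word end makes the receive loop return, after which this receiver stores no further input.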