Spark Streaming计算wordCount

1、计算原理

Spark Streaming计算wordCount
Spark Streaming计算wordCount

  • Sparkstreaming处理数据可以分为实时流或者
  • Sparkstreaming从flume或者kafka中拉取数据,而Sparkstreaming中会创建多个窗口,以RDD的形式存放这些数据,然后开始处理这些数据
  • Sparkstreaming含有一个特有的算子updateStateByKey,就是在state中累计之前窗口中的数据。
  • 如上图所示,窗口1先进行数据的统计,然后将数据放入到state中,然后,进行窗口2的数据统计,然后将state中的数据进行累加统计,依次类推,一直将所有的窗口内的数据统计完成,最终state中的数据即为统计结果。

2、代码实现

package com.njbdqn

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MySs {
  //自定义函数:将各个窗口中的数据根据分组的key值累加
  //String:key值
  //Seq[Int]:窗口中的RDD数据
  //Option[Int]:state中的累加数据
  val addFunc = (it:Iterator[(String,Seq[Int],Option[Int])])=>{
    it.map(x=>{
      (x._1,x._2.sum+x._3.getOrElse(0))
    })
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("myss").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //设置读取数据时间为5秒,5秒创建一个窗口,统计时也是一个窗口一个窗口开始统计
    val ssc = new StreamingContext(sc,Seconds(5))
    //拉取socket信息,
    //在Linux下面安装netcat工具,进行网络数据传输
    //当我们在linux中传输数据后,这边SparkStreaming接收到数据后开始计数
    //192.168.153.200:虚拟机IP
    //1234:端口号
    val ds = ssc.socketTextStream("192.168.153.200", 1234)

    //统计所有窗口的数据
    //新建一个检查点路径,来统计各个state的数据统计值
    sc.setCheckpointDir("E:\\BigDataStudy\\SparkStreaming\\cks")
    val res = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(addFunc,
      new HashPartitioner(sc.defaultMinPartitions), true)

    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 运行该代码后,我们发现Sparkstreaming开始i进行数据处理,每5秒钟为一个窗口开始计算,由于还没有数据,所有没有计算结果
    Spark Streaming计算wordCount

3、安装NetCat,进行数据传输

  • 在Linux环境下安装netcat
yum install nmap-ncat.x86_64
  • 启动netcat,设置端口号为1234,要与代码中的端口号一致
nc -l 1234
  • 开始传输数据,此时我们发现,只要我们传输数据,Sparkstreaming就会帮我们累加每个窗口的数据的个数,最后得到总数据。

Spark Streaming计算wordCount

Spark Streaming计算wordCount

  • 注意:这边我们设置数据读取统计时间为5秒,也就是说,我们在每个5秒内传输的数据会被放到一个窗口中进行统计。然后将这些窗口进行累加计算,获得每个单词的数量。
上一篇:大数据——Scala和Java实现Spark Streaming实时流监控、Spark Streaming搭配Kafka Stream联用和Spark Streaming自定义采集器


下一篇:基于Spark Streaming对新闻网站项目案例分析