基于Spark Streaming对新闻网站项目案例分析

目录

一、需求分析

新闻网站需求:

  • pv
  • uv
  • 注册用户数
  • 热门板块

数据处理流程:

数据源  -> kafka  ->  spark streaming

二、数据准备

(1)数据格式

网站日志格式 :

date timestamp userid pageid section action(各字段之间以空格分隔)

日志字段说明:

date: 日期,yyyy-MM-dd格式 
timestamp: 时间戳 
userid: 用户id 
pageid: 页面id 
section: 版块 
action: 用户行为,共两类:view(点击页面)和 register(注册)

数据展示:

2020-12-20 1608451521565 364 422 fashion view
2020-12-20 1608451521565 38682 708 aviation view
2020-12-20 1608451521565 65444 270 internet view
2020-12-20 1608451521565 4805 250 tv-show view
2020-12-20 1608451521565 1130 743 movie view
2020-12-20 1608451521565 85320 605 carton view
2020-12-20 1608451521565 null 581 movie view
2020-12-20 1608451521565 null null null register

kafka消费者启动:

bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark

(2)基于Java开发实时数据生成器

这里用Java程序生成实时数据流并发送给kafka,每隔1秒随机生成1000条数据。

package com.kfk.spark.news_analysis_project;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer thread that simulates the access log of a news web site.
 *
 * <p>Each record is a single line of six space-separated fields:
 * {@code date timestamp userid pageid section action}. Roughly one record in
 * ten is a {@code register} event (user, page and section are the literal
 * text {@code null}); the rest are {@code view} events. One batch of
 * {@link #BATCH_SIZE} records is sent, then the thread pauses for
 * {@link #BATCH_INTERVAL_MS} milliseconds.
 *
 * <p>The thread runs until it is interrupted; on exit the underlying Kafka
 * producer is closed.
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/12
 * @time : 7:51 PM
 */
public class AccessProducer extends Thread{

    /**
     * Sections a viewed page can belong to. The values are written verbatim
     * into the log, so they are kept exactly as downstream consumers see them.
     */
    private static final String[] SECTIONS = new String[] {"country", "international", "sport",
            "entertainment", "movie", "carton",
            "tv-show", "technology", "internet",
            "car", "military", "funny",
            "fashion", "aviation", "government"};

    /** Number of records sent between two pauses. */
    private static final int BATCH_SIZE = 1000;

    /** Pause between two batches, in milliseconds. */
    private static final long BATCH_INTERVAL_MS = 1000L;

    /** One record out of this many is a register event. */
    private static final int REGISTER_ONE_IN = 10;

    /** One view record out of this many comes from an anonymous visitor (userid "null"). */
    private static final int ANONYMOUS_ONE_IN = 10;

    /** Exclusive upper bound for generated user ids. */
    private static final int USER_ID_BOUND = 100000;

    /** Exclusive upper bound for generated page ids. */
    private static final int PAGE_ID_BOUND = 1000;

    private final Random random = new Random();

    // SimpleDateFormat is not thread-safe, so every producer thread owns its own instance.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

    private final Producer<String,String> producer;
    private final String topic;

    /**
     * Creates a producer that writes to the given topic.
     *
     * @param topic name of the Kafka topic the records are sent to
     */
    public AccessProducer(String topic){
        this.topic = topic;
        this.producer = new KafkaProducer<String, String>(createProducerConfig());
    }

    /**
     * Builds the Kafka producer configuration: string key/value serializers
     * and the bootstrap server address.
     *
     * @return the producer properties
     */
    public Properties createProducerConfig(){
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", "bigdata-pro-m04:9092");
        return properties;
    }

    /**
     * Sends one batch of {@link #BATCH_SIZE} random records, sleeps for
     * {@link #BATCH_INTERVAL_MS} ms, and repeats until the thread is
     * interrupted. The producer is always closed before the method returns.
     */
    @Override
    public void run(){
        boolean interrupted = false;
        try {
            while (!interrupted){
                for (int i = 0; i < BATCH_SIZE; i++){
                    String log;
                    if (random.nextInt(REGISTER_ONE_IN) == 0){
                        log = buildRegisterLog();
                    } else {
                        log = buildViewLog();
                    }

                    // Hand the record to Kafka (asynchronous send).
                    producer.send(new ProducerRecord<String, String>(topic, log));
                }

                try {
                    Thread.sleep(BATCH_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // sleep() cleared the interrupt flag; remember it so that
                    // close() below is not itself interrupted.
                    interrupted = true;
                }
            }
        } finally {
            try {
                producer.close();
            } finally {
                if (interrupted){
                    // Restore the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Builds one {@code view} record with a random page and section. About
     * one record in {@link #ANONYMOUS_ONE_IN} has no user id (anonymous
     * visitor); the others get a random id below {@link #USER_ID_BOUND}.
     *
     * @return the log line
     */
    private String buildViewLog(){
        Long userid = null;
        if (random.nextInt(ANONYMOUS_ONE_IN) != 0){
            userid = (long) random.nextInt(USER_ID_BOUND);
        }
        Long pageid = (long) random.nextInt(PAGE_ID_BOUND);
        String section = SECTIONS[random.nextInt(SECTIONS.length)];

        return formatLog(System.currentTimeMillis(), userid, pageid, section, "view");
    }

    /**
     * Builds one {@code register} record. A newly registering user has no
     * user id, page or section yet, so those three fields are {@code null}.
     *
     * @return the log line
     */
    private String buildRegisterLog(){
        return formatLog(System.currentTimeMillis(), null, null, null, "register");
    }

    /**
     * Joins the fields into one space-separated log line. The date field is
     * derived from {@code timestamp}, so both always describe the same
     * instant. A {@code null} field is written as the text {@code null}.
     *
     * @param timestamp event time in epoch milliseconds
     * @param userid    user id, or {@code null}
     * @param pageid    page id, or {@code null}
     * @param section   section name, or {@code null}
     * @param action    {@code view} or {@code register}
     * @return the log line
     */
    private String formatLog(long timestamp, Long userid, Long pageid, String section, String action){
        return new StringBuilder()
                .append(dateFormat.format(new Date(timestamp))).append(" ")
                .append(timestamp).append(" ")
                .append(userid).append(" ")
                .append(pageid).append(" ")
                .append(section).append(" ")
                .append(action).toString();
    }

    /**
     * Starts a single producer thread that writes to the {@code spark} topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AccessProducer accessProducer = new AccessProducer("spark");
        accessProducer.start();
    }
}

三、实施过程

将每一个需求写入到一个方法中,运行流程分析及结果以注释的形式显示。
关于数据模型,可以参考基于Spark SQL对新闻网站项目案例分析这篇文章
https://blog.csdn.net/weixin_45366499/article/details/111119234

package com.kfk.spark.common;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Shared factory for the Spark Streaming context used by the examples.
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 PM
 */
public class CommStreamingContext {

    /**
     * Creates a streaming context that runs on the {@code local[2]} master
     * under the application name {@code CommStreamingContext}, with a batch
     * duration of five seconds.
     *
     * @return a new, not yet started, streaming context
     */
    public static JavaStreamingContext getJssc(){
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("CommStreamingContext");

        return new JavaStreamingContext(sparkConf, Durations.seconds(5));
    }
}
package com.kfk.spark.news_analysis_project;

import com.kfk.spark.common.CommStreamingContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
 * Real-time news-site metrics computed with Spark Streaming from a Kafka
 * topic: page PV, page UV, number of registrations and section popularity.
 * Every metric is computed per streaming batch and printed to stdout.
 *
 * <p>Input records are lines of six space-separated fields:
 * {@code date timestamp userid pageid section action}, e.g.
 * <pre>
 * 2020-12-20 1608451521565 364 422 fashion view
 * 2020-12-20 1608451521565 38682 708 aviation view
 * </pre>
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/20
 * @time : 4:11 PM
 */
public class NewsRealTime {

    /** Separator between the fields of a raw log line. */
    private static final String LOG_SEPARATOR = " ";

    /** Separator of the intermediate "date,userid,pageid" string used for UV. */
    private static final String UV_SEPARATOR = ",";

    /** Number of fields a well-formed log line has. */
    private static final int LOG_FIELD_COUNT = 6;

    private static final int IDX_DATE = 0;
    private static final int IDX_USER = 2;
    private static final int IDX_PAGE = 3;
    private static final int IDX_SECTION = 4;
    private static final int IDX_ACTION = 5;

    /**
     * Connects to Kafka, wires the four metrics and runs the streaming job
     * until it is terminated.
     *
     * @param args unused
     * @throws InterruptedException if the wait for termination is interrupted
     */
    public static void main(String[] args) throws InterruptedException {

        JavaStreamingContext jssc = CommStreamingContext.getJssc();

        // Kafka consumer configuration for the direct stream.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "streaming_kafka_1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // Topic to subscribe to.
        Collection<String> topics = Collections.singletonList("spark");

        // Kafka source.
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // stream -> map -> raw log lines
        JavaDStream<String> accessDstream = stream.map(new RecordValue());

        // accessDstream -> filter -> well-formed lines with action "view"
        JavaDStream<String> filterDstream = accessDstream.filter(new ActionFilter("view"));

        // Page PV.
        calculatePagePV(filterDstream);

        // Page UV.
        calculatePageUV(filterDstream);

        // Registrations (does its own filtering on the raw stream).
        calculateRegistercount(accessDstream);

        // Hot sections: only view records carry a section. Register records
        // have section "null" and must not be counted as a section.
        calculateUserSectionPV(filterDstream);

        jssc.start();
        jssc.awaitTermination();

    }

    /**
     * Prints the page views per (date, page) for every batch.
     *
     * <p>Data flow:
     * {@code filterDstream -> mapToPair -> (2020-12-20_422,1) -> reduceByKey -> (2020-12-20_422,5)}
     *
     * <p>Sample output:
     * <pre>
     * (2020-12-21_16,1)
     * (2020-12-21_548,1)
     * (2020-12-21_344,2)
     * ...
     * </pre>
     *
     * @param filterDstream well-formed log lines with action {@code view}
     */
    public static void calculatePagePV(JavaDStream<String> filterDstream){
        JavaPairDStream<String,Integer> pvStream =
                countByDatedField(filterDstream, LOG_SEPARATOR, IDX_PAGE);

        pvStream.print();
    }

    /**
     * Prints the unique visitors per (date, page) for every batch.
     *
     * <p>Data flow, for the input
     * <pre>
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 365 422 fashion view
     * 2020-12-20 1608451521565 367 453 fashion view
     * </pre>
     * step 1, map to "date,userid,pageid":
     * <pre>
     * 2020-12-20,364,422
     * 2020-12-20,364,422
     * 2020-12-20,365,422
     * 2020-12-20,367,453
     * </pre>
     * step 2, distinct within the batch:
     * <pre>
     * 2020-12-20,364,422
     * 2020-12-20,365,422
     * 2020-12-20,367,453
     * </pre>
     * step 3, mapToPair to (date_pageid, 1), and step 4, reduceByKey:
     * <pre>
     * (2020-12-20_422,2)
     * (2020-12-20_453,1)
     * </pre>
     *
     * <p>Lines whose userid field is the text {@code null} all share that
     * value, so after the distinct step they contribute at most one visitor
     * per (date, page).
     *
     * @param filterDstream well-formed log lines with action {@code view}
     */
    public static void calculatePageUV(JavaDStream<String> filterDstream){

        // filterDstream -> map -> "2020-12-20,364,422"
        JavaDStream<String> visitDstream = filterDstream.map(new VisitTriple());

        // visitDstream -> distinct (per batch RDD)
        JavaDStream<String> distinctDstream = visitDstream.transform(new DistinctLines());

        // distinctDstream -> (2020-12-20_422,1) -> reduceByKey -> (2020-12-20_422,4)
        // In the "date,userid,pageid" triple the page id sits at index 2.
        JavaPairDStream<String,Integer> uvStream =
                countByDatedField(distinctDstream, UV_SEPARATOR, 2);

        uvStream.print();
    }

    /**
     * Prints the number of registrations per date for every batch.
     *
     * <p>Data flow:
     * {@code accessDstream -> filter(action = register) -> (2020-12-20_register,1)
     * -> reduceByKey -> (2020-12-20_register,5)}
     *
     * <p>Sample output: {@code (2020-12-21_register,11)}
     *
     * @param accessDstream raw, unfiltered log lines
     */
    public static void calculateRegistercount(JavaDStream<String> accessDstream){

        // accessDstream -> filter -> well-formed lines with action "register"
        JavaDStream<String> registerDstream = accessDstream.filter(new ActionFilter("register"));

        JavaPairDStream<String,Integer> registerCountStream =
                countByDatedField(registerDstream, LOG_SEPARATOR, IDX_ACTION);

        registerCountStream.print();
    }

    /**
     * Prints the page views per (date, section) for every batch, which shows
     * the hot sections.
     *
     * <p>Data flow:
     * {@code filterDstream -> mapToPair -> (2020-12-20_fashion,1) -> reduceByKey -> (2020-12-20_fashion,5)}
     *
     * <p>Sample output:
     * <pre>
     * (2020-12-21_internet,16)
     * (2020-12-21_military,24)
     * (2020-12-21_aviation,21)
     * ...
     * </pre>
     *
     * @param filterDstream well-formed log lines with action {@code view}
     */
    public static void calculateUserSectionPV(JavaDStream<String> filterDstream){
        JavaPairDStream<String,Integer> pvStream =
                countByDatedField(filterDstream, LOG_SEPARATOR, IDX_SECTION);

        pvStream.print();
    }

    /**
     * Counts the lines of each batch by the key {@code <date>_<field>}.
     *
     * @param lines      input lines; every line must have a date at index 0
     *                   and at least {@code fieldIndex + 1} fields
     * @param separator  regular expression the lines are split on
     * @param fieldIndex index of the field that is combined with the date
     * @return pairs of key and number of lines carrying that key
     */
    private static JavaPairDStream<String,Integer> countByDatedField(
            JavaDStream<String> lines, String separator, int fieldIndex){
        return lines
                .mapToPair(new DatedKey(separator, fieldIndex))
                .reduceByKey(new Sum());
    }

    /** Extracts the value of a Kafka record. */
    private static final class RecordValue implements Function<ConsumerRecord<String, String>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(ConsumerRecord<String, String> record) {
            return record.value();
        }
    }

    /**
     * Keeps only well-formed log lines whose action field equals the wanted
     * action. Null values and lines with fewer than six fields are dropped
     * instead of failing the batch.
     */
    private static final class ActionFilter implements Function<String, Boolean> {
        private static final long serialVersionUID = 1L;

        private final String wantedAction;

        ActionFilter(String wantedAction) {
            this.wantedAction = wantedAction;
        }

        @Override
        public Boolean call(String line) {
            if (line == null) {
                return false;
            }
            String[] fields = line.split(LOG_SEPARATOR);
            return fields.length >= LOG_FIELD_COUNT && wantedAction.equals(fields[IDX_ACTION]);
        }
    }

    /** Maps a line to the pair {@code (<date>_<field>, 1)}. */
    private static final class DatedKey implements PairFunction<String, String, Integer> {
        private static final long serialVersionUID = 1L;

        private final String separator;
        private final int fieldIndex;

        DatedKey(String separator, int fieldIndex) {
            this.separator = separator;
            this.fieldIndex = fieldIndex;
        }

        @Override
        public Tuple2<String, Integer> call(String line) {
            String[] fields = line.split(separator);
            return new Tuple2<String, Integer>(fields[IDX_DATE] + "_" + fields[fieldIndex], 1);
        }
    }

    /** Maps a log line to the string {@code date,userid,pageid}. */
    private static final class VisitTriple implements Function<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(String line) {
            String[] fields = line.split(LOG_SEPARATOR);
            return fields[IDX_DATE] + UV_SEPARATOR + fields[IDX_USER] + UV_SEPARATOR + fields[IDX_PAGE];
        }
    }

    /** Removes duplicate lines inside one batch RDD. */
    private static final class DistinctLines implements Function<JavaRDD<String>, JavaRDD<String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public JavaRDD<String> call(JavaRDD<String> rdd) {
            return rdd.distinct();
        }
    }

    /** Adds two partial counts. */
    private static final class Sum implements Function2<Integer, Integer, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    }
}

这里没有做过多的业务分析,可以根据前面所学的知识进行扩展,将算子灵活运用组合起来!


以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

上一篇:Spark Streaming计算wordCount


下一篇:如何将网络音频流保存到文件(c / java)