Flink DataStream Window 剔除器 Evictor

Evictor可在Window Function执行前或后,从原Window中剔除元素。

本文总结Flink DataStream Window内置的三种剔除器: CountEvictorDeltaEvictorTimeEvictor的剔除原理及使用。

  • CountEvictor: 数量剔除器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
  • DeltaEvictor: 阈值剔除器。计算Window中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
  • TimeEvictor: 时间剔除器。保留Window中最近一段时间内的元素,并丢弃其余元素。

注意: 因为在Window Function执行前剔除比较好理解,所以这里的示例均为在Window Function之后剔除元素。

CountEvictor

剔除原理

不论是在Window Function之前剔除,或是在Window Function之后剔除,最终,都会调用CountEvictor#evict方法。
如下,可以看出CountEvictor的剔除原理:

  • 窗口中数据总条数<=要保留的数据条数(maxCount),不剔除。
  • 否则,从Window头部遍历并剔除。
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
	if (size <= maxCount) { //总条数<=要保留的数据条数(maxCount),不剔除
		return;
	} else { //否则,遍历剔除
		int evictedCount = 0;
		for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
			iterator.next();
			evictedCount++;
			if (evictedCount > size - maxCount) {
				break;
			} else {
				iterator.remove();
			}
		}
	}
}

Window Function执行之后,从原Window中剔除元素

// 测试数据: 每秒钟一条。
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API
// maxCount: 要保留的元素数。
// doEvictAfter: 是否在Window Function执行之后剔除元素。默认false。
// CountEvictor.of(long maxCount)
// CountEvictor.of(long maxCount, boolean doEvictAfter)

// 示例: 在Window Function执行后,从原Window中剔除元素,使得每次触发窗口计算后,原Window中只保留3个元素
kafkaStream
    // 将从Kafka获取的JSON数据解析成Java Bean
    .process(new KafkaProcessFunction())
    // 提取时间戳生成水印 maxOutOfOrdernessSeconds=0
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    // 按用户分组
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    // 构造TimeWindow
    .timeWindow(Time.seconds(10))
    // 触发器: ContinuousEventTimeTrigger
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(4)))
    // 剔除器: Window Function执行后从Window中剔除元素,剔除后Window中只保留3个元素
    .evictor(CountEvictor.of(3,true))
    // 窗口函数: 获取这段时间内每个用户浏览的所有记录
    .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
            StringBuilder allRecords= new StringBuilder();
            for (UserActionLog element : elements) {
                allRecords.append(element).append("\n");
            }

            String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
            String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

            String result="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 窗口所有数据: \n"+allRecords;
            out.collect(result);

        }
    })
    .print();

    
// 结果
// 第一次触发
Key: user_1 窗口开始时间: 2019-11-10 10:42:10 窗口结束时间: 2019-11-10 10:42:20 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:10', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:11', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:12', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:13', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:14', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:15', eventType='browse', productID='product_3', productPrice=30}

// 第二次触发: 可以看到,第一次触发后,原Window中的元素只保留了3条(第13、14、15秒)
Key: user_1 窗口开始时间: 2019-11-10 10:42:10 窗口结束时间: 2019-11-10 10:42:20 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:13', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:14', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:15', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:16', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:17', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:19', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:18', eventType='browse', productID='product_2', productPrice=20}

// 第三次触发: 同理,可以看到,第二次触发后,原Window中的元素只保留了3条(第17、19、18秒)
Key: user_1 窗口开始时间: 2019-11-10 10:42:10 窗口结束时间: 2019-11-10 10:42:20 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:17', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:19', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 10:42:18', eventType='browse', productID='product_2', productPrice=20}

DeltaEvictor

剔除原理

不论是在Window Function之前剔除,或是在Window Function之后剔除,最终,都会调用DeltaEvictor#evict方法。
如下,可以看出DeltaEvictor的剔除原理:

  • 先找到当前窗口的最后一条元素。
  • 遍历窗口中的每一条元素。每条元素(A)和最后一条元素(L),依据用户提供的DeltaFunction计算出一个Delta。计算出的Delta大于等于设定的阈值,则剔除该元素(A)。
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // 找到当前窗口的最后一条元素
	TimestampedValue<T> lastElement = Iterables.getLast(elements);
	// 遍历每条元素并计算Delta。计算出的Delta大于等于设定的阈值,则剔除该条元素
	for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
		TimestampedValue<T> element = iterator.next();
		if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
			iterator.remove();
		}
	}
}

Window Function执行之后,从原Window中剔除元素

// 测试数据: 每秒钟一条。
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API
// threshold: 设定的阈值。
// deltaFunction: 计算Delta的函数。
// doEvictAfter: 是否在Window Function执行之后剔除元素。默认false。
// DeltaEvictor.of(double threshold, DeltaFunction<T> deltaFunction)
// DeltaEvictor.of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter)

// 示例:
kafkaStream
   // 将从Kafka获取的JSON数据解析成Java Bean
   .process(new KafkaProcessFunction())
   // 提取时间戳生成水印 maxOutOfOrdernessSeconds=0
   .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
   // 按用户分组
   .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
   // 构造TimeWindow
   .timeWindow(Time.seconds(10))
   // 触发器: ContinuousEventTimeTrigger
   .trigger(ContinuousEventTimeTrigger.of(Time.seconds(4)))
   // 剔除器: Window Function执行后从Window中剔除元素。
   // 这里的剔除规则: 该窗口中,每条元素(A)对应的商品价格-最后一条元素(L)对应的商品价格,大于等于设定的阈值,则剔除该条元素(A)。
   // 自定义剔除规则
   .evictor(DeltaEvictor.of(10.0, new DeltaFunction<UserActionLog>() {
       @Override
       public double getDelta(UserActionLog oldDataPoint, UserActionLog newDataPoint) {
           return oldDataPoint.getProductPrice()-newDataPoint.getProductPrice();
       }
   },true))
   // 窗口函数: 获取这段时间内每个用户浏览的所有记录
   .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
       @Override
       public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
           StringBuilder allRecords= new StringBuilder();
           for (UserActionLog element : elements) {
               allRecords.append(element).append("\n");
           }

           String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
           String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

           String result="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 窗口所有数据: \n"+allRecords;
           out.collect(result);

       }
   })
   .print();

// 结果
// 第一次触发
Key: user_1 窗口开始时间: 2019-11-10 11:19:20 窗口结束时间: 2019-11-10 11:19:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:20', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:21', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:22', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:23', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:24', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:25', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:26', eventType='browse', productID='product_2', productPrice=20}

// 第二次触发
// 可以看到,根据阈值剔除原理,这里剔除了第21、22、23秒的数据。
Key: user_1 窗口开始时间: 2019-11-10 11:19:20 窗口结束时间: 2019-11-10 11:19:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:20', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:24', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:25', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:26', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:27', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:28', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:29', eventType='browse', productID='product_3', productPrice=30}

// 第三次触发
// 可以看到,根据阈值剔除原理,这里没有剔除元素。
Key: user_1 窗口开始时间: 2019-11-10 11:19:20 窗口结束时间: 2019-11-10 11:19:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:20', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:24', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:25', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:26', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:27', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:28', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:19:29', eventType='browse', productID='product_3', productPrice=30}

TimeEvictor

剔除原理

不论是在Window Function之前剔除,或是在Window Function之后剔除,最终,都会调用TimeEvictor#evict方法。
如下,可以看出TimeEvictor的剔除原理:

  • 找到当前窗口时间截断点: 当前窗口最大时间点-要保留的时间段。
  • 遍历窗口中的每一条元素。某条元素的时间<=截断点,则剔除该条元素。

一句话,即保留Window最近一段时间内的数据。

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
	if (!hasTimestamp(elements)) {
		return;
	}
    
    // 当前Window的最大时间戳
	long currentTime = getMaxTimestamp(elements);
	// 截断点
	long evictCutoff = currentTime - windowSize;

    // 当某条元素的时间戳<=截断点,则剔除该元素
	for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
		TimestampedValue<Object> record = iterator.next();
		if (record.getTimestamp() <= evictCutoff) {
			iterator.remove();
		}
	}
}

Window Function执行之后,从原Window中剔除元素

// 测试数据: 每秒钟一条。
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API
// windowSize: 保留最近windowSize时间段内的数据
// doEvictAfter: 是否在Window Function执行之后剔除元素。默认false。
TimeEvictor.of(Time windowSize)
TimeEvictor.of(Time windowSize, boolean doEvictAfter)

// 示例
kafkaStream
    // 将从Kafka获取的JSON数据解析成Java Bean
    .process(new KafkaProcessFunction())
    // 提取时间戳生成水印 maxOutOfOrdernessSeconds=0
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    // 按用户分组
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    // 构造TimeWindow
    .timeWindow(Time.seconds(10))
    // 触发器: ContinuousEventTimeTrigger
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(4)))
    // 剔除器: 保留最近3秒的数据。在Window Function之后执行剔除操作。
    .evictor(TimeEvictor.of(Time.seconds(3),true))
    // 窗口函数: 获取这段时间内每个用户浏览的所有记录
    .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
            StringBuilder allRecords= new StringBuilder();
            for (UserActionLog element : elements) {
                allRecords.append(element).append("\n");
            }

            String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
            String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

            String result="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 窗口所有数据: \n"+allRecords;
            out.collect(result);

        }
    })
    .print();
    
// 结果
// 第一次触发
Key: user_1 窗口开始时间: 2019-11-10 11:45:20 窗口结束时间: 2019-11-10 11:45:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:20', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:21', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:22', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:23', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:24', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:26', eventType='browse', productID='product_1', productPrice=10}

// 第二次触发
// 可以看到,第一次触发后,剔除了<=23秒的数据
Key: user_1 窗口开始时间: 2019-11-10 11:45:20 窗口结束时间: 2019-11-10 11:45:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:24', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:26', eventType='browse', productID='product_1', productPrice=10}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:27', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:28', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:29', eventType='browse', productID='product_2', productPrice=20}

// 第三次触发
// 可以看到,第二次触发后,剔除了<=26秒的数据
Key: user_1 窗口开始时间: 2019-11-10 11:45:20 窗口结束时间: 2019-11-10 11:45:30 窗口所有数据: 
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:27', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:28', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_1', eventTime='2019-11-10 11:45:29', eventType='browse', productID='product_2', productPrice=20}
上一篇:Yarn提交Flink任务参数介绍


下一篇:Flink内核源码解析系列(六):通过YarnJobClusterEntryPoint类启动JobManager进程