flink(十):Table&Sql注册表和视图

说明

  • 本文属于实战,讲解 Flink1.12 版本java代码注册表和视图的实现方法,开发环境搭建,参考上篇文章,这里不再赘述。

资料

  • 官方Flink 1.12中文版Table API&SQL文档地址

实现

讲解

  • 代码结构分为5部分,
    • 准备环境 env
    • 数据输入 source
    • 数据处理 transformation
    • 数据输出 sink
    • 启动任务 execute

代码

  • 所有代码基于java1.8
import static org.apache.flink.table.api.Expressions.$;

import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/***
 * 
 * @Description Table API And SQl实例一:将DataStream注册为动态表或视图,再使用sql进行统计查询。
 */
public class DataStreamToTableAndView {
	
	public static void main(String[] args) throws Exception {
		//TODO 1. env环境准备
		StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
		
		//TODO 2. source
		DataStream<Order> orderA=env.fromCollection(Arrays.asList(
				new Order(1L,"beer",3),
				new Order(1L,"diaper",4),
				new Order(3L,"rubber",2)
				));
		
		DataStream<Order> orderB=env.fromCollection(Arrays.asList(
				new Order(2L,"beer",3),
				new Order(2L,"diaper",3),
				new Order(3L,"rubber",1)
				));
		
		
		//TODO 3. transformation 将DataStream数转换Table和View,然后查询
		Table tableA=tableEnv.fromDataStream(orderA,$("user"),$("product"),$("amount"));  //注册表
		
		tableEnv.createTemporaryView("tableB",orderB,$("user"),$("product"),$("amount")); //注册视图
		
		/**
		 * 查询:tableA中amount>2 和tableB中amount>1定时护甲并合并。
		 */
		String sql="select * from "+tableA+" where amount>2 union select * from tableB where amount>1 ";
//		sql="select * from "+tableA+" where amount>2 ";
		Table resultTable=tableEnv.sqlQuery(sql);
		System.out.println("结果表约束:");
		resultTable.printSchema();
		System.out.println("表名:"+resultTable);
		
		//将table转为DataStream
//		DataStream<Order> resultDs=tableEnv.toAppendStream(resultTable, Order.class);   //仅insert 操作修改动态表数据
		
		/**
		 * retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、
		 * 将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
		 */
		DataStream<Tuple2<Boolean, Order>> resultDsTwo=tableEnv.toRetractStream(resultTable, Order.class); //
		
		//TODO 4. sink
		resultDsTwo.print();
		
		//TODO 5. execute
		env.execute("");
	}

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class Order{
		public Long user;
		public String product;
		public int amount;
	}
}

总结

  • Flink 支持table和视图开发,功能上各有特色,使用上看个人习惯。
  • 个人倾向于Table开发。但整体上table接口相比传统java编码风格有很大不同,使用上需要适应。另外各个版本Flink接口变动较大,旧版本大量接口废弃,希望后期能稳定下来。不然后期版本升级,大量功能接口升级,无异于二次开发。
上一篇:python中基本类型的连接组合和互相转换13种方式


下一篇:Source的并行度问题