window expects a time attribute for grouping in a stream environment.

The full error is as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
 at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
 at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
 at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
 at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
 at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
 at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
 at GroupByWindowAggregation.main(GroupByWindowAggregation.java:44)

 

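Key concept to remember:

A Flink Table always has a schema. A group window in a streaming job can only be defined on a field that the schema marks as a time attribute (for example *ROWTIME*); a plain BIGINT or TIMESTAMP column is rejected with the exception above.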
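To make the idea concrete, here is a toy model of that check. It is not Flink code: ToySchema, isTimeAttribute and validateWindowField are hypothetical names invented for illustration, and the assumption that an undeclared Long field shows up as plain BIGINT is mine. It only sketches why a field without the *ROWTIME* marker fails validation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model, NOT the Flink API: a schema maps field names to a type string,
// and a time attribute carries a *ROWTIME* marker at the end of that string.
class ToySchema {
    private final Map<String, String> fields = new LinkedHashMap<>();

    ToySchema field(String name, String type) {
        fields.put(name, type);
        return this;
    }

    // True only when the field exists and is marked as an event-time attribute.
    boolean isTimeAttribute(String name) {
        String type = fields.get(name);
        return type != null && type.endsWith("*ROWTIME*");
    }

    // Sketches the idea of the validation step: a window on a plain field is rejected.
    void validateWindowField(String name) {
        if (!isTimeAttribute(name)) {
            throw new IllegalStateException(
                    "A group window expects a time attribute for grouping in a stream environment.");
        }
    }

    public static void main(String[] args) {
        // Assumed schema when rowtime is NOT declared with .rowtime(): a plain BIGINT.
        ToySchema plain = new ToySchema().field("amount", "INT").field("rowtime", "BIGINT");
        // Schema after declaring the time attribute, as in the printed output of this post.
        ToySchema fixed = new ToySchema().field("amount", "INT")
                .field("rowtime", "TIMESTAMP(3) *ROWTIME*");

        System.out.println("plain is time attribute: " + plain.isTimeAttribute("rowtime"));
        System.out.println("fixed is time attribute: " + fixed.isTimeAttribute("rowtime"));
        fixed.validateWindowField("rowtime"); // passes silently
    }
}
```

Printing the real schema, as done in the debugging step, is the practical way to check which of the two cases a table is in.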
 

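Debugging approach

Add the following line to print the table's schema. When the time attribute is declared correctly, the output marks rowtime as *ROWTIME*: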

System.out.println(orders.getSchema());

root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT
 |-- rowtime: TIMESTAMP(3) *ROWTIME*

 

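Solution

Declare the rowtime field as an event-time attribute by calling .rowtime() on it when converting the DataStream to a Table: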

Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());

The corresponding OrderStream class is:


// *************************************************************************
//     USER DATA TYPES
// *************************************************************************

/*
 * Simple POJO.
 */


import java.sql.Timestamp;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OrderStream
{
    public Long user;
    public String product;
    public int amount;
    public Long rowtime;

    public OrderStream()
    {
    }

    public OrderStream(Long user, String product, int amount,Long rowtime)
    {
        this.user = user;
        this.product = product;
        this.amount = amount;
        this.rowtime=rowtime;
    }

    @Override
    public String toString() {
        return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                ", rowtime="  + rowtime +
                '}';
    }
}

The corresponding main program is:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


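    DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
            new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00 (UTC+8)
            new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00 (UTC+8)
            new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00 (UTC+8)
            new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00 (UTC+8)
            new OrderStream(1L, "diaper", 4,1505528400L),//2017-09-16 10:20:00 (UTC+8)
            new OrderStream(1L, "diaper", 4,1505528400L)//2017-09-16 10:20:00 (UTC+8)
    ))
            // .rowtime() on an existing field reads the stream record's timestamp, so assign it first.
            // Needs java.time.Duration and org.apache.flink.api.common.eventtime.WatermarkStrategy.
            // Assumptions: rowtime is epoch seconds (hence * 1000L); 20 minutes covers the sample's disorder.
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<OrderStream>forBoundedOutOfOrderness(Duration.ofMinutes(20))
                            .withTimestampAssigner((order, ts) -> order.rowtime * 1000L));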


    Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"), $("rowtime").rowtime());
    System.out.println(orders.getSchema());
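One detail worth checking in the sample data is the unit of rowtime. The values appear to be epoch seconds (the dates in the comments match UTC+8), whereas Flink event-time timestamps are epoch milliseconds. The small standalone sketch below uses only the JDK; EpochUnits and formatMillis are hypothetical names for illustration. It shows how the same number reads under each interpretation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Standalone check of the timestamp unit; uses only java.time, no Flink classes.
class EpochUnits {
    // UTC+8 is an assumption taken from the dates written in the sample-data comments.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.ofHours(8));

    // Formats an epoch-millisecond value as a wall-clock time in UTC+8.
    static String formatMillis(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long raw = 1505529000L; // first sample row

        // Treated as seconds and converted to milliseconds: matches the comment.
        System.out.println(formatMillis(raw * 1000L)); // 2017-09-16 10:30:00

        // Treated directly as milliseconds: lands in January 1970.
        System.out.println(formatMillis(raw));         // 1970-01-18 18:12:09
    }
}
```

If the values are indeed seconds, multiplying by 1000 when assigning timestamps keeps window boundaries aligned with the intended dates.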

 
