Beam Schema Definition

Using annotations

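import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Beam infers the schema from the public fields of this class.
@DefaultSchema(JavaFieldSchema.class)
public class Address {

    public final String city;
    public final String street;
    public final int pincode;

    // Tells Beam which constructor to use when it builds an Address from a Row.
    @SchemaCreate
    public Address(String city, String street, int pincode) {
        this.city = city;
        this.street = street;
        this.pincode = pincode;
    }
}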
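To check what Beam infers from the annotations, the schema can be looked up through a SchemaRegistry. The following is a minimal sketch, not the only way to do it: the class name AddressSchemaDemo and the sample values are made up for illustration, and Address is repeated as a nested class only so that the snippet compiles on its own.

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.values.Row;

// Hypothetical demo class; only the Beam calls are real API.
public class AddressSchemaDemo {

    // Compact copy of the annotated class so the sketch is self-contained.
    @DefaultSchema(JavaFieldSchema.class)
    public static class Address {
        public final String city;
        public final String street;
        public final int pincode;

        @SchemaCreate
        public Address(String city, String street, int pincode) {
            this.city = city;
            this.street = street;
            this.pincode = pincode;
        }
    }

    // Returns the schema Beam infers from the @DefaultSchema annotation.
    public static Schema inferredSchema() throws NoSuchSchemaException {
        return SchemaRegistry.createDefault().getSchema(Address.class);
    }

    // Converts one Address into a Row using the inferred to-Row function.
    public static Row toRow(Address address) throws NoSuchSchemaException {
        return SchemaRegistry.createDefault()
                .getToRowFunction(Address.class)
                .apply(address);
    }

    public static void main(String[] args) throws NoSuchSchemaException {
        // Field order is not asserted here; fields are read by name instead.
        System.out.println(inferredSchema().getFieldNames());
        Row row = toRow(new Address("Pune", "MG Road", 411001));
        System.out.println(row.getString("city") + " " + row.getInt32("pincode"));
    }
}
```

Fields are read by name rather than by position because the sketch does not rely on any particular field order in the inferred schema.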

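When AvroIO reads a file, the Beam Schema can be inferred automatically from the Avro schema (for example, by enabling withBeamSchemas(true) on the read transform).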

Setting the Schema with Row.withSchema

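// Two fields: a string column and a 32-bit integer column.
Schema appSchema = Schema.builder().addStringField("string1").addInt32Field("int1").build();
// Values are given in the same order as the schema fields.
Row row1 = Row.withSchema(appSchema).addValues("aaa,bbb", 1).build();
Row row2 = Row.withSchema(appSchema).addValues("ccc,ddd", 2).build();
Row row3 = Row.withSchema(appSchema).addValues("ddd,eee", 3).build();
// p is an existing Pipeline; withRowSchema attaches the schema to the created PCollection.
PCollection<Row> inputTable =
        PBegin.in(p).apply(Create.of(row1, row2, row3).withRowSchema(appSchema));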
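Once a Row carries a schema, its values can be read back by field name or by position. A small sketch using the same field names as above; the class name RowSchemaDemo and its helper methods are made up for illustration.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hypothetical demo class; only the Beam calls are real API.
public class RowSchemaDemo {

    // Same two-field schema as in the snippet above.
    public static Schema appSchema() {
        return Schema.builder().addStringField("string1").addInt32Field("int1").build();
    }

    // Builds a Row whose values follow the schema's field order.
    public static Row buildRow(String text, int number) {
        return Row.withSchema(appSchema()).addValues(text, number).build();
    }

    public static void main(String[] args) {
        Row row = buildRow("aaa,bbb", 1);
        // Typed access by field name.
        String text = row.getString("string1");
        Integer number = row.getInt32("int1");
        // Untyped access by position.
        Object first = row.getValue(0);
        System.out.println(text + " / " + number + " / " + first);
    }
}
```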

Setting the Schema with PCollection.setSchema / PCollection.setRowSchema

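Pipeline p = Pipeline.create(options);
// CustomSource is a user-defined transform; it is assumed here to emit Row elements.
PCollection<Row> rows = p.apply(new CustomSource());

Schema type = Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
// Attach the schema to the input before querying it: SqlTransform needs a schema on its inputs.
// For a non-Row element type, use setSchema(...) with to-Row and from-Row functions instead.
rows.setRowSchema(type);
PCollectionTuple.of(new TupleTag<>("somedata"), rows)
        .apply(SqlTransform.query("SELECT c1 FROM somedata"));
p.run().waitUntilFinish();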

Setting the Schema with AvroUtils.schemaCoder

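import org.apache.avro.generic.GenericRecord;
// Newer Beam releases ship AvroUtils in the separate Avro extension module instead.
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.PCollection;

// Attaches a schema-aware coder built from the Avro schema, unless the input already has a schema.
private static PCollection<GenericRecord> inferSchema(
    PCollection<GenericRecord> input, org.apache.avro.Schema schema) {
  if (!input.hasSchema()) {
    // schemaCoder derives the Beam Schema from the Avro schema itself.
    input.setCoder(AvroUtils.schemaCoder(schema));
  }
  return input;
}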
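To see which Beam Schema corresponds to a given Avro schema, AvroUtils.toBeamSchema can be called on its own. A minimal sketch: the record and field names are made up for illustration, and the import assumes the older package location of AvroUtils (newer Beam releases provide it in the Avro extension module).

```java
import org.apache.avro.SchemaBuilder;
import org.apache.beam.sdk.schemas.Schema;
// Assumption: older package location; adjust for the Avro extension module if needed.
import org.apache.beam.sdk.schemas.utils.AvroUtils;

// Hypothetical demo class; only the Avro and Beam calls are real API.
public class AvroSchemaDemo {

    // Builds a small Avro record schema with two required fields.
    public static org.apache.avro.Schema avroAddressSchema() {
        return SchemaBuilder.record("Address").fields()
                .requiredString("city")
                .requiredInt("pincode")
                .endRecord();
    }

    // Converts the Avro schema into the equivalent Beam Schema.
    public static Schema beamAddressSchema() {
        return AvroUtils.toBeamSchema(avroAddressSchema());
    }

    public static void main(String[] args) {
        Schema beamSchema = beamAddressSchema();
        for (Schema.Field field : beamSchema.getFields()) {
            System.out.println(field.getName() + ": " + field.getType().getTypeName());
        }
    }
}
```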
