Posted: 2017-10-06 15:55:12
Question:
I have a CSV file in Storage, and I want to read it and write it to a BigQuery table. This is my CSV file, where the first row is the header:
GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114
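For reference, a minimal sketch of the TableSchema the target table presumably matches, assuming all four columns are STRING (GroupOwner and GroupCategoryID could plausibly be INTEGER instead); GroupsSchema is a hypothetical helper name:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

// Hypothetical helper that builds the schema implied by the CSV header.
public class GroupsSchema {
    public static TableSchema create() {
        return new TableSchema().setFields(Arrays.asList(
                new TableFieldSchema().setName("GroupName").setType("STRING"),
                new TableFieldSchema().setName("Groupcode").setType("STRING"),
                new TableFieldSchema().setName("GroupOwner").setType("STRING"),
                new TableFieldSchema().setName("GroupCategoryID").setType("STRING")));
    }
}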
Here is my code:
import java.util.Arrays;

import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class StorgeBq {

    public static class StringToRowConverter extends DoFn<String, TableRow> {

        private String[] columnNames;
        private boolean isFirstRow = true;

        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow row = new TableRow();
            String[] parts = c.element().split(",");
            if (isFirstRow) {
                columnNames = Arrays.copyOf(parts, parts.length);
                isFirstRow = false;
            } else {
                for (int i = 0; i < parts.length; i++) {
                    row.set(columnNames[i], parts[i]);
                }
                c.output(row);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setZone("europe-west1-c");
        options.setProject("mydata-dev");
        options.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
         .apply("ConverToBqRow", ParDo.of(new StringToRowConverter()))
         .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
                 .to("mydata-dev:DF_TEST.dataflow_table")
                 .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                 .withCreateDisposition(CreateDisposition.CREATE_NEVER));
        p.run().waitUntilFinish();
    }
}
I have a few problems:
1) When the job starts executing, I see there is a step named "DropInputs" that I never defined in my code, and it starts running before all the other steps. Why?
2) Why doesn't the pipeline start with the first step, "ReadLines"?
3) In the log file, I see that in the "WriteToBq" step it tries to use one of the data values as a field name. For example, "1st Grade Teachers" is not a field but a value of "GroupName":
"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",
Discussion:
- Do you have a job ID? I don't think DropInputs should appear here.
Tags: java google-bigquery google-cloud-platform google-cloud-dataflow apache-beam