【发布时间】:2018-02-20 06:49:00
【问题描述】:
我的输入数据如下所示:
[someGarbagevalue]{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
[someGarbagevalue]{"Id": 2, "Address": {"City":"Mumbai"}}
[someGarbagevalue]{"Id": 3, "Address": {"Street":"XYZ Road"}}
[someGarbagevalue]{"Id": 4}
[someGarbagevalue]{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}
在读取数据后,我正在剥离 [someGarbagevalue],然后尝试写入 BigQuery:
class processFunction(beam.DoFn):
def process(self, element):
global line
line = element[element.find(']') + 1:].strip()
return [line]
def run(argv=None):
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
| 'process' >> (beam.ParDo(processFunction()))
| 'write' >> beam.io.WriteToBigQuery(
'myBucket:tableFolder.test_table')
问题:
- 如何将数据作为每个
line类型写入 BigQuerySTRING。 - 如果我将数据作为每一行写入 BigQuery,我将如何查询 BigQuery 表?
当前错误:
Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Error while reading data, error message: JSON parsing error in row starting at position 0: Value encountered without start of object.
【问题讨论】:
标签: python google-bigquery google-cloud-dataflow apache-beam