[Posted]: 2021-06-15 22:36:28
[Question]:
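I am trying out BigQueryIO.<T>write() using the code samples from the Apache Beam guide (BigQuery I/O connector) and the BigQueryIO javadoc. Both seem to include an example with a Quote class that has a field of Java Instant type (it is not clear whether that is java.time.Instant or org.joda.time.Instant).
However, Jackson throws an exception saying the type is "not supported", and no matter what I try I cannot get it to work. The full stack trace is below.
I don't know where in my pipeline I could "add Module com.fasterxml.jackson.datatype:jackson-datatype-joda". Any ideas?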
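One possible workaround (an assumption here, not something the Beam guide or javadoc above confirms) is to never hand Jackson an Instant at all, and instead convert the timestamp to a string inside the format function. A minimal sketch of such a conversion with java.time follows, assuming BigQuery accepts the "yyyy-MM-dd HH:mm:ss.SSSSSS UTC" form for a TIMESTAMP column; BigQueryTimestamps and format are hypothetical names. A Joda Instant could first be converted with java.time.Instant.ofEpochMilli(jodaInstant.getMillis()).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: renders an Instant as a string for a BigQuery TIMESTAMP column. */
final class BigQueryTimestamps {
    // Assumed target format: microsecond precision, always rendered in UTC.
    // An Instant has no calendar fields, so the formatter needs a zone to format it.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS 'UTC'")
                    .withZone(ZoneOffset.UTC);

    private BigQueryTimestamps() {}

    /** Formats the instant; sub-microsecond digits are truncated by the SSSSSS pattern. */
    static String format(Instant instant) {
        return FORMAT.format(instant);
    }
}
```

The resulting String is something any JSON serializer can write, so no extra Jackson module would be needed for that column.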
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Joda date/time type `org.joda.time.Instant` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-joda" to enable handling (through reference chain: com.google.api.services.bigquery.model.TableRow["timestamp"])
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
at com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer.serialize(UnsupportedTypeSerializer.java:35)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:808)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:764)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:720)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:35)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4487)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3742)
at org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder.encode(TableRowJsonCoder.java:45)
at org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder.encode(TableRowJsonCoder.java:32)
at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:42)
at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
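The trace shows where the failure happens: TableRowJsonCoder.encode calls ObjectMapper.writeValueAsString on the TableRow, and Jackson's MapSerializer walks the row's entries until it reaches the Instant stored under "timestamp". The mapper appears to be internal to the coder, so there is probably no pipeline-level hook for registering extra modules on it (unverified). A hypothetical diagnostic doing the same walk over a row map is sketched below; it is deliberately conservative (it flags anything that is not a String, Number, Boolean, null, Map or List, which is stricter than what Jackson actually rejects) and only inspects top-level values. RowCheck and nonJsonFields are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical diagnostic: reports top-level keys whose values are not plain JSON types. */
final class RowCheck {
    private RowCheck() {}

    static List<String> nonJsonFields(Map<String, ?> row) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, ?> e : row.entrySet()) {
            Object v = e.getValue();
            // Values a JSON writer can emit without any type-specific module.
            boolean plain = v == null
                    || v instanceof String
                    || v instanceof Number
                    || v instanceof Boolean
                    || v instanceof Map
                    || v instanceof List;
            if (!plain) {
                bad.add(e.getKey()); // e.g. an Instant stored as-is
            }
        }
        return bad;
    }
}
```

Running such a check on a few rows before writing would point at the same column that commenting out fields narrows down by hand.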
The code looks like this:
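import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant; // the stack trace shows a Joda Instant

class Quote {
  final Instant timestamp;
  final String exchange;
  final String symbol;
  final double price;

  Quote(Instant timestamp, String exchange, String symbol, double price) {
    this.timestamp = timestamp;
    this.exchange = exchange;
    this.symbol = symbol;
    this.price = price;
  }
}

PCollection<Quote> quotes = ...
quotes.apply(BigQueryIO
    .<Quote>write()
    .to("my-project:my_dataset.my_table")
    .withSchema(new TableSchema().setFields(
        ImmutableList.of(
            new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
            new TableFieldSchema().setName("exchange").setType("STRING"),
            new TableFieldSchema().setName("symbol").setType("STRING"),
            new TableFieldSchema().setName("price").setType("FLOAT"))))
    .withFormatFunction(quote -> new TableRow()
        .set("timestamp", quote.timestamp) // Instant stored as-is: the value TableRowJsonCoder fails on
        .set("exchange", quote.exchange)
        .set("symbol", quote.symbol)
        .set("price", quote.price))
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));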
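Since TableRow behaves like a Map<String, Object>, the shape of a format function that keeps Instant values away from Jackson can be sketched with a plain LinkedHashMap standing in for TableRow (an assumption made so the example runs without Beam). The timestamp is emitted as an ISO-8601 string via Instant.toString(); whether BigQuery accepts that exact form for a TIMESTAMP column should be checked against the BigQuery docs. In the real pipeline the equivalent would presumably be .set("timestamp", quote.timestamp.toString()) inside withFormatFunction. QuoteRows and FORMAT are hypothetical names, and java.time.Instant replaces the Joda type here.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class QuoteRows {
    // Minimal stand-in for the Quote class from the question (java.time.Instant assumed).
    static final class Quote {
        final Instant timestamp;
        final String exchange;
        final String symbol;
        final double price;

        Quote(Instant timestamp, String exchange, String symbol, double price) {
            this.timestamp = timestamp;
            this.exchange = exchange;
            this.symbol = symbol;
            this.price = price;
        }
    }

    private QuoteRows() {}

    // Plays the role of withFormatFunction: every value is a String or a Double.
    static final Function<Quote, Map<String, Object>> FORMAT = quote -> {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("timestamp", quote.timestamp.toString()); // string, not Instant
        row.put("exchange", quote.exchange);
        row.put("symbol", quote.symbol);
        row.put("price", quote.price); // autoboxed to Double
        return row;
    };
}
```

Keeping the conversion in the format function means the Quote class can keep its Instant field; only the row handed to the coder changes.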
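[Comments]:
- Are you perhaps missing the "jackson-databind" dependency? For example, if you are using Maven: com.fasterxml.jackson.core:jackson-databind:2.12.1
- @chamikara -- I have both jackson-databind and jackson-datatype-joda in my pom.xml.
- Note that if I comment out the Instant/TIMESTAMP field, everything works and the rows do get written to BQ.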
Tags: java google-bigquery apache-beam