【发布时间】:2015-04-17 22:33:18
【问题描述】:
我已在 Dataflow 服务上针对某些 Datastore 类型(kind)成功执行了以下管道/作业。但是,其中一种类型始终失败并出现下面的错误。有什么想法吗?
import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
public class DatastoreExperiment {

    /**
     * Counts the entities of a single Datastore kind and writes the total to a
     * text file on GCS: reads with DatastoreIO, maps every entity to the
     * integer 1, sums globally, then formats the sum as a string for TextIO.
     */
    public static void main(String[] args) {
        final String dataset = "mailfoogae";
        final String kind = "TrackedThread";

        Pipeline pipeline = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Query every entity of the requested kind.
        DatastoreV1.KindExpression kindExpr =
                DatastoreV1.KindExpression.newBuilder().setName(kind).build();
        DatastoreV1.Query query =
                DatastoreV1.Query.newBuilder().addKind(kindExpr).build();

        PCollection<DatastoreV1.Entity> source =
                pipeline.apply(DatastoreIO.readFrom(dataset, query));

        // Entity -> 1, so the global sum below equals the entity count.
        PCollection<Integer> ones = source.apply(ParDo.of(
                new DoFn<DatastoreV1.Entity, Integer>() {
                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        c.output(1);
                    }
                }));

        PCollection<Integer> total =
                ones.apply(Combine.globally(new Sum.SumIntegerFn()));

        // Render the count as text and write it to the output file.
        total.apply(ParDo.of(
                new DoFn<Integer, String>() {
                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        c.output(c.element().toString());
                    }
                }))
             .apply(TextIO.Write.named("WriteMyFile-" + kind)
                     .to("gs://temp.streak.com/outputData-" + kind + ".txt"));

        pipeline.run();
    }
}
并产生以下错误:
Apr 17, 2015, 2:45:59 PM (9c3980cbff15b512): java.io.IOException: Failed to start reading from source: Datastore: host https://www.googleapis.com; dataset mailfoogae; query: kind { name: "TrackedLink" } filter { composite_filter { operator: AND filter { property_filter { property { name: "__key__" } operator: GREATER_THAN_OR_EQUAL value { key_value { partition_id { dataset_id: "s~mailfoogae" } path_element { kind: "TrackedLink" name: "5f8e8209-617f-44d0-95df-36dcdea79151" } } } } } filter { property_filter { property { name: "__key__" } operator: LESS_THAN value { key_value { partition_id { dataset_id: "s~mailfoogae" } path_element { kind: "TrackedLink" name: "7d1e71de-9a37-4063-9569-33c0dad538fe" } } } } } } } at
com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:321) at
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:174) at
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:121) at
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:130) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:95) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:139) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:124) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: Backend Error at
com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:721) at
com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:712) at
com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:313) ... 11 more
Caused by: com.google.api.services.datastore.client.DatastoreException: Backend Error at
com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115) at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81) at
com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41) at
com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109) at
com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:771) at
com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:719) ... 13 more
Caused by: com.google.api.client.http.HttpResponseException: 503 Service Unavailable Backend Error at
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054) at
com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78) ... 17 more
【问题讨论】:
-
所以,只是为了澄清一下:同样的管道适用于其他类型,但这种特殊类型会出现此错误? 1)如果您使用 DirectPipelineRunner 运行它是否有效? (即在本地而不是在服务上)2)您是否能够直接在 Datastore 浏览器中访问此类数据而无需 Dataflow?
-
1) 可以,但速度非常慢
-
因此,问题似乎在于 Dataflow 的 Datastore 读取器指定的查询限制太高,这会触发服务中的错误。您可以尝试使用本地构建的 Dataflow SDK 运行您的管道,并将 DatastoreIO.DatastoreReader (github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/…) 中的 QUERY_LIMIT 从 5000 更改为 500?同时,我将尝试将此更改引入下一个 SDK 版本。 Datastore 团队还表示,他们正在进行改进以使这种调整变得不必要,但目前是这样。
-
当然 - 我建议您以与之前完全相同的方式在 Dataflow 服务上运行,但使用经过修改的 SDK。为此,您可以从 github 查看 SDK,进行此修复,然后使用 maven 重建工件并在本地部署它。然后让你的程序依赖这个本地构建的工件版本并重新运行。
-
修复已被推送到 GitHub github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/… - 但是 Maven 工件推送目前不太频繁,目前大约每几周一次。
标签: google-cloud-datastore google-cloud-dataflow