[Posted]: 2018-12-13 11:45:50
[Question]:
I'm trying to run a Google Dataflow application, but it throws this exception:
java.lang.IllegalArgumentException: No filesystem found for scheme gs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:213)
at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1028)
at br.com.sulamerica.mecsas.ExportacaoDadosPipeline.main(ExportacaoDadosPipeline.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
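The trace shows the failure happening while the pipeline is still being constructed. `TextIO.write().to(...)` calls `FileSystems.matchNewResource`, which looks up a filesystem by the path's URI scheme and throws when no filesystem is registered for it. The class below is a hypothetical, simplified model of that lookup, not Beam's implementation. It assumes that only the local `file` scheme is available until other filesystems are registered, so a `gs://` path fails with the same message until a `gs` entry exists.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical, simplified model of a scheme-to-filesystem registry.
 * It only mimics the failure mode in the stack trace; it is not Beam's code.
 */
public class SchemeRegistryModel {
    // A URI-style scheme followed by ":/", e.g. "gs://bucket/out"; anything else is a local path.
    private static final Pattern SCHEME = Pattern.compile("([a-zA-Z][-a-zA-Z0-9+.]*):/.*");

    private final Map<String, String> schemeToFileSystem = new ConcurrentHashMap<>();

    public SchemeRegistryModel() {
        // Before any registration, only local files can be resolved.
        schemeToFileSystem.put("file", "LocalFileSystem");
    }

    /** Stand-in for the registration step (assumed, in Beam, to happen when pipeline options are applied). */
    public void register(String scheme, String fileSystemName) {
        schemeToFileSystem.put(scheme.toLowerCase(Locale.ROOT), fileSystemName);
    }

    /** Returns the name of the filesystem that would handle the given path. */
    public String resolve(String path) {
        Matcher m = SCHEME.matcher(path);
        String scheme = m.matches() ? m.group(1) : "file";
        String fileSystem = schemeToFileSystem.get(scheme.toLowerCase(Locale.ROOT));
        if (fileSystem == null) {
            // Same message as the exception in the question.
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fileSystem;
    }

    public static void main(String[] args) {
        SchemeRegistryModel registry = new SchemeRegistryModel();
        System.out.println(registry.resolve("/tmp/out.txt")); // LocalFileSystem
        try {
            registry.resolve("gs://bucket/out");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // No filesystem found for scheme gs
        }
        registry.register("gs", "GcsFileSystem");
        System.out.println(registry.resolve("gs://bucket/out")); // GcsFileSystem
    }
}
```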
Here is a snippet of my pipeline code:
Pipeline.create()
        // Read messages from the Pub/Sub subscription
        .apply(PubsubIO.readStrings().fromSubscription(subscription))
        .apply(new KeyExportacaoDadosToEntityTransform())
        .apply(new ListKeyEmpresaSelecionadasTransform())
        // Join the "dscRazaoSocial" values of each entity list into one CRLF-separated string
        .apply(ParDo.of(new DoFn<List<Entity>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(
                        c.element().stream()
                                .map(e -> e.getString("dscRazaoSocial"))
                                .collect(Collectors.joining("\r\n")));
            }
        }))
        // The exception is thrown by this to(...) call (ExportacaoDadosPipeline.java:52)
        .apply(TextIO.write().to("gs://<my bucket>"))
        .getPipeline()
        .run();
And this is the command I use to run the pipeline:
mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=br.com.xpto.foo.ExportacaoDadosPipeline \
    -Dexec.args="--project=<projectID> \
    --stagingLocation=gs://dataflow-xpto/exportacao/staging \
    --output=gs://dataflow-xpto/exportacao/output \
    --runner=DataflowRunner"
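The flags in `exec.args` only reach the pipeline if `main` parses them, and the snippet above calls `Pipeline.create()` with no options. A minimal sketch, assuming Beam 2.x with the Dataflow runner and GCP extension on the classpath, that parses the arguments and passes them to `Pipeline.create(options)`. In the Beam 2.x source, `Pipeline.create(options)` appears to go through `PipelineRunner.fromOptions`, which calls `FileSystems.setDefaultPipelineOptions` and registers filesystems such as `gs`, while the no-argument `Pipeline.create()` does not seem to; treat this as a likely explanation, not a confirmed one. `OptionsFromArgsSketch`, `ExportOptions` and the `Create.of(...)` input are hypothetical stand-ins for the real classes and the Pub/Sub source.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Create;

public class OptionsFromArgsSketch {

    /** Hypothetical options interface so that --output is accepted by the strict argument parser. */
    public interface ExportOptions extends PipelineOptions {
        @Description("Output path prefix, e.g. gs://dataflow-xpto/exportacao/output")
        @Validation.Required
        String getOutput();

        void setOutput(String value);
    }

    /** Parses --project, --stagingLocation, --output, --runner, ... from the command line. */
    public static ExportOptions parseOptions(String[] args) {
        return PipelineOptionsFactory.fromArgs(args).withValidation().as(ExportOptions.class);
    }

    /** Builds the pipeline from parsed options instead of calling Pipeline.create() without arguments. */
    public static Pipeline buildPipeline(ExportOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of("example line")) // placeholder for the PubsubIO source and transforms
                .apply(TextIO.write().to(options.getOutput()));
        return pipeline;
    }

    public static void main(String[] args) {
        buildPipeline(parseOptions(args)).run();
    }
}
```

It would be run with the same `mvn ... exec:java` command, pointing `-Dexec.mainClass` at the class. With the real unbounded `PubsubIO` source, `TextIO.write()` will probably also need windowing plus `withWindowedWrites()` and `withNumShards(...)`; that part is left out of this sketch.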
[Comments]:
-
Which SDK version are you using? I just tried writing to GCS with the WordCount code you get in the Quickstart, and I could write files to GCS without any problem.
-
You may be missing the dependency for the GCS filesystem. Maybe look for a Beam package that provides GCS filesystem support?
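Both comments point at the classpath: which Beam version is in use, and whether a GCS filesystem is present at all. A small, hypothetical diagnostic using only the JDK reports whether each class can be loaded and which jar provides it (Maven jar names normally carry the version, e.g. `beam-sdks-java-core-<version>.jar`). The registrar class name below is an assumption based on the `beam-sdks-java-extensions-google-cloud-platform-core` artifact and may differ between Beam releases.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports whether a class is on the classpath and which jar or directory provides it. */
public class ClasspathCheck {

    /** Returns the jar/directory URL of the class, "<bootstrap>" for JDK classes, or null if it is missing. */
    public static String locate(String className) {
        try {
            // Load without initializing, so no static initializers of the checked class run.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Bootstrap (JDK) classes have no code source.
            return source == null ? "<bootstrap>" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String[] classes = {
            "org.apache.beam.sdk.Pipeline", // Beam core; the jar name shows the SDK version
            "org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar" // assumed GCS registrar class
        };
        for (String name : classes) {
            String location = locate(name);
            System.out.println(name + " -> " + (location == null ? "NOT FOUND" : location));
        }
    }
}
```

If the second line prints `NOT FOUND` when run with the same Maven profile, the GCS filesystem is most likely not on the runtime classpath.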
Tags: java exception google-cloud-platform google-cloud-storage google-cloud-dataflow