【问题标题】:How to manage Apache-Beam TextIO exceptions into failures?如何将 Apache-Beam TextIO 异常管理为失败?
【发布时间】:2022-08-16 02:02:48
【问题描述】:

如何将 TextIO 异常转换为失败? 有时当我使用 TextIO.read() 我有

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException:没有文件匹配规范: src/test/resources/config/qqqqqqq 如何区分独立失败列表的例外情况? 例如这段代码: 我有一个包含其他文件列表的文件,需要将所有文件中的所有行作为一个列表读取

   PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    PCollection<String> lines = pipeline           
            .apply(TextIO.read().from(\"src/test/resources/config/W-PSFV-LOG-FILE-2022-05-16_23-59-59.txt\"))
            .apply(MapElements.into(TypeDescriptors.strings()).via(line -> \"src/test/resources/config/\" + line))
            .apply(TextIO.readAll());
    ;
    lines.apply(Log.ofElements());
    pipeline.run();

但是,如果其中一个文件损坏,它会抛出 FileNotFoundException 并停止。不想停下来,我想获取所有现有文件的列表并列出损坏的文件

    标签: java error-handling apache-beam apache-beam-io


    【解决方案1】:

    我认为您可以使用死信队列来解决您的问题。

    Beam 建议在 MapElements 中使用 TupleTagsexceptionIntoexceptionVia 方法进行本机错误处理。

    然后它返回一个Result 结构,具有良好的输出 PCollection 和失败的 PCollection。

    您还可以使用名为 Asgarde 的库:

    https://github.com/tosun-si/asgarde

    PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
    
        PCollection<String> lines = pipeline           
                .apply(TextIO.read().from("src/test/resources/config/W-PSFV-LOG-FILE-2022-05-16_23-59-59.txt"))
    
        WithFailures.Result<PCollection<String>, Failure> result = CollectionComposer.of(lines)
                .apply(MapElements.into(TypeDescriptors.strings()).via(line -> "src/test/resources/config/" + line));
        ;
    
       // Gets outputs and Failure PCollections.
       PCollection<String> output = result.output();
       PCollection<Failure> failures = result.failures();
       
       // Then you can sink your Failures in database, GCS file or topic if needed...
       ......
        
        pipeline.run();
    
    

    Failure 对象由Asgarde 库提出,并将当前输入元素作为字符串和异常:

    public class Failure implements Serializable {
        private final String pipelineStep;
        private final String inputElement;
        private final Throwable exception;
    
    

    如果要使用此代码,则必须导入 Asgarde 库,例如在 pom.xml 文件中使用 Maven:

    <dependency>
        <groupId>fr.groupbees</groupId>
        <artifactId>asgarde</artifactId>
        <version>0.19.0</version>
    </dependency>
    

    或使用 Gradle:

    implementation group: 'fr.groupbees', name: 'asgarde', version: '0.19.0'
    

    PS:我是Asgarde 库的创建者,项目的自述文件显示了许多将Dead letter queue 与本机BeamAsgarde 库一起应用的示例。

    不要犹豫,阅读项目的自述文件:https://github.com/tosun-si/asgarde

    【讨论】:

    • 抱歉,您提供的代码无法编译:WithFailures.Result&lt;PCollection&lt;String&gt;, Failure&gt; result = CollectionComposer.of(lines) .apply(MapElements.into(TypeDescriptors.strings()).via(line -&gt; "src/test/resources/config/" + line)); 错误:Cannot resolve method 'apply(MapElements&lt;NewInputT, String&gt;)'
    • 如果要使用此代码,则必须导入 Asgarde 库,我编辑了第一篇文章来解释,如何导入它。
    • 感谢阿斯加德。不能使用它,因为大项目已经有自己的这样的解决方案。是否可以在 MapElements .via() 中调用 TEXTIO.read()/radall() 并在 result.failures(); 中捕获异常? > 所有异常都需要由错误管理器捕获并使用转换而不是 doFn
    • 我认为您不能在 MapElements 中使用 TEXTIO.read().readAll(),因为 TextIO.read 是输入连接器。 MapElements 是一个应用转换的本机组件,在幕后它等同于带有DoFnParDo
    • 如果您的文件不大,也许您可​​以编写从ParDoDoFn 中的路径读取文件的逻辑,在此逻辑中,您可以应用错误处理。如果文件很大,则不建议这样做,因为您的转换是在工作人员内部计算的。
    【解决方案2】:

    您可以先使用 FileIO 将文件拆分为可读的现有文件和不存在的文件。

    PCollection<KV<String, String>> categoryAndFiles = p
         .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
          // withCompression can be omitted - by default compression is detected from the filename.
         .apply(FileIO.readMatches().withCompression(GZIP))
         .apply(MapElements
             // uses imports from TypeDescriptors
             .into(kvs(strings(), strings()))
             .via((ReadableFile f) -> {
               try {
                 f.open();
                 return KV.of(
                     "readable-existing",
                     f.getMetadata().resourceId().toString());
               } catch (IOException ex) {
                 return KV.of(
                     "readable-existing",
                     f.getMetadata().resourceId().toString());
               }
             }));
    

    改编自example

    【讨论】:

      猜你喜欢
      • 2019-06-22
      • 1970-01-01
      • 1970-01-01
      • 2021-12-04
      • 2022-08-16
      • 1970-01-01
      • 1970-01-01
      • 2019-04-05
      • 1970-01-01
      相关资源
      最近更新 更多