【问题标题】:SideInputs corrupt the data in DataFlow's PipelineSideInputs 破坏了 DataFlow 管道中的数据
【发布时间】:2017-11-02 19:15:58
【问题描述】:

我有一个 Dataflow 管道(SDK 2.1.0,Apache Beam 2.2.0),它只是从 GCS 读取 RDF(在 N-Triples 中,所以它只是文本文件),以某种方式对其进行转换并将其写回 GCS,但是在不同的桶中。在这个管道中,我使用三个单个文件(每个侧输入一个文件)的侧输入,并在 ParDo 中使用它们。

为了在 Java 中使用 RDF,我使用 Apache Jena,因此每个文件都被读入 Model 类的一个实例。由于 Dataflow 没有 Coder,我自己开发了它(RDFModelCoder,见下文)。它在我创建的许多其他管道中运行良好。

这个特定管道的问题是当我添加侧输入时,执行失败并出现异常,表明数据损坏,即添加了一些垃圾。一旦我删除了侧输入,管道就会成功完成执行。

异常(它是从RDFModelCoder 抛出的,见下文):

Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
    at org.apache.jena.atlas.io.IO.exception(IO.java:233)
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)

在这里你可以看到垃圾(在最后):

<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������

管道:

val one = p.apply(TextIO.read().from(config.getString("source.one")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val two = p.apply(TextIO.read().from(config.getString("source.two")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val three = p.apply(TextIO.read().from(config.getString("source.three")))
             .apply(Combine.globally(SingleValue()))
             .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val sideInput = PCollectionList.of(one).and(two).and(three)
                .apply(Flatten.pCollections())
                .apply(View.asList())

p.apply(RDFIO.Read
                  .from(options.getSource())
                  .withSuffix(RDFLanguages.strLangNTriples))
 .apply(ParDo.of(SparqlConstructETL(config, sideInput))
                        .withSideInputs(sideInput))
 .apply(RDFIO.Write
                  .to(options.getDestination())
                  .withSuffix(RDFLanguages.NTRIPLES))

这里只是提供完整的图片是SingleValueConvertToRDFModel ParDos的实现:

class SingleValue : SerializableFunction<Iterable<String>, String> {
    override fun apply(input: Iterable<String>?): String {
        if (input != null) {
            return input.joinToString(separator = " ")
        }
        return ""
    }
}

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
    private val lang: String = outputLang.name

    @ProcessElement
    fun processElement(c: ProcessContext?) {
        if (c != null) {
            val model = ModelFactory.createDefaultModel()
            model.read(StringReader(c.element()), null, lang)
            c.output(model)
        }
    }
}

RDFModelCoder的实现:

class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
                    private val encodeLang: String = RDFLanguages.strLangNTriples)
    : AtomicCoder<Model>() {

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)

    override fun decode(inStream: InputStream): Model {
        val bytes = StreamUtils.getBytes(inStream)
        val model = ModelFactory.createDefaultModel()

        model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here

        return model
    }

    override fun encode(value: Model, outStream: OutputStream?) {
        value.write(outStream, encodeLang, null)
    }

}

我多次检查侧输入文件,它们很好,它们有 UTF-8 编码。

【问题讨论】:

  • 这不是耶拿问题。输入不是有效的 UTF-8。 PeekReader 使用相当大的缓冲区(128K 字节)来改进字节到字符的转换。因此 MalformedInputException。 length = 1 表示该块中它是非法的单字节 UTF-8 序列。

标签: jena google-cloud-dataflow apache-beam


【解决方案1】:

错误很可能是在RDFModelCoder 的实现中。在实现encode/decode 时,必须记住提供的InputStreamOutputStream 并非由正在编码/解码的当前实例专有。例如。在当前Model 的编码形式之后,InputStream 中可能有更多数据。使用 StreamUtils.getBytes(inStream) 时,您将同时获取当前编码的 Model 的数据以及流中的任何其他数据。

通常在编写新的Coder 时,最好只组合现有的Coder,而不是手动解析流:这样不太容易出错。我建议将模型转换为byte[] 并使用ByteArrayCoder.of() 对其进行编码/解码。

【讨论】:

  • 不幸的是,我无法使用Java序列化机制或Kryo(例如)进行(反)序列化,因为模型不容易序列化,所以我必须将其写入文本格式并阅读它背部。我尝试使用 Kryo 来完成这项任务,但没有成功。
  • 如何确保我不会从输入流中读取超出需要的内容?
  • 看起来有帮助,我仍在测试,但第一次测试通过了!我查看了 StringUtf8Coder,发现他们调用 VarInt.encodeVarInt.decode 来写入/读取流中块的长度。我不知道为什么我没有早点找到它,可能是因为文档不是那么清楚...
【解决方案2】:

Apache Jena 提供了具有 Hadoop IO 支持的 Elephas IO 模块,因为 Beam 支持 Hadoop InputFormat IO,您应该能够使用它来读取您的 NTriples 文件。

这可能会更有效率,因为NTriples support in Elephas 能够并行化 IO 并避免将整个模型缓存到内存中(实际上它根本不会使用Model):

Configuration myHadoopConfiguration = new Configuration(false);

// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
                               NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need

// Read data only with Hadoop configuration.
p.apply("read",
         HadoopInputFormatIO.<LongWritable, TripleWritable>read()
        .withConfiguration(myHadoopConfiguration);

当然,这可能需要您对整体管道进行一些重构。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-28
    • 1970-01-01
    • 1970-01-01
    • 2023-04-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多