【Question title】: Apache Flink how to sink from Java ObjectNode -> JSON string?
【Posted】: 2018-05-17 13:15:37
【Question】:

So this takes a JSON string -> Java ObjectNode.

    final DataStream<ObjectNode> inputStream = env
        .addSource(new RMQSource<ObjectNode>(
            connectionConfig,                   // config for the RabbitMQ connection
            "start",                            // name of the RabbitMQ queue to consume
            true,                               // use correlation ids; can be false if only at-least-once is required
            new JSONDeserializationSchema()))   // deserialization schema to turn messages into Java objects
        .setParallelism(1);                     // non-parallel source is only required for exactly-once

How do I turn them back from Java ObjectNode -> JSON string?

    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",
        new JSONSerializationSchema()
    ));

JSONSerializationSchema does not exist, but I need something like it.


    Tags: apache-flink flink-streaming flink-cep


    【Solution 1】:

    Use a custom SerializationSchema, like this:

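    // requires: import java.nio.charset.StandardCharsets;
    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",
        new SerializationSchema<ObjectNode>() {
            @Override
            public byte[] serialize(ObjectNode element) {
                // ObjectNode.toString() renders the node as JSON text;
                // encode explicitly as UTF-8 instead of the platform default charset
                return element.toString().getBytes(StandardCharsets.UTF_8);
            }
        }
    ));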
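
    If the same schema is needed for more than one sink, the anonymous class above could be pulled out into a named class. The following is only a minimal sketch: `SerializationSchemaSketch` is a hypothetical stand-in that mirrors the single `serialize` method of Flink's `SerializationSchema`, so the snippet compiles without Flink or Jackson on the classpath, and `ToStringUtf8Schema` is an invented name. In a real job the class would presumably implement Flink's own `SerializationSchema<ObjectNode>` instead.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Flink's SerializationSchema (same single-method shape).
// Assumption: replace it with the real Flink interface inside an actual job.
interface SerializationSchemaSketch<T> extends Serializable {
    byte[] serialize(T element);
}

// Turns any element into bytes by calling toString() and encoding the result as UTF-8.
// For a Jackson ObjectNode, toString() is expected to produce the JSON text.
final class ToStringUtf8Schema<T> implements SerializationSchemaSketch<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(T element) {
        // Pin the charset so the output does not depend on the JVM's default encoding.
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

    With the real Flink interface swapped in, the sink call would then read `new RMQSink<ObjectNode>(connectionConfig, "stop", new ToStringUtf8Schema<ObjectNode>())`. If relying on `toString()` feels too implicit, Jackson's `ObjectMapper.writeValueAsBytes(element)` is a more explicit alternative that also emits UTF-8.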
    
