【问题标题】:Kafka Connect: how to parse String to MapKafka Connect:如何将 String 解析为 Map
【发布时间】:2019-09-04 03:58:24
【问题描述】:

假设我有一个文件,其中填充了JSON 对象/行,并由换行符 (\n) 分隔。当基于 FileStreamSource 的连接器读取此文件时,它会将每一行视为java.lang.String

如何将 java.lang.String 解析为 java.util.Mapstruct 以执行进一步的转换(例如,使用 MaskField 屏蔽字段或使用 ExtractField)?

PS:问题不在于如何将一些java.lang.String 解析为java.util.Mapstruct,而是关于如何将这种解析逻辑与Kafka 集成(自定义Kafka 转换?)或通过其他方式获得相同的结果手段(例如在 Kafka 中配置某些东西或使用特定的连接器/转换等)

【问题讨论】:

    标签: json apache-kafka apache-kafka-connect


    【解决方案1】:

    正如 Apache Kafka 文档所说,FileStreamSource 并不是完全支持生产的连接器...

    也许你最好使用 spooldir 连接器,它支持行分隔的 JSON https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/README.md

    【讨论】:

    • 问题是关于从java.lang.Stringjava.util.Mapstruct,而不是从文件或其他地方获取数据。
    • 您的问题是关于一般使用 Connect 的问题,不是吗?所以我的回答是 - 使用不同的连接器来支持你的要求
    【解决方案2】:

    有两种可能的方法:

    1. 您可以使用 Confluent 平台并使用适当的 KSQL 查询 (https://docs.confluent.io/current/ksql/docs/tutorials/index.html#ksql-tutorials) 运行连接器。
    2. 您可以启动 Kafka Stream 应用程序 (https://kafka.apache.org/documentation/streams/) 以及您的 Source 连接器。流应用程序将从连接器放置消息的主题/-s 中读取消息。您需要在 Kafka 流应用程序中实现转换逻辑。处理消息后,Stream 应用程序将其放入输出主题。下面是流应用代码的示例结构。
    Properties props = new Properties();
    
    ...
    
    final StreamsBuilder builder = new StreamsBuilder();
    Pattern pattern = Pattern.compile(<YOUR_INPUT_TOPIC_PATTERN>);
    KStream<String, String> source = builder.stream(pattern);
    
    ...
    
    source.mapValues((k,v) -> {
         Gson gson = new Gson();
         Map map = gson.fromJson(v, Map.class);
    
         // here is your transformation logi
    
         return v;
    }).to(<YOUR_OUTPUT_TOPIC>);
    
    ...
    
    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);
    
    ...
    
    streams.start();
    

    【讨论】:

    • 这只是java代码;这与 Kafka 连接器有何关系?您是否建议此代码应该是与例如使用的自定义转换器的一部分。源连接器?
    • @adrhc 是的,您需要使用类似此代码的内容并构建自己的转换器
    • @adrhc,为避免自定义源连接器或接收器连接器,您可以启动一个简单的 Kafka 流并获取带有所需字段/-s 的新消息。
    • @dmvkl - “获取带有提取字段的新消息”是什么意思?我无法从字符串中获取字段,这就是整个问题。
    • @dmvkl - 请详细说明您的答案或删除它,因为只是一些显示如何解析 JSON 字符串的 java 代码,这不是我试图解决的问题
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-04
    • 1970-01-01
    • 2020-05-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多