【Question Title】: Merge Kafka record values from a single stream
【Posted】: 2021-05-18 08:25:02
【Question】:

I have a topic that receives JSON records containing possibly partial data. I want to merge these records so that the final record collects as much of the information as possible.

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute2: 'bar' }

Desired stream after merging the record values:

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute1: 'foo', attribute2: 'bar' }

To achieve this, I tried:

 //the key of the topic is the id
 KStream<String, MyObject> input = ...
 return input
         .groupByKey()
         .reduce((current, newEvent) -> newEvent.merge(current))
         .toStream();

But this only produces a single entry, because groupByKey/reduce produces a KTable. Is it possible to achieve this?
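For the reduce above to work, `MyObject.merge` needs to keep the newer record's values and fall back to the older record's values for any missing field. One possible sketch (the `MyObject` class and its fields are hypothetical, inferred from the example records above):

```java
// Hypothetical value class inferred from the example JSON records.
public class MyObject {
    public String id;
    public String attribute1;
    public String attribute2;

    public MyObject(String id, String attribute1, String attribute2) {
        this.id = id;
        this.attribute1 = attribute1;
        this.attribute2 = attribute2;
    }

    // Keep this (newer) record's values; for any field that is null here,
    // fall back to the older record's value.
    public MyObject merge(MyObject older) {
        if (older == null) {
            return this;
        }
        return new MyObject(
                id != null ? id : older.id,
                attribute1 != null ? attribute1 : older.attribute1,
                attribute2 != null ? attribute2 : older.attribute2);
    }
}
```

With the two example records, `t2.merge(t1)` yields `{ id: '1234', attribute1: 'foo', attribute2: 'bar' }`, matching the desired output.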


Edit: the stream definition is correct. By default, reduce does not send every message downstream; it caches records before doing so. To disable this behavior, the configuration property

 cache.max.bytes.buffering: 0

must be set.
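Setting this when building the Streams configuration might look like the following minimal sketch. The property name is written as a plain string so the snippet does not depend on a particular Kafka Streams version; note that newer versions deprecate `cache.max.bytes.buffering` in favor of `statestore.cache.max.bytes`:

```java
import java.util.Properties;

public class CacheConfig {
    // Returns Streams properties with the record cache disabled, so that
    // reduce() forwards every update downstream instead of collapsing
    // successive updates for the same key during the commit interval.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("cache.max.bytes.buffering", "0");
        // On newer Kafka Streams versions use the renamed property instead:
        // props.put("statestore.cache.max.bytes", "0");
        return props;
    }
}
```

Disabling the cache trades deduplication for latency: every intermediate merge result is emitted, which is exactly what the question asks for.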

【Comments】:

Tags: java apache-kafka-streams spring-kafka


【Solution 1】:

Try this:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;

    // SongEvent is an Avro-generated record class (not shown here).
    public class MergeStreams {
    
        public Topology buildTopology(Properties allProps) {
            final StreamsBuilder builder = new StreamsBuilder();
    
            final String rockTopic = allProps.getProperty("input.rock.topic.name");
            final String classicalTopic = allProps.getProperty("input.classical.topic.name");
            final String allGenresTopic = allProps.getProperty("output.topic.name");
    
            KStream<String, SongEvent> rockSongs = builder.stream(rockTopic);
            KStream<String, SongEvent> classicalSongs = builder.stream(classicalTopic);
            KStream<String, SongEvent> allSongs = rockSongs.merge(classicalSongs);
    
            allSongs.to(allGenresTopic);
            return builder.build();
        }
    
        public void createTopics(Properties allProps) {
            AdminClient client = AdminClient.create(allProps);
    
            List<NewTopic> topics = new ArrayList<>();
    
            topics.add(new NewTopic(
                    allProps.getProperty("input.rock.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.rock.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.rock.topic.replication.factor"))));
    
            topics.add(new NewTopic(
                    allProps.getProperty("input.classical.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.classical.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.classical.topic.replication.factor"))));
    
            topics.add(new NewTopic(
                    allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
    
            client.createTopics(topics);
            client.close();
        }
    
        public Properties loadEnvProperties(String fileName) throws IOException {
            Properties allProps = new Properties();
            FileInputStream input = new FileInputStream(fileName);
            allProps.load(input);
            input.close();
    
            return allProps;
        }
    
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
            }
    
            MergeStreams ms = new MergeStreams();
            Properties allProps = ms.loadEnvProperties(args[0]);
            allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
            allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
            Topology topology = ms.buildTopology(allProps);
    
            ms.createTopics(allProps);
    
            final KafkaStreams streams = new KafkaStreams(topology, allProps);
            final CountDownLatch latch = new CountDownLatch(1);
    
            // Attach shutdown handler to catch Control-C.
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close(Duration.ofSeconds(5));
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }
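The `main()` above expects the path to a properties file as its only argument. A minimal example matching the `getProperty` calls in the code (all topic names, counts, and addresses below are illustrative):

```properties
application.id=merge-streams-app
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

input.rock.topic.name=rock-song-events
input.rock.topic.partitions=1
input.rock.topic.replication.factor=1

input.classical.topic.name=classical-song-events
input.classical.topic.partitions=1
input.classical.topic.replication.factor=1

output.topic.name=all-song-events
output.topic.partitions=1
output.topic.replication.factor=1
```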
    

    For more details, click here.

【Discussion】:

    • This is about merging multiple streams into one, not about merging the actual records within a single stream.