【Question Title】: Write to multiple Kafka topics in apache-beam?
【Posted】: 2020-07-06 18:14:19
【Question Description】:

I am running a simple word-count program that uses a Kafka topic as its input source and then applies a ParDo to count the words. Now I need help writing the words out to different topics based on their frequency: say, all words with an even count go to topic 1 and the rest go to topic 2.

Can anyone help me with an example?

【Question Comments】:

    Tags: apache-kafka apache-beam apache-beam-kafkaio


    【Solution 1】:

    This can be done with KafkaIO's `writeRecords` method, which consumes a `PCollection<ProducerRecord<K, V>>`. Each `ProducerRecord` names its own destination topic via `new ProducerRecord<>("topic_name", "key", "value")`, so different elements in the same collection can be routed to different topics.

    Here is the code:

         static class ExtractWordsFn extends DoFn<String, String> {
            private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
            private final Distribution lineLenDist =
                    Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
    
            @ProcessElement
            public void processElement(@Element String element, OutputReceiver<String> receiver) {
                lineLenDist.update(element.length());
                if (element.trim().isEmpty()) {
                    emptyLines.inc();
                }
                // Split the line into tokens and emit each non-empty word.
                String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
                for (String word : words) {
                    if (!word.isEmpty()) {
                        receiver.output(word);
                    }
                }
            }
        }
    
       
        public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, ProducerRecord<String, String>> {
            @Override
            public ProducerRecord<String, String> apply(KV<String, Long> input) {
                String value = input.getKey() + " " + input.getValue();
                // Route even counts to topic "test" and odd counts to topic "copy".
                if (input.getValue() % 2 == 0) {
                    return new ProducerRecord<>("test", input.getKey(), value);
                } else {
                    return new ProducerRecord<>("copy", input.getKey(), value);
                }
            }
        }
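    The even/odd routing rule inside `FormatAsTextFn` can be checked in isolation, without Kafka or Beam on the classpath. A minimal sketch (the class and helper name `topicFor` are hypothetical; the topic names "test" and "copy" are taken from the code above):

    ```java
    public class TopicRouting {
        // Mirrors FormatAsTextFn's routing rule:
        // even word counts go to topic "test", odd counts to topic "copy".
        static String topicFor(long count) {
            return (count % 2 == 0) ? "test" : "copy";
        }

        public static void main(String[] args) {
            System.out.println(topicFor(4)); // prints "test"
            System.out.println(topicFor(7)); // prints "copy"
        }
    }
    ```

    Keeping the rule as a pure function like this also makes it easy to swap in a different routing criterion later without touching the Beam pipeline.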
    
        public static class CountWords
                extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
            @Override
            public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    
                PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
                PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
                return wordCounts;
            }
        }
    
     p.apply("ReadLines", KafkaIO.<Long, String>read()
                    .withBootstrapServers("localhost:9092")
                    .withTopic("copy") // use withTopics(List<String>) to read from multiple topics
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
                    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                    .withLogAppendTime()
                    .withReadCommitted()
                    .commitOffsetsInFinalize()
                    .withProcessingTime()
                    .withoutMetadata())
            .apply(Values.create())
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(new CountWords())
            .apply(MapElements.via(new FormatAsTextFn())) // PCollection<ProducerRecord<String, String>>
            .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
            .apply("WriteCounts", KafkaIO.<String, String>writeRecords()
                    .withBootstrapServers("localhost:9092")
                    // no .withTopic(...) here: each ProducerRecord carries its own topic
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(StringSerializer.class));
    

    【Comments】:

    • With the Kafka property auto.commit enabled, commitOffsetsInFinalize() is not needed.
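    Following the comment above, the read side could then be simplified as sketched below. This is a config fragment, not a complete pipeline, and it assumes a Beam version where `updateConsumerProperties` is still available (newer releases deprecated it in favor of `withConsumerConfigUpdates`):

    ```java
    // With enable.auto.commit=true the Kafka consumer commits offsets itself,
    // so .commitOffsetsInFinalize() is omitted here.
    KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("copy")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of(
            "group.id", "my_beam_app_1",
            "enable.auto.commit", "true"))
        .withoutMetadata();
    ```

    Note the trade-off: auto-commit acknowledges records on a timer inside the consumer, while commitOffsetsInFinalize() ties the commit to Beam finalizing the read, so the two give different delivery guarantees on failure.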