【问题标题】:How to read Kafka message header from a kstream apllication如何从 kstream 应用程序中读取 Kafka 消息头
【发布时间】:2020-07-17 10:14:58
【问题描述】:

是否可以从 kstream 应用程序读取我在 kafkaProducer 应用程序中设置的 Kafka 消息头?我的 KafkaProducer 看起来像这样;我在消息中设置了标题

public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;

private final KafkaTemplate<Integer, testEvent> kafkaTemplate;

@Autowired
public Producer(KafkaTemplate<Integer, testEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
 }
 public void sendTestEvent(DataDto data) throws Exception {
TestEvent=testEvent.newBuilder()
.setTestEventId(data.getTestEventId())
.setTest(data.getTest().toString())
.build();
Message<testEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, TOPIC)
                .setHeader(KafkaHeaders.MESSAGE_KEY, 999)
                .setHeader(KafkaHeaders.PARTITION_ID, 0)
                .setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka")
                .build();
      this.kafkaTemplate.send(message);
      log.info(String.format("Produced user -> %s", event));
}

Kstream 应用程序是

public class MessageReader {


@Bean
public KStream<String, testEvent> kstreamPromotionUppercase(StreamsBuilder builder) {
    
    KStream<String, testEvent> sourceStream = builder.stream("test-topic");

    sourceStream.print(Printed.<String, testEvent>toSysOut().withLabel("Original Stream"));
    
    KStream<String, testEvent> uppercaseStream =sourceStream.mapValues(this::MessageReaderCode);
    
    return sourceStream;
}

如何读取我设置到我的 kafkaproducer 中的 kstream 中的标头。

【问题讨论】:

标签: java apache-kafka-streams confluent-platform


【解决方案1】:

下面是一个简单的示例,说明如何处理您的 KStream:

sourceStream.process(() -> new ReadKafkaHeaderProcessor());

那么在ReadKafkaHeaderProcessor的process方法中可以这样做:

@Override
public void process(Object key, Object value) {
    Header header = context.headers().lastHeader(KafkaHeaders.TOPIC);
}

【讨论】:

    猜你喜欢
    • 2021-10-23
    • 2020-05-02
    • 2018-09-24
    • 1970-01-01
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 2021-08-27
    • 1970-01-01
    相关资源
    最近更新 更多