【发布时间】:2020-07-17 10:14:58
【问题描述】:
是否可以从 kstream 应用程序读取我在 kafkaProducer 应用程序中设置的 Kafka 消息头?我的 KafkaProducer 看起来像这样;我在消息中设置了标题
public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;
private final KafkaTemplate<Integer, testEvent> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<Integer, testEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendTestEvent(DataDto data) throws Exception {
TestEvent=testEvent.newBuilder()
.setTestEventId(data.getTestEventId())
.setTest(data.getTest().toString())
.build();
Message<testEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.TOPIC, TOPIC)
.setHeader(KafkaHeaders.MESSAGE_KEY, 999)
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka")
.build();
this.kafkaTemplate.send(message);
log.info(String.format("Produced user -> %s", event));
}
Kstream 应用程序是
public class MessageReader {
@Bean
public KStream<String, testEvent> kstreamPromotionUppercase(StreamsBuilder builder) {
KStream<String, testEvent> sourceStream = builder.stream("test-topic");
sourceStream.print(Printed.<String, testEvent>toSysOut().withLabel("Original Stream"));
KStream<String, testEvent> uppercaseStream =sourceStream.mapValues(this::MessageReaderCode);
return sourceStream;
}
如何读取我设置到我的 kafkaproducer 中的 kstream 中的标头。
【问题讨论】:
-
嗨,保罗,我看到了,但是有没有可用的示例。我知道如何使用 Stream DSL,但不知道如何使用处理器 API。
-
我对 Kafka 也很感兴趣,并且我自己更喜欢 Stream DSL。我花了一些时间进行挖掘,但我认为您所追求的可能是 StreamsMetadataState:github.com/apache/kafka/blob/trunk/streams/src/main/java/org/…
标签: java apache-kafka-streams confluent-platform