[Posted]: 2019-06-23 20:58:39
[Problem description]:
I have created a Kafka topic with a single partition:
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
Many messages can be pushed to this topic. But I want a single consumer (for a given group) to process those messages one by one.
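To confirm the topic really has a single partition, the same CLI used above can describe it (a sketch; it assumes the same ZooKeeper address as the create command):

```shell
# Describe the topic to verify partition count and replication factor
kafka-topics --describe --topic files.write --zookeeper zookeeper:32181
```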
spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'
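One setting worth checking alongside `autoCommitOffset` is the binding's consumer `concurrency`, which controls how many threads consume the binding. A sketch of setting it explicitly (an assumption about what may help; `1` is already the documented default):

```yaml
spring:
  cloud:
    stream:
      bindings:
        fileWriteBindingInput:
          consumer:
            concurrency: 1   # a single consumer thread for this binding (the default)
```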
And the sample Java code:
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    // I would like to synchronize the processing of messages here, one by one.
    // But if many messages are pushed to this topic (single partition), they are
    // processed asynchronously even if I have not yet acknowledged the current message.
    acknowledgment.acknowledge();
}
What is missing from my configuration?
I expected that, while a message has not been acknowledged (i.e. the offset has not been advanced), no other message would be consumed from the same partition.
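The one-by-one semantics being asked for can be illustrated in plain Java, independent of Spring Cloud Stream: a single-threaded executor guarantees that each task finishes before the next one starts, in submission order (a self-contained sketch of the desired behavior, not the binder's actual mechanism):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerialProcessor {
    public static void main(String[] args) throws InterruptedException {
        // A single worker thread: message N+1 is never processed before message N completes.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> processed = new CopyOnWriteArrayList<>();

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            // Each submitted task stands in for handling (and then acknowledging) one message.
            worker.submit(() -> processed.add("message-" + id));
        }

        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(processed); // prints [message-1, message-2, message-3]
    }
}
```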
[Discussion]:
Tags: spring-boot apache-kafka spring-cloud spring-cloud-stream