【Question title】: Failed to start bean kafkaListenerContainer: java.lang.IllegalArgumentException
【Posted】: 2018-04-17 15:42:54
【Question description】:

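I used this as an example for reading files with Spring Integration, and it works fine, but when I try to send the file to a Kafka producer it does not work. I searched the internet for this problem but could not find any help. Here is my code: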

File: MessageProcessingIntegrationFlow.java:

@Bean
public IntegrationFlow writeToFile() {
    return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
            .transform(m -> new StringBuilder((String) m).toString().toUpperCase())
            // .handle(fileWritingMessageHandler)
            .handle(loggingHandler())
            .handle(kafkaProducerMessageHandler())
            .get();
}



// producing channel
@Bean(name="kafkaChannel")
public DirectChannel kafkaChannel() {
    return new DirectChannel();
}

@Bean
public DirectChannel consumingChannel() {
  return new DirectChannel();
}


@Bean
@ServiceActivator(inputChannel = "kafkaChannel")
public MessageHandler kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression(kafkaTopic));
    handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> properties = new HashMap<>();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  // introduce a delay on the send to allow more messages to accumulate
  properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

  return properties;
}

//consumer configuration....
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
  KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
      new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
  kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel());
  return kafkaMessageDrivenChannelAdapter;
}

@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
  ContainerProperties containerProps = new ContainerProperties(kafkaTopic); //set topic name
  return (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(
      consumerFactory(), containerProps);
}

@Bean
public ConsumerFactory<?, ?> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> properties = new HashMap<>();
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
  // automatically reset the offset to the earliest offset
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  return properties;
}

Here is the stack trace:

 org.springframework.context.ApplicationContextException: Failed to start bean 'kafkaListenerContainer'; nested exception is java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at com.porterhead.Application.main(Application.java:25) [classes/:na]
 Caused by: java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.util.Assert.isTrue(Assert.java:92) ~[spring-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:199) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
... 12 common frames omitted


     

I don't know what I am doing wrong. If you need more details, please let me know. Thanks.

【Question discussion】:

    Tags: spring spring-boot apache-kafka spring-integration spring-kafka


    【Solution 1】:

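    KafkaProducerMessageHandler is a one-way component: it does not produce a reply. It just publishes to the Kafka topic and nothing more. So you cannot continue the flow after it the way you do with handle(loggingHandler()). KafkaProducerMessageHandler must be the last endpoint in the flow. This is unlike FileWritingMessageHandler, which is an AbstractReplyProducingMessageHandler and continues the flow.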
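
    To make the one-way point concrete, here is a minimal plain-Java model of the idea. It is only a sketch and does not use Spring Integration's real classes: `Flow`, `transform`, `wireTap`, and `handleOneWay` are hypothetical names invented for illustration, and the real framework's validation and error messages differ. Reply-producing steps return a payload for the next step, a one-way handler returns nothing and therefore has to come last, and a tap-style observer is one way to log without ending the flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class OneWayFlowSketch {

    /** Hypothetical mini flow: reply-producing steps, then at most one one-way terminal step. */
    static final class Flow {
        private final List<Function<String, String>> steps = new ArrayList<>();
        private Consumer<String> terminal; // one-way endpoint: consumes a payload, returns nothing

        /** Reply-producing step: its return value is handed to the next step. */
        Flow transform(Function<String, String> step) {
            requireOpen();
            steps.add(step);
            return this;
        }

        /** Observer step: sees the payload and passes it on unchanged. */
        Flow wireTap(Consumer<String> observer) {
            requireOpen();
            steps.add(payload -> {
                observer.accept(payload);
                return payload;
            });
            return this;
        }

        /** One-way step: there is no reply, so it closes the flow. */
        Flow handleOneWay(Consumer<String> handler) {
            requireOpen();
            terminal = handler;
            return this;
        }

        /** Runs the payload through every step, then through the terminal handler if any. */
        void send(String payload) {
            String current = payload;
            for (Function<String, String> step : steps) {
                current = step.apply(current);
            }
            if (terminal != null) {
                terminal.accept(current);
            }
        }

        private void requireOpen() {
            if (terminal != null) {
                throw new IllegalStateException(
                        "A one-way handler ends the flow; nothing can follow it");
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();   // stands in for a logging handler
        List<String> topic = new ArrayList<>(); // stands in for a Kafka topic

        new Flow()
                .transform(String::toUpperCase)
                .wireTap(log::add)          // logging as an observer, not as an endpoint
                .handleOneWay(topic::add)   // the one-way "publish" step comes last
                .send("hello");
        System.out.println("logged=" + log + " published=" + topic);

        try {
            // Two one-way handlers in a row: the second has no input to receive.
            new Flow().handleOneWay(log::add).handleOneWay(topic::add);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Running `main` prints `logged=[HELLO] published=[HELLO]`, followed by `rejected: A one-way handler ends the flow; nothing can follow it`. In this model the second chain is rejected when it is built rather than when a message is sent, which is a design choice of the sketch, not a statement about how Spring Integration reports the problem.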

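    Nevertheless, please consider describing the problem properly in the future: what is expected and what goes wrong. This answer is my best guess, and I can only make it because I know the code of all these components.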

    【Discussion】:

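    • I want to read a file and send it to a Kafka producer. Is there any other way to send the data to Kafka?
    • We understood that, but what is the problem? When there is no specific problem, we cannot speculate here. Also, no one here writes the code for you. That is your responsibility.
    • You just need to remove handle(loggingHandler()) from your flow. It is also one-way and cannot sit in the middle of the flow.
    • I removed loggingHandler(), please see the edited question. Thanks.
    • You are using an old Spring Boot. It is not compatible with Spring Kafka 1.2.2. Consider switching to the latest one: projects.spring.io/spring-boot. You already need Spring Framework 4.3.x.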