【Question title】: Spring Integration kafka : org.apache.kafka.common.config.ConfigException while running consumer
【Posted】: 2016-08-19 19:46:30
【Question】:

I am trying to run a Kafka consumer from code, but it always throws an exception. The producer itself works: when I run kafka-console-consumer.sh, it shows every message the broker received. Below are the code, pom.xml, and the exception log. Please tell me where I went wrong.

public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:2181");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_coonfig" );
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

Here is my test class code.

@Test
public void testSpringKafkaConsumer() throws InterruptedException {

    try {
        String topics[] = { "programTopic3" };
        ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs);
        factory.createConsumer();
        AbstractMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(factory,
                topics);
        container.setBeanName("container");

        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                // logger.info("received: " + message);
                System.out.println("received: --------+++++++++++++++------------" + message);
                records.add(message);
            }
        });
        KafkaMessageDrivenChannelAdapter<Integer, String> adaptor = new KafkaMessageDrivenChannelAdapter<>(container);

        adaptor.start();
        ConsumerRecord<Integer, String> poll = null;
        while ((poll = records.take()) != null) {
            System.out.println(poll.topic() + "  topic");
            System.out.println(poll.key() + "   key");
            System.out.println(poll.value() + "  value");
        }

    } catch (Exception exception) {
        exception.printStackTrace();
        Assert.fail();
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.learn.kafka.integrate.spring</groupId>
<artifactId>SpringIntegrationKafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>SpringIntegrationKafka</name>
<description>Demo project for Spring Integration kafka</description>

<properties>
    <springVersion>4.2.5.RELEASE</springVersion>
    <springIntegrationVersion>4.2.5.RELEASE</springIntegrationVersion>
    <mockitoVersion>1.10.19</mockitoVersion>
</properties>
<repositories>
    <repository>
        <id>repository.spring.milestone</id>
        <name>Spring Milestone Repository</name>
        <url>http://repo.spring.io/milestone</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${springIntegrationVersion}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.0.0.M1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springVersion}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <version>${springVersion}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

Exception log:

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:46)
at com.learn.kafka.integrate.spring.TestConsumer.testSpringKafkaConsumer(TestConsumer.java:83)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

【Comments】:

  • You are not showing the whole story. ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); - it looks like the configs variable does not reference the properties created by consumerConfigs().

Tags: spring spring-integration apache-kafka kafka-consumer-api


【Solution 1】:
org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.

Your new DefaultKafkaConsumerFactory<>(configs); does not appear to use the map returned by consumerConfigs().
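One way to catch this kind of mismatch early is to inspect the map before it reaches the factory. The following is only a minimal sketch under stated assumptions: the class name ConsumerConfigCheck, the helper missingKeys, and the list of keys it checks are hypothetical and chosen for illustration, and this is not Kafka's own validation logic. It uses the literal property names (for example "value.deserializer", the key named in the exception) so that it compiles without the Kafka client on the classpath. The broker port 9092 is the one reported to work in the comments on this answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerConfigCheck {

    // Keys to look for; an illustrative choice based on the errors quoted in this thread.
    static final List<String> EXPECTED_KEYS = Arrays.asList(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer");

    // Mirrors consumerConfigs() from the question, written with literal property names.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Broker address: 9092 is the port reported to work in the comments; 2181 is ZooKeeper.
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", "group_id_config");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Returns the expected keys that are absent from the given map.
    static List<String> missingKeys(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            if (!configs.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The map that should be passed to the consumer factory.
        System.out.println("missing: " + missingKeys(consumerConfigs()));
        // An unrelated or empty map, like the suspected configs variable in the test.
        System.out.println("missing from an empty map: " + missingKeys(new HashMap<String, Object>()));
    }
}
```

In the test from the question, the same idea amounts to passing consumerConfigs() itself to new DefaultKafkaConsumerFactory<>(...) rather than a separate configs field whose contents are not shown.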

On the other hand, KafkaMessageDrivenChannelAdapter does exactly this in its constructor:

this.messageListenerContainer = messageListenerContainer;
this.messageListenerContainer.setAutoStartup(false);
this.messageListenerContainer.setMessageListener(this.listener);

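So the listener you registered with container.setMessageListener(new MessageListener<Integer, String>() { is replaced by the adapter's own listener and is never called. As a result, nothing will ever appear in records.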
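The replacement can be illustrated with a toy model. This is a sketch only: ToyContainer and ToyAdapter are hypothetical stand-ins written for this example, not the Spring classes, and they model just the one behavior quoted from the constructor (the adapter installing its own listener on the container it receives).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerOverrideDemo {

    // Hypothetical stand-in for a listener container: it holds a single listener.
    static class ToyContainer {
        private Consumer<String> listener;

        void setMessageListener(Consumer<String> listener) {
            this.listener = listener;
        }

        void deliver(String record) {
            if (listener != null) {
                listener.accept(record);
            }
        }
    }

    // Hypothetical stand-in for the adapter: its constructor installs its own
    // listener on the container, overwriting whatever was set before.
    static class ToyAdapter {
        final List<String> forwarded = new ArrayList<>();

        ToyAdapter(ToyContainer container) {
            container.setMessageListener(forwarded::add);
        }
    }

    // Returns {records seen by the user listener, records seen by the adapter}.
    static int[] run() {
        List<String> records = new ArrayList<>();
        ToyContainer container = new ToyContainer();
        container.setMessageListener(records::add);     // user listener, set first
        ToyAdapter adapter = new ToyAdapter(container); // overwrites the user listener
        container.deliver("hello");
        return new int[] { records.size(), adapter.forwarded.size() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("user listener saw " + counts[0] + ", adapter saw " + counts[1]);
    }
}
```

Because the container keeps only one listener, the order of calls decides which one receives records. In the question's test the adapter is constructed after setMessageListener, so the anonymous MessageListener never runs.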

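If you do not yet understand what is going on in this particular test, I suggest leaving Spring Integration out of it for now.

For the KafkaMessageDrivenChannelAdapter variant, you must set the outputChannel to a QueueChannel so that you can retrieve messages by polling.

You also have to do some additional BeanFactory setup around the KafkaMessageDrivenChannelAdapter.

See our test case for more information: https://github.com/spring-projects/spring-integration-kafka/blob/master/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Also take a look at the sample application based on Kafka 0.9: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka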

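【Comments】:

  • Thank you for the valuable feedback. The same code works if I give it port 9092, the Kafka server port, together with the server address. But when I run the Kafka console consumer, I supply the ZooKeeper port 2181, so I am confused about this.
  • I get the error: Missing required configuration "group.id" which has no default value. How do I resolve this error?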