【问题标题】:How to configure kafka producer topic with more than one partition using Spring integration kafka如何使用 Spring 集成 kafka 配置具有多个分区的 kafka 生产者主题
【发布时间】:2016-07-28 12:22:58
【问题描述】:

我阅读了很多文章,但没有找到如何使用 Spring Integration Kafka 配置具有多个分区的主题(在运行时创建的主题)的 Producer。

我正在使用github link 为我的应用程序了解和配置 kafka。

请提供相同的解决方案

还有一点,kafkaHeader.messageKey 有什么用。

更新:下面是我开发的测试代码
` 公共类 OutboundTests { @测试 public void mytest() 抛出异常{

    KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>();
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic");
    meta.setValueClassType(SMSNotificationVO.class);
    meta.setKeyClassType(String.class);
    meta.setValueEncoder(new SMSObjectSerializer());

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10");
    meta1.setValueClassType(SMSNotificationVO.class);
    meta1.setKeyClassType(String.class);
    meta1.setValueEncoder(new SMSObjectSerializer());
    Properties producerProperties = new Properties();
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries"
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092");
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties);


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject());
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject());

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>();
    map.put("sms_topic", p);
    map.put("sms_topic_10", p1);

    ctx.setProducerConfigurations(map);

    // java code to send message
    Map<String, String> params = new HashMap<>();
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1");
    List<String> smsRecipients = new ArrayList<>();
    smsRecipients.add("9953225211");


     String message = "This Test message from junit topic sms_topic_1";
     Integer messageType =1;

    SMSNotificationVO vo = new SMSNotificationVO();
    vo.setMessage(message);
    vo.setMessageType(messageType);
    vo.setParams(params);
    vo.setSmsRecipients(smsRecipients);


    try{
     KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx);
     handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10"));
     handler.handleMessage(MessageBuilder.withPayload(vo)
             //if i remove the below comments I will get null pointer exception
             //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key")
             //.setHeader(KafkaHeaders.PARTITION_ID, "1")
                .setHeader(KafkaHeaders.TOPIC, "sms_topic_10")
                .build());

    }catch(Exception e){
        e.printStackTrace();
        Assert.fail("Kafka SMS Producer fail");
    }

}  
}`

我得到空指针异常。以下是提及日志:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@7b94089b]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 24 more`

谢谢

【问题讨论】:

    标签: spring spring-integration apache-kafka kafka-producer-api


    【解决方案1】:

    主题与Producer 无关。您应该在发送或接收消息之前预先配置主题。

    您可以在运行时通过 Spring Integration 高级 API 使用 AdminUtils.createTopic() 创建主题,这只是为了方便测试,在生产环境中应避免使用。

    KafkaHeaders.messageKey 完全映射到标准 Kafka messageKey 并且可以使用 yes 来确定@​​987654326@ 中的目标partition

    因此,您绝对应该去 Apache Kafka 官方文档了解如何使用“多分区”创建主题,以及从 Producer 方面如何处理 partitionmessageKey 参数。

    【讨论】:

    • 感谢您的回复 我心中的疑问很少,如下所示: 1. messagekey 用于确定主题中的分区(有些像散列)然后如何选择我想要的那个键随机选择密钥,以便每次发布消息时它都会进入其他分区以利用并行性。 2. 在链接提及中,message-key-expressionpartition-id-expression 基于这些值是提及 3. 如果提及 message-key-expression,那么只有我可以使用 KafkaHeaders.messageKey。谢谢
    • 如果您在发送操作中指定特定的partition,则分区确定算法会绕过messageKey。这些表达式完全用于 Kafka Producer 操作。查看它的 API。事实上,KafkaHeaders.messageKey 可以用来代替message-key-expression
    • 但是如果你看到我的代码,setHeader(KafkaHeaders.MESSAGE_KEY, "some key") 给出了空指针异常
    • M-m-m。看起来您使用了足够的旧库版本。升级到最新的spring-integration-kafka-1.3.0 怎么样?
    • 项目使用 spring-integration-kafka-1.1.1。我会尝试 1.3.0 版本。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-26
    • 1970-01-01
    • 2019-02-03
    • 2019-01-11
    • 2019-11-30
    • 2017-02-05
    相关资源
    最近更新 更多