【问题标题】:how to consume an object from producer to consumer in kafka?如何在kafka中消费从生产者到消费者的对象?
【发布时间】:2017-09-07 21:33:07
【问题描述】:

以下是生产者和消费者的类。何时能够生成数据并且无法使用以下代码使用它。有人能帮我一下吗?我在编码中做错了吗?我的目标是从消费者那里读取CustomMessage 对象并将数据存储在数据库中。

在我的 cmd 提示符中,我打开了 5 个实例,1 个用于 zookeeper,1 个用于 kafka,1 个用于生产者,1 个用于消费者。我真的不明白。当我运行生产者和消费者类时,是否需要让所有实例保持正常运行?

这里的任何指针都会很有帮助。

提前致谢。

producer class:::
            
    package com.kafka.test.demo;
    
    import java.io.IOException;
    import java.util.Properties;
    
    import javax.xml.parsers.ParserConfigurationException;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.xml.sax.SAXException;
    
    public class KafkaaProducer {
        public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException {
            Properties props = new Properties();
//customMessage is a pojo object which should be send to the consumer..
            CustomMessage  customMessage= new CustomMessage();
            customMessage.setMessage("hello kafka");
            customMessage.setFan("1234213123");
            customMessage.setSourceSystem("Dmap");
            customMessage.setStatus("Unenrolled");
            customMessage.setMessageTyep("Simple Message");
            customMessage.setCreatedTime("5");
            customMessage.setProcessedTime("6");
            customMessage.setRetryCount("3");
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            //CustomMessageSerializer
props.put("key.serializer","com.kafka.test.demo.CustomMessageSerializer"); 
            props.put("value.serializer", "com.kafka.test.demo.CustomMessageSerializer");
            try {
                KafkaProducer<String, CustomMessage> producer = new KafkaProducer<String, CustomMessage>(props);
                producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", "customMessage",customMessage));
                //producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", customMessage));
                System.out.println("Message " + "" + " sent !!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    consumer class::
    package com.kafka.test.demo;
    
    import java.net.UnknownHostException;
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;
    
    public class KafkaaConsumer {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "testgroup");
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "300");
            props.put("rebalance.backoff.ms", "40000");
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("value.deserializer", "com.kafka.test.demo.CustomMessageDeserializer");
            props.put("key.deserializer", "com.kafka.test.demo.CustomMessageDeserializer");
            //perisitMessage();
            try{
                KafkaConsumer<String,CustomMessage> consumer = new KafkaConsumer<String, CustomMessage>(props);
                consumer.subscribe(Collections.singletonList("NewMessageTopic"));
                while (true) {
                    ConsumerRecords<String, CustomMessage> messages = consumer.poll(100);
                    for (ConsumerRecord<String, CustomMessage> message : messages) {
                      System.out.println("Message received " + message);
                    }
                  perisitMessage();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void perisitMessage() {
            // TODO Auto-generated method stub
            CustomMessage  customMessage = new CustomMessage();
            customMessage.setMessage("hello kafka");
            customMessage.setFan("1234213123");
            customMessage.setSourceSystem("Dmap");
            customMessage.setStatus("Unenrolled");
            customMessage.setMessageTyep("Simple Message");
            customMessage.setCreatedTime("5");
            customMessage.setProcessedTime("6");
            customMessage.setRetryCount("3");
            try {
                 MongoClient mongoClient = new MongoClient( "localhost" , 27017 );
                 DB db = mongoClient.getDB("DeviceTrack");
                 DBCollection msgCollection = db.getCollection("messages");
                 BasicDBObject document = new BasicDBObject();
                 document.put("message", customMessage.getMessage());
                 document.put("fan", customMessage.getFan());
                 document.put("SourceSystem", customMessage.getSourceSystem());
                 document.put("RetryCount", customMessage.getRetryCount());
                 document.put("ProcessedTime", customMessage.getProcessedTime());
                 document.put("CreatedTime", customMessage.getCreatedTime());
                 document.put("MessageTyep", customMessage.getMessageTyep());
                 document.put("Status", customMessage.getStatus());
                 msgCollection.insert(document);
                 System.out.println("Inserted in the data in DB succesfully");
    
            } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
package com.kafka.test.demo;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomMessageDeserializer implements Deserializer {

    public Object deserialize(String arg0, byte[] arg1) {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println("arg1"+arg1);
        CustomMessage message = null;
        try {
            message = mapper.readValue(arg1, CustomMessage.class);
        } catch (Exception e) {

            e.printStackTrace();
        }
        System.out.println(""+message);
        return message;
    }



    public void close() {
        // TODO Auto-generated method stub

    }

    public void configure(Map arg0, boolean arg1) {
        // TODO Auto-generated method stub

    }

}
        
    package com.kafka.test.demo;
    
    import java.util.Map;
    
    import org.apache.kafka.common.serialization.Serializer;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    public class CustomMessageSerializer implements Serializer {
    
        public byte[] serialize(String arg0, Object arg1) {
            byte[] retVal = null;
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                retVal = objectMapper.writeValueAsString(arg1).getBytes();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("value ::::::"+retVal);
            return retVal;
        }
    
        public void close() {
            // TODO Auto-generated method stub
    
        }
    
        public void configure(Map arg0, boolean arg1) {
            // TODO Auto-generated method stub
    
        }
    }
    
package com.kafka.test.demo;

public class CustomMessage {
    
    private String messageId;
    private String parentMsgId;
    private String fan;
    private String message;
    private String sourceSystem;
    private String status;
    private String messageTyep;
    private String createdTime;
    private String processedTime;
    private String retryCount;
    
    /**
     * @return the messageId
     */
    public String getMessageId() {
        return messageId;
    }
    /**
     * @param messageId the messageId to set
     */
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
    /**
     * @return the parentMsgId
     */
    public String getParentMsgId() {
        return parentMsgId;
    }
    /**
     * @param parentMsgId the parentMsgId to set
     */
    public void setParentMsgId(String parentMsgId) {
        this.parentMsgId = parentMsgId;
    }
    /**
     * @return the fan
     */
    public String getFan() {
        return fan;
    }
    /**
     * @param fan the fan to set
     */
    public void setFan(String fan) {
        this.fan = fan;
    }
    /**
     * @return the message
     */
    public String getMessage() {
        return message;
    }
    /**
     * @param message the message to set
     */
    public void setMessage(String message) {
        this.message = message;
    }
    /**
     * @return the sourceSystem
     */
    public String getSourceSystem() {
        return sourceSystem;
    }
    /**
     * @param sourceSystem the sourceSystem to set
     */
    public void setSourceSystem(String sourceSystem) {
        this.sourceSystem = sourceSystem;
    }
    /**
     * @return the status
     */
    public String getStatus() {
        return status;
    }
    /**
     * @param status the status to set
     */
    public void setStatus(String status) {
        this.status = status;
    }
    /**
     * @return the messageTyep
     */
    public String getMessageTyep() {
        return messageTyep;
    }
    /**
     * @param messageTyep the messageTyep to set
     */
    public void setMessageTyep(String messageTyep) {
        this.messageTyep = messageTyep;
    }
    /**
     * @return the createdTime
     */
    public String getCreatedTime() {
        return createdTime;
    }
    /**
     * @param createdTime the createdTime to set
     */
    public void setCreatedTime(String createdTime) {
        this.createdTime = createdTime;
    }
    /**
     * @return the processedTime
     */
    public String getProcessedTime() {
        return processedTime;
    }
    /**
     * @param processedTime the processedTime to set
     */
    public void setProcessedTime(String processedTime) {
        this.processedTime = processedTime;
    }
    /**
     * @return the retryCount
     */
    public String getRetryCount() {
        return retryCount;
    }
    /**
     * @param retryCount the retryCount to set
     */
    public void setRetryCount(String retryCount) {
        this.retryCount = retryCount;
    }
}

【问题讨论】:

  • 您使用的是什么 kafka 版本。 ps传真的结果是什么| grep 卡夫卡

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

是的,您需要运行 zookeeper,然后运行 ​​Kafka 代理,然后启动您的消费者,然后开始生产,您应该在消费者中看到它。

此外,当您使用自定义对象时,我建议您使用 Jackson 序列化为字符串并发送它。另一方面,您始终可以反序列化它。然后将所有序列化程序更改为字符串。

【讨论】:

    【解决方案2】:

    您只需要 zookeeper 和 kafka 实例。

    1. 启动 Zookeeper
    2. 启动 Kafka
    3. 创建您的主题(“NewMessageTopic”)
    4. 启动您的生产者和消费者代码

    如果我理解你的话,你使用“kafka-console-producer”和“kafka-console-consumer”?使用 Kafka 集群不需要它们。如果您的代码有效,这应该没问题。如果通过 cmd 启动 kafka 需要做很多工作,您可以编写一个 .bat。

    喜欢

     :startZK
    echo Zookeeper wird gestartet
    Start "Zookeper" C:\zookeeper-3.4.9\bin\zkServer.cmd
    echo Bitte warten bis Zookeeper gestartet ist.
    pause
    echo Kafka Wird Gestartet
    Start "Kafka" C:\kafka_2.11-0.10.2.0\bin\windows\kafka-server-start.bat C:\kafka_2.11-0.10.2.0\config\server.properties
    
    goto Top
    

    乍一看您的代码看起来是正确的。 我不知道你只打印你得到的数据到你的system.out?

                while (true) {
                    ConsumerRecords<String, CustomMessage> messages = consumer.poll(100);
                    for (ConsumerRecord<String, CustomMessage> message : messages) {
                      System.out.println("Message received " + message);<-- just a syso not more :/
                    }
                  perisitMessage(); <-- maybe give him the message ?
                }
    

    如果你在大纲上看到你的消息??它工作得很好。如果你没有看到任何东西,我今晚可以仔细看看。给我一个提示。但我没有使用 MongoDB 的经验。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-01-21
      • 2018-12-18
      • 1970-01-01
      • 1970-01-01
      • 2015-03-25
      • 2019-07-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多