【问题标题】:Converting CSV file to Java Objects (POJO) and send it to ActiveMQ queue将 CSV 文件转换为 Java 对象 (POJO) 并将其发送到 ActiveMQ 队列
【发布时间】:2020-07-06 20:57:49
【问题描述】:

我的目标是读取 csv 文件,将其转换为 Java 对象 (POJO) 并将 Java 对象一一发送到 ActiveMQ 队列。下面是代码:

public void configure() throws Exception {
    from("file:src/main/resources?fileName=data.csv")               
    .unmarshal(bindy)
    .split(body())
    .to("file:src/main/resources/?fileName=equityfeeds.txt")
    .split().tokenize(",").streaming().to("jms:queue:javaobjects.upstream.queue");          
}

问题: 1.当我执行代码时,没有文件(equityfeeds.txt)被创建,也没有对象进入队列。怎么了?我现在不需要做任何处理。我只需要将 csv 解组为 POJO,并将 Java 对象一一发送到 ActiveMQ 队列。

EquityFeeds (POJO)

@CsvRecord(separator = ",",skipFirstLine = true)
public class EquityFeeds {

    @DataField(pos = 1) 
    private String externalTransactionId;

    @DataField(pos = 2)
    private String clientId;

    @DataField(pos = 3)
    private String securityId;

    @DataField(pos = 4)
    private String transactionType;

    @DataField(pos = 5, pattern = "dd/MM/YY")
    private Date transactionDate;

    @DataField(pos = 6)
    private float marketValue; 

    @DataField(pos = 7)
    private String priorityFlag;

请帮忙。请告诉我哪里出错了。

@pvpkiran:下面是我的 Camel Code for producer:

public void configure() throws Exception {
            from("file:src/main/resources?fileName=data.csv")               
                .unmarshal(bindy)
                .split(body())
                .streaming().to("jms:queue:javaobjects.upstream.queue");
}

以下是我的消费者代码(使用 JMS API):

@JmsListener(destination = "javaobjects.upstream.queue")
public void javaObjectsListener(final Message objectMessage) throws JMSException {
        Object messageData = null;
        if(objectMessage instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) objectMessage;
            messageData = objMessage.getObject();
        }
        System.out.println("Object: "+messageData.toString());
    }

我没有使用 Camel 来使用 JMSMessage。在消费者中,我使用 JMS API 来消费消息。我也没有测试代码。消息已进入 ActiveMQ,我正在使用 JMS API(如上)来使用消息。在终端中我得到 NullPointerException。还有 2 条消息进入 ActiveMQ.DLQ,给出以下错误消息:

java.lang.Throwable:Delivery[7] 超过重新交付策略限制:RedeliveryPolicy {destination = null,collisionAvoidanceFactor = 0.15,maximumRedeliveries = 6,maximumRedeliveryDelay = -1,initialRedeliveryDelay = 1000,useCollisionAvoidance = false,useExponentialBackOff =假,backOffMultiplier = 5.0,redeliveryDelay = 1000,preDispatchCheck = true},原因:null

【问题讨论】:

  • bindy 在这一行是什么.unmarshal(bindy) 也可以给一个样本 csv
  • final BindyCsvDataFormat bindy=new BindyCsvDataFormat(camelproject.EquityFeeds.class); sample csv **externalTransactionId,clientId,securityId,transactionType,transactionDate,marketValue,priorityFlag SAPEXTXN1,GS,ICICI,BUY,23/11/13,101.9,Y SAPEXTXN2,AS,REL,SELL ,20/11/13,121.9,N **

标签: java csv apache-camel activemq bindy


【解决方案1】:

试试这个。这应该工作

from("file:src/main/resources?fileName=equityfeeds.csv")
                    .unmarshal(new BindyCsvDataFormat(EquityFeeds.class))
                    .split(body())
                    .streaming().to("jms:queue:javaobjects.upstream.queue");
// This route is for Testing
from("jms:queue:javaobjects.upstream.queue").to("bean:camelBeanComponent?method=processRoute"); 

并编写一个消费者组件 bean

@Component
public class CamelBeanComponent {
    public void processRoute(Exchange exchange) {
        System.out.println(exchange.getIn().getBody());
    }
}

这个打印出来(如果你需要这样的输出,你需要添加toString()

EquityFeeds(externalTransactionId=SAPEXTXN1, clientId=GS, securityId=ICICI, transactionType=BUY, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=101.9, priorityFlag=Y)
EquityFeeds(externalTransactionId=SAPEXTXN2, clientId=AS, securityId=REL, transactionType=SELL, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=121.9, priorityFlag=N)

如果你使用.split().tokenize(","),那么每行(不是完整的行)中的每个字段都会转换为EquityFeeds对象(其他字段为空)作为消息发送到队列

【讨论】:

  • 我根据您上面的回答将我的代码更改为:public void configure() throws Exception { from("file:src/main/resources?fileName=data.csv") .unmarshal(bindy) .split(body()) .streaming().to("jms:queue:javaobjects.upstream.queue"); } 所有消息都进入了 ActiveMQ,但似乎 csv 消息没有正确解组为 Java 对象,因为当我使用它们时,我是得到 NullPointerException。请多多指教。
  • 当然 .. 这是代码 ...@JmsListener(destination = "javaobjects.upstream.queue") public void javaObjectListener(final Message objectMessage) throws JMSException { Object messageData = null; if(objectMessage instanceof ObjectMessage) { ObjectMessage objMessage = (ObjectMessage) objectMessage; messageData = objMessage.getObject(); } System.out.println("Object: "+messageData.toString()); } 我得到所有对象的 NullPointerException。此外,我刚刚注意到有 2 条消息发送到 ActiveMQ.DLQ
  • 您可以编辑问题并更新代码,而不是评论。如果您使用骆驼,则不需要@JmsListener。使用我提供的确切代码进行测试。队列消息将被读取并发送到上述方法。在示例中为camelBeanComponent::processRoute
  • 我刚刚编辑了问题并更新了代码。让我知道是否需要任何其他信息。请指导我并帮助我克服这个问题。
  • 好的。很公平。这个问题是关于“Converting CSV file to Java Objects (POJO) and send it to ActiveMQ queue”。我想已经实现了(从我的测试路线中可以看到)所以请接受这个问题。查看教程并了解为什么消费是一个问题。如果有问题再问一个问题
猜你喜欢
  • 1970-01-01
  • 2018-12-23
  • 2017-07-11
  • 2021-02-05
  • 1970-01-01
  • 2014-12-20
  • 2021-06-02
  • 1970-01-01
  • 2019-06-09
相关资源
最近更新 更多