【Question title】: Apache Camel consuming from multiple queues on one ActiveMQ instance
【Posted】: 2020-04-01 17:30:01
【Question description】:

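I have one ActiveMQ server instance with several data sources pushing data into two queues, plus one DLQ for messages whose consumption fails. I use Apache Camel to consume and process the messages from these queues, and I want to write them to InfluxDB.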

However, so far I have not managed to get the stack running so that Apache Camel consumes all queues in parallel. I always run into this kind of error:

ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port

How can I get a single Apache Camel instance to consume from multiple queues?

I tried two approaches:

Currently, my code looks like this:

Camel config

@Configuration
public class CamelConfig {

    @Bean
    public ShutdownStrategy shutdownStrategy() {
        MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
        int                     timeout  = 1200;
        MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        strategy.setTimeout(timeout); // TODO make it configurable
        return strategy;
    }
}

ActiveMQ client config

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:61616");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(passwd);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

InfluxDB config

@Configuration
public class InfluxDBClientConfig {

    @Bean
    public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
        return () -> {
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            Builder builder = new OkHttpClient.Builder() //
                    .readTimeout(1200, TimeUnit.SECONDS) //
                    .writeTimeout(1200, TimeUnit.SECONDS) //
                    .connectTimeout(1200, TimeUnit.SECONDS) //
                    .retryOnConnectionFailure(true);
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            return builder;
        };
    }
}

Component with multiple routes:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerCryringInbound = "activemq:queue:queue1?" //
                + "brokerURL=tcp://ip:port";
        String consumerActiveMqDLQ    = "activemq:queue:ActiveMQ.DLQ?" //
                + "brokerURL=tcp://ip:port";
        String consumerOlog           = "activemq:queue:queue2?" //
                + "brokerURL=tcp://ip:port";
        String emitterInfluxDB        = "influxdb://influxDb?databaseName=databaseName" //
                + "&batch=true" //
                + "&retentionPolicy=retentionPolicy";
        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound) //   
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from ActiveMQ.DLQ to InfluxDB
        //************************************************************************
        from(consumerActiveMqDLQ) //
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from olog_inbound to olog
        //************************************************************************
        from(consumerOlog) //
                .process(messagePayload -> {
                    System.out.println(messagePayload.getIn());
                }) //
                .to(emitterStreamOut);
    }
}

【Question comments】:

    Tags: java apache-camel activemq


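    【Solution 1】:

    Only one client at a time can use a given ClientID. Client IDs must be unique, and they are probably not something you want to set manually. Another option might be to set the ClientIDPrefix, which makes it easier to identify which application is connected.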
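The uniqueness rule can be illustrated with a small toy model. This is a sketch in plain Java, not ActiveMQ code: `ToyBroker`, `connect`, and `uniqueClientId` are hypothetical names made up for the illustration. It assumes that each route opens its own connection, which would explain why a single fixed client ID shared by several routes leads to an "already connected" rejection.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Toy model (not ActiveMQ code) of a broker that allows one live connection per client ID. */
final class ToyBroker {
    private final Set<String> connectedClientIds = new HashSet<>();

    /** Registers a connection, or rejects it if the client ID is already in use. */
    void connect(String clientId) {
        if (!connectedClientIds.add(clientId)) {
            throw new IllegalStateException("Client: " + clientId + " already connected");
        }
    }

    /** Number of connections currently registered. */
    int connectionCount() {
        return connectedClientIds.size();
    }

    /** Hypothetical helper: a readable prefix plus a random suffix, so every ID is distinct. */
    static String uniqueClientId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.connect("Influx Message Queue");         // first consumer connects
        try {
            broker.connect("Influx Message Queue");     // second consumer reuses the same ID
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        broker.connect(uniqueClientId("influx-route")); // distinct IDs are accepted
        broker.connect(uniqueClientId("influx-route"));
        System.out.println("Connections: " + broker.connectionCount());
    }
}
```

A prefix plus a generated suffix keeps the IDs unique while still showing, on the broker side, which application a connection belongs to.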

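    【Comments】:

    • Hi Adam, I tried giving each "consumer" queue its own client ID (an individual name), and I also tried setting the client ID in the ActiveMQ config to a UUID. I still get the same error. Do I have to put the individual routes (from.to) into separate classes?
    • How did you try to give each consumer a separate clientID? Share some code.
    • Yes, I did set separate client IDs in the three strings that define the consumer queues.
    • Then the error can't be identical, right? You should see "Client: your_new_unique_clientid already connected from tcp..."
    【Solution 2】: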

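    Contrary to what "Camel Multiple Consumers Implementation Issue" suggests (several from().to() routes in one @Component), I got this working by splitting the routes into separate components, each with its own clientId. In addition, I replaced the static client ID in the ActiveMQ config with a UUID. The code:

    ActiveMQ config: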

    @Configuration
    public class ActiveMqClientConfig {
    
        @Bean
        public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
            MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL("tcp://servername:port");
            connectionFactory.setUserName(username);
            connectionFactory.setPassword(password);
            connectionFactory.setUseAsyncSend(false);
            connectionFactory.setClientID(UUID.randomUUID().toString());
            connectionFactory.setConnectResponseTimeout(300);
            MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            return connectionFactory;
        }
    }
    

    Route component:

    @Component
    public class ActiveMqToInfluxRoute extends RouteBuilder {
        @Autowired
        private FrameworkConfig frameworkConfig;
    
        @Override
        public void configure() throws Exception {
            String consumerCryringInbound = "activemq:queue:queue1?"
                    + "brokerURL=tcp://activemq-server-ip:port"
                    + "&clientId=clientid1"; // options after the first are joined with '&'
    
            String emitterInfluxDB = "influxdb://influxDb?databaseName=influx_db_name"
                    + "&batch=true"                                                           
                    + "&retentionPolicy=retentionPolicy";
    
            String emitterStreamOut       = "stream:out";
    
            //************************************************************************
            // Data from cryring_db_inbound to InfluxDB
            //************************************************************************
            from(consumerCryringInbound)    
                    .process(exchange -> { /* processor code, as in the question */ })
                    .to(emitterInfluxDB)
                    .onException(Exception.class) 
                    .useOriginalMessage()
                    .handled(true) 
                    .log("error") 
                    .to(emitterStreamOut);
        }
    }
    

    ...and similarly for the other routes, each with its own clientId.

    【Comments】:
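One way to keep the per-route client IDs distinct is to derive each ID from the queue name. The following is only a sketch in plain Java: `QueueEndpoints`, `endpointUri`, the `influx` prefix, and the `tcp://localhost:61616` address are assumptions made for the illustration, while the URI layout (`activemq:queue:...?brokerURL=...&clientId=...`) follows the endpoint strings used above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that builds one consumer endpoint URI per queue. */
final class QueueEndpoints {

    /** Returns "activemq:queue:<queue>?brokerURL=<url>&clientId=<prefix>-<queue>". */
    static String endpointUri(String queue, String brokerUrl, String clientIdPrefix) {
        return "activemq:queue:" + queue
                + "?brokerURL=" + brokerUrl
                + "&clientId=" + clientIdPrefix + "-" + queue;
    }

    /** Builds the URIs for all queues, keeping the order in which the queues were given. */
    static Map<String, String> endpointUris(List<String> queues, String brokerUrl, String prefix) {
        Map<String, String> uris = new LinkedHashMap<>();
        for (String queue : queues) {
            uris.put(queue, endpointUri(queue, brokerUrl, prefix));
        }
        return uris;
    }

    public static void main(String[] args) {
        // Placeholder broker address and prefix; replace with real values.
        Map<String, String> uris = endpointUris(
                List.of("queue1", "queue2", "ActiveMQ.DLQ"), "tcp://localhost:61616", "influx");
        uris.values().forEach(System.out::println);
    }
}
```

Each route class could then pass its own entry to `from(...)`, so no two consumers share a client ID as long as the queue names differ.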
