【问题标题】:Subscribing to an Azure Service Bus Topic with Spring Boot and AMQP使用 Spring Boot 和 AMQP 订阅 Azure 服务总线主题
【发布时间】:2017-08-15 01:52:59
【问题描述】:

我设置了一个名为“state-changed”的 Azure 服务总线主题,它有一个名为“reverb”的订阅。我正在尝试使用@JmsListener 设置一个方法来订阅该主题,但出现错误:

2017-03-22 18:34:41.049  WARN 23356 --- [enerContainer-6] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'state-changed' - trying to recover. Cause: The messaging entity 'sb://[MySERVICEBUS].servicebus.windows.net/state-changed' could not be found. TrackingId:d2b442f79e0f44bdb449861ea57155ce_G44, SystemTracker:gateway6, Timestamp:3/22/2017 6:34:37 PM

javax.jms.JMSException: The messaging entity 'sb://[MySERVICEBUS].servicebus.windows.net/state-changed' could not be found. TrackingId:d2b442f79e0f44bdb449861ea57155ce_G44, SystemTracker:gateway6, Timestamp:3/22/2017 6:34:37 PM
    at org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.createClientReceiver(TopicSubscriberImpl.java:111) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.<init>(MessageConsumerImpl.java:129) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.<init>(TopicSubscriberImpl.java:46) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:544) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:59) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:870) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:215) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1189) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1165) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_77]

我一直在使用这篇博文来尝试让一切正常运行:http://ramblingstechnical.blogspot.co.uk/p/using-azure-service-bus-with-spring-jms.html

我可以使用 JmsTemplate 向主题添加消息,并使用 Azure 文档中概述的普通旧 Java JMS 库从中读取消息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-jms-api-amqp 所以我知道我的主题确实有效并且可以访问,它似乎只是当我用 Spring 配置它时,我做错了什么。

我的配置如下:

@Configuration
public class JmsConfiguration
{

    @Bean
    public JmsListenerContainerFactory topicJmsListenerContainerFactory() throws NamingException
    {
        DefaultJmsListenerContainerFactory returnValue = new DefaultJmsListenerContainerFactory();

        Context context = context();
        ConnectionFactory cf = connectionFactory(context);

        returnValue.setConnectionFactory(cf);
        returnValue.setSubscriptionDurable(Boolean.TRUE);
        return returnValue;
    }

    private Context context() throws NamingException
    {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
        env.put(Context.PROVIDER_URL, "src/main/resources/servicebus.properties");
        Context context = new InitialContext(env);
        return context;
    }



    /**
     * @param context
     * @return
     * @throws NamingException
     */
    private ConnectionFactory connectionFactory(Context context) throws NamingException
    {
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        return cf;
    }

}

servicebus.properties(用户名和密码等已编辑):

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF=amqps://[USER]:[PASSWORD]@[MYSERVICEBUS]

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.workflow = workflow
topic.state-changed = stage-changed

最后是我的监听类:

@Component
public class TestListener
{
    Logger logger = LoggerFactory.getLogger(LoggingWorkflowEventHandler.class);

    @JmsListener(destination = "state-changed", containerFactory = "topicJmsListenerContainerFactory", subscription = "reverb")
    public void onMessage(String message)
    {
        logger.info("Received message from topic: {}", message);
    }
}

如果有人设法让这个工作,我会很感激一些指点。

【问题讨论】:

  • 您遇到了什么问题?根据您的配置代码,env.put(Context.PROVIDER_URL, "src/main/resources/servicebus.properties"); 代码中的servicebus.properties 文件在运行中似乎没有加载。请检查并更改为classes根路径下的相对路径。
  • 其他组件使用相同的配置,它们似乎工作正常。我可以使用该配置向主题发送消息,只是我的侦听器不起作用。

标签: java spring azure azureservicebus


【解决方案1】:

如果您使用 Spring Boot,则可以使用准备好的 Azure ServiceBus JMS Spring Boot Starter,它可以开箱即用。

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-servicebus-jms-spring-boot-starter</artifactId>
    <version>2.3.5</version>
</dependency>

查看教程页面https://docs.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-boot-starter-java-app-with-azure-service-bus

【讨论】:

    【解决方案2】:

    创建或更改 Trustrore:

    由于我们要建立与服务总线的安全 amqps 连接,我们需要将所有必需的 SSL 证书存储在信任库中。似乎现有的证书都不包含所需的证书,我 - 为了透明 - 创建了一个这样的新证书:

    通过访问https://&lt;URL-Of-Your-Servicebus&gt;获取所需的证书,例如https://XXXXX.servicebus.cloudapi.de 在浏览器中。然后单击 URL 中的“锁定”(或浏览器显示的任何安全连接)并从那里打开证书。

    保存当前证书:

    []

    当询问要导出的格式时,选择“DER binary”并将其保存为“.cer”文件,例如“1.cer”

    您很可能会看到您的证书基于证书链,这意味着它依赖于其他证书。对于每个点击“显示证书”:

    []

    并以与之前相同的方式保存该文件。重复直到获得根证书。在本例中,您将得到三个 *.cer 文件。为了进一步参考,我将它们称为 1.cer、2.cer 和 3.cer

    您现在应该为这些证书创建一个新的 Truststore 文件

    /opt/webMethods9/jvm/jvm/bin/keytool -import -file
    /opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/1.cer -keystore azureTruststore.jks -alias "D-TRUST Root Class 3 CA 2 2009"
    
    /opt/webMethods9/jvm/jvm/bin/keytool -import -file 
    /opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/2.cer -keystore azureTruststore.jks -alias "D-TRUST SSL Class 3 CA 1 2009"
    
    /opt/webMethods9/jvm/jvm/bin/keytool -import -file 
    /opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/3.cer -keystore azureTruststore.jks -alias "servicebus.cloudapi.de"
    

    您将被要求第一次为这个新创建的信任库设置密码。现在将信任库移动到/opt/webMethods9/IntegrationServer/config/certs/trusted(供以后参考)。您可以将它作为信任库添加到 IS(通过使用 admin-UI “Security > Keystore” 和“Create Truststore Alias”),但没有技术需要这样做,因为在我们的例子中 IS 没有使用信任库- 它只被 QPID 使用。

    为 JNDI 创建一个属性文件 您需要创建一个servicebus.properties 文件来充当伪 JNDI 服务器的数据源。从技术上讲,您可以将该文件放在您想要的任何位置,但我建议将其放在“XXXXXXConnection”包的“资源”文件夹中。这应该是该文件的内容:

    # servicebus.properties - sample JNDI configuration
    # Register a ConnectionFactory in JNDI using the form:
    # connectionfactory.[jndi_name] = [ConnectionURL]
    connectionfactory.SBCF = amqps://XXXXXX.servicebus.cloudapi.de?jms.username=xxxxx&jms.password=xxxxxxx&amqp.idleTimeout=120000&jms.receiveLocalOnly=true&transport.trustStoreLocation=/opt/webMethods9/IntegrationServer/config/certs/trusted/azureTruststore.jks
    # Register some queues in JNDI using the form 
    # queue.[jndi_name] = [physical_name] 
    # topic.[jndi_name] = [physical_name]
    queue.QUEUE = myqueue
    ​
    

    一些解释:​

    1. SBCF 将是连接工厂的 JNDI 查找名称。稍后在您的 JMS-Connection 中需要此名称
    2. xxxxxx.servicebus.cloudapi.de 是您的服务总线的 URL
    3. jms.username 将由您友好的 Azure 管理员提供
    4. jms.password 将由您友好的 Azure 管理员提供。但请注意,您需要对从管理员那里获得的内容进行 URL 编码,然后才能在此 URL 中使用它。例如,这可以通过在 Designer 中手动调用 IS 服务 pub.string:URLEncode 来完成。​​
    5. amqp.idleTimeout 需要设置为 120000(或更高),否则您无法连接到 SB
    6. jms.receiveLocalOnly 需要设置为 true 因为否则你无法连接到 SB
    7. transport.trustStoreLocation 需要将包含创建安全 (AMQPS) 连接所需的所有 SSL 证书的信任库指向 SB
    8. queue.QUEUE: QUEUE 是 JNDI-Lookup 名称,稍后您将在 JMS-Client 中使用来发送消息或在 JMS-Trigger 中使用来接收消息。你应该设置一个更有意义的东西。此值(示例中为“myqueue”)是 SB 上的队列名称,必须由 Azure 管理员提供。

    []

    仅有的两个重要值是:

    1. “初始上下文工厂”:org.apache.qpid.jms.jndi.JmsInitialContextFactory
    2. “Provider URL”:必须指向您创建的 servicebus.properties,例如file:/opt/webMethods9/IntegrationServer/instances/default/packages/XXXXXXConnection/resources/servicebus.properties

    【讨论】:

      【解决方案3】:

      您的错误消息表明未找到您的目的地名称(未找到消息实体)。 请注意,您需要以如下特定方式告诉 Azure 您的订阅名称:

      <TopicName>/Subscriptions/<SubscriptionName>
      

      在你的情况下:

      state-changed/Subscriptions/reverb
      

      希望有帮助

      干杯 赛博

      【讨论】:

      • 在 Azure 门户中为我的主题创建订阅后为我工作。
      猜你喜欢
      • 2016-02-10
      • 2016-12-29
      • 2018-08-27
      • 1970-01-01
      • 2017-10-23
      • 2018-07-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多