【Question title】: How to create queues in Qpid with RestTemplate?
【Posted】: 2019-12-14 02:24:50
【Question description】:

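I am trying to write integration tests for an application that uses RabbitMQ, and I am using a Qpid broker for that. I managed to start the server and my tests connect to it, but I need the queues to exist in Qpid before the application starts. Because I have a large number of queues, I create the beans dynamically: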

applicationContext.getBeanFactory().registerSingleton(queueName, queue);

This requires the queues to be created before startup.

Here is the Qpid configuration file:

{
  "name": "tst",
  "modelVersion": "2.0",
  "defaultVirtualHost" : "default",
  "authenticationproviders" : [ {
    "name" : "noPassword",
    "type" : "Anonymous",
    "secureOnlyMechanisms": []
        },
    {
      "name" : "passwordFile",
      "type" : "PlainPasswordFile",
      "path" : "/src/test/resources/passwd.txt",
      "secureOnlyMechanisms": [],
      "preferencesproviders" : [{
        "name": "fileSystemPreferences",
        "type": "FileSystemPreferences",
        "path" : "${qpid.work_dir}${file.separator}user.preferences.json"
        }
      ]
    }
   ],
  "ports" : [
    {
      "name": "AMQP",
      "port": "5673",
      "authenticationProvider": "passwordFile",
      "protocols": [
        "AMQP_0_10",
        "AMQP_0_8",
        "AMQP_0_9",
        "AMQP_0_9_1"
      ]
    }],
  "virtualhostnodes" : [ {
    "name" : "default",
    "type" : "JSON",
    "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
  }]

}

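In the official documentation (https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130) I read that queues can be created through REST calls, so I tried to do that with RestTemplate, but the queues do not seem to be created.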

    @BeforeClass
    public static void startup() throws Exception {
        brokerStarter = new BrokerManager();
        brokerStarter.startBroker();

        RestTemplate restTemplate = new RestTemplate();
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");
    }

Can anyone explain what I am doing wrong? Thanks!

【Comments】:

    Tags: java spring-boot rabbitmq qpid


    【Solution 1】:

    I solved the same problem using the REST API. To create and delete queues for integration tests, I use the following configuration file (qpid-config.json):

    {
      "name": "EmbeddedBroker",
      "modelVersion": "8.0",
      "authenticationproviders": [
        {
          "name": "anonymous",
          "type": "Anonymous"
        }
      ],
      "ports": [
        {
          "name": "AMQP",
          "bindingAddress": "localhost",
          "port": "${qpid.amqp_port}",
          "protocols": [ "AMQP_1_0" ],
          "authenticationProvider": "anonymous",
          "virtualhostaliases" : [ {
            "name" : "nameAlias",
            "type" : "nameAlias"
          }, {
            "name" : "defaultAlias",
            "type" : "defaultAlias"
          }, {
            "name" : "hostnameAlias",
            "type" : "hostnameAlias"
          } ]
        },
        {
          "name" : "HTTP",
          "port" : "${qpid.http_port}",
          "protocols" : [ "HTTP" ],
          "authenticationProvider" : "anonymous"
        }
      ],
      "virtualhostnodes": [
        {
          "name": "default",
          "defaultVirtualHostNode": "true",
          "type": "Memory",
          "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
        }
      ],
      "plugins" : [
        {
          "type" : "MANAGEMENT-HTTP",
          "name" : "httpManagement"
        }
      ]
    }
    

    The relevant Gradle dependencies:

        testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
        testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
        testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
        testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")
    
        testImplementation("org.springframework.boot:spring-boot-starter-webflux")
        testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")
    

    Code to start the broker (Kotlin):

        private fun startQpidBroker() {
            val attributes: MutableMap<String, Any> = HashMap()
            val initialConfig = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!
            attributes["type"] = "Memory"
            attributes["initialConfigurationLocation"] = initialConfig.toExternalForm()
            attributes["startupLoggedToSystemOut"] = true
            System.setProperty("qpid.amqp_port", "$amqpPort")
            System.setProperty("qpid.http_port", "$httpPort")
            // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
            System.setProperty("qpid.tests.mms.messagestore.persistence", "true")
            broker.startup(attributes)
        }
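
The snippet above does not show where `amqpPort` and `httpPort` come from. One possible way to choose them, sketched here in Java under the assumption that the test simply needs two unused local ports, is to let the OS assign them by binding to port 0. The class and method names (`FreePorts`, `findTwoFreePorts`) are hypothetical and not part of the original answer.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePorts {

    /**
     * Returns two distinct TCP ports that were free at the time of the call.
     * Both sockets are held open together so the OS cannot hand out the same port twice.
     */
    public static int[] findTwoFreePorts() throws IOException {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] { first.getLocalPort(), second.getLocalPort() };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ports = findTwoFreePorts();
        // Same property names that qpid-config.json resolves
        // through ${qpid.amqp_port} and ${qpid.http_port}.
        System.setProperty("qpid.amqp_port", Integer.toString(ports[0]));
        System.setProperty("qpid.http_port", Integer.toString(ports[1]));
        System.out.println("AMQP port: " + ports[0] + ", HTTP port: " + ports[1]);
    }
}
```

The sockets are closed before the broker starts, so another process could in principle grab a port in between; for a local test run that risk is usually acceptable.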
    

    Code to delete and create a queue:

        private fun recreateQueue(queueName: String) {
            val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}")
            // The REST path is /api/latest/queue/<virtual host node>/<virtual host>/<queue name>
            try {
                client.method(HttpMethod.DELETE)
                        .uri("/api/latest/queue/default/default/$queueName")
                        .retrieve()
                        .toBodilessEntity()
                        .block()
            } catch (e: WebClientResponseException) {
                if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
                    throw e
                }
            }
    
            client.method(HttpMethod.PUT)
                    .uri("/api/latest/queue/default/default/$queueName")
                    .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                    .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
                    .retrieve()
                    .toBodilessEntity()
                    .block()
        }
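
Since the question is written in Java, here is a rough Java sketch of the same create-queue call. It uses the JDK's `java.net.http.HttpClient` (Java 11+) rather than RestTemplate or WebClient so that it needs no extra dependencies. The class name `QpidQueueClient`, its method names, and the HTTP port `8080` are assumptions for illustration; the URL layout and JSON body mirror the Kotlin code above. Note that the URL in the question targets port 5673, which that configuration declares as the AMQP port, and that configuration defines neither an HTTP port nor the MANAGEMENT-HTTP plugin.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class QpidQueueClient {

    /** Builds /api/latest/queue/<virtual host node>/<virtual host>/<queue name>. */
    public static URI queueUri(String host, int httpPort,
                               String vhostNode, String vhost, String queueName) {
        // Assumes the names contain only URL-safe characters.
        return URI.create("http://" + host + ":" + httpPort
                + "/api/latest/queue/" + vhostNode + "/" + vhost + "/" + queueName);
    }

    /** Builds a PUT request asking the broker to create a queue of type "standard". */
    public static HttpRequest createQueueRequest(URI uri, String queueName) {
        // Assumes the queue name needs no JSON escaping.
        String json = "{\"name\":\"" + queueName + "\",\"type\":\"standard\"}";
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        int httpPort = 8080; // assumption: must match the broker's HTTP port, not the AMQP port
        HttpClient client = HttpClient.newHttpClient();
        for (String queueName : new String[] { "queue1", "queue-2" }) {
            URI uri = queueUri("localhost", httpPort, "default", "default", queueName);
            HttpResponse<String> response = client.send(
                    createQueueRequest(uri, queueName),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(queueName + " -> HTTP " + response.statusCode());
        }
    }
}
```

Running `main` requires a broker with the HTTP management plugin listening on the chosen port and anonymous access enabled, as in the configuration file of this answer.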
    

    【Comments】:

      【Solution 2】:

      I managed to solve this by using a connection factory:

                  @Autowired
                  ConnectionFactory factory;
      
                  ....
                  factory.setHost("localhost");
                  factory.setPort(qpid_server_port);
                  try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                      String queue = "queue-x";
                      channel.queueDeclare(queue, true, false, false, null);
                      //channel.queueBind(queue, "exchange-x" , "routing-key-x");
      
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
      

      【Comments】:
