【问题标题】:How to listen to two RabbitMQ queues with spring-cloud-stream如何使用 spring-cloud-stream 监听两个 RabbitMQ 队列
【发布时间】:2021-06-07 13:22:27
【问题描述】:

我有一个正在工作的应用程序,它侦听单个 RabbitMQ 队列。

但是,当我添加另一个使用消息的 bean 并尝试将其绑定到另一个队列时,两个队列都不会在 RabbitMQ 中创建,并且在手动创建它们时,不会从这些队列中消耗任何消息。

我创建的小 kotlin 项目来演示这个问题:

@SpringBootApplication
class SpringCloudStreamTwoRabbitConsumersApplication

fun main(args: Array<String>) {
    runApplication<SpringCloudStreamTwoRabbitConsumersApplication>(*args)
}
package com.example.springcloudstreamtworabbitconsumers

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class Listener1Config {

    @Bean
    fun listener1(): Consumer<Message<String>> {
        return Consumer { input -> println(input) }
    }

}
package com.example.springcloudstreamtworabbitconsumers

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class Listener2Config {

    @Bean
    fun listener2(): Consumer<Message<String>> {
        return Consumer { input -> println(input) }
    }

}

application.properties:

# Rabbit properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Listener 1
spring.cloud.stream.bindings.listener1-in-0.destination=exchange1
spring.cloud.stream.bindings.listener1-in-0.group=exchange1-queue
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.binding-routing-key-delimiter=,
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.bindingRoutingKey=binding.key.1,binding.key.1.1
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.exchangeType=topic
spring.cloud.stream.rabbit.bindings.listener1-in-0.consumer.autoBindDlq=true

# Listener 2
spring.cloud.stream.bindings.listener2-in-0.destination=exchange2
spring.cloud.stream.bindings.listener2-in-0.group=exchange2-queue
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.binding-routing-key-delimiter=,
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.bindingRoutingKey=binding.key.2,binding.key.2.1
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.exchangeType=topic
spring.cloud.stream.rabbit.bindings.listener2-in-0.consumer.autoBindDlq=true

build.gradle.kts:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.4.3"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.4.30"
    kotlin("plugin.spring") version "1.4.30"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8

repositories {
    mavenCentral()
}

extra["springCloudVersion"] = "2020.0.1"

dependencies {
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-rabbit")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

当我注释掉其中一个侦听器 bean 时,另一个会按预期工作。但是,当两个 bean 都处于活动状态时,RabbitMQ 中不会创建队列,如果我手动创建队列并将消息发送到交换器,也不会从队列中读取消息。

我在这里做错了什么?

【问题讨论】:

    标签: spring-boot kotlin rabbitmq spring-cloud-stream


    【解决方案1】:

    框架只能检测单个功能。当你有多个时,你需要指定:

    spring.cloud.function.definition=listener1;listener2
    

    https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#spring_cloud_function

    如果您只有java.util.function.[Supplier/Function/Consumer] 类型的单个bean,您可以跳过spring.cloud.function.definition 属性,因为这样的功能bean 将被自动发现。但是,使用此类属性以避免任何混淆被认为是最佳做法。

    【讨论】:

      猜你喜欢
      • 2018-10-27
      • 2020-11-07
      • 2018-01-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-22
      • 2017-07-30
      • 1970-01-01
      相关资源
      最近更新 更多