【问题标题】:Unit Testing Spring Stream Kafka单元测试 Spring Stream Kafka
【发布时间】:2020-07-07 14:53:34
【问题描述】:

我正在尝试使用 spring boot 对 kafka 的使用进行单元测试,但输入通道有问题。以下是我正在做的事情的摘录。

public interface MyCustomStreamBinding{
   @Input
   SubscribableChannel consumeChannel();

   @Output
   MessageChannel produceChannel();
}

@EnableBinding(value = { Source.class, MyCustomStreamBinding.class })
public class StreamConfiguration {
...
}

@Service
public class MyService {

  private final MyCustomStreamBinding streamBinding;
  public MyService(MyCustomStreamBinding streamBinding) {
    this.streamBinding = streamBinding;
  }

  public void sendMessage() {
    streamBinding.produceChannel().send(new SomeObject);
  }

  @StreamListener("consumeChannel")
  public void consumeChannel(SomeObject payload){
    // do processing of payload
  }
}

然后在我的测试用例中我有

@SpringBootTest(classes = {MyApp.class})
class MyServiceTest {
  private MyService myService;

  @Autowired
  private MyCustomStreamBinding streamBinding;
  @Autowired
  private MessageCollector messageCollector;

  @BeforeEach
  public void setup(){
    myService = new MyService(streamBinding);
  }

  @Test
  public void TestMessaging(){
   myService.sendMessage();

   Message<?> m = messageCollector.forChannel(streamBinding.produceChannel()).poll();
   assertThat(m.getPayload(), equalTo(new SomeObject()));
  }
}

如何测试 consumeChannel 以及它是否按预期实际执行了处理?

【问题讨论】:

    标签: spring spring-boot junit mockito spring-kafka


    【解决方案1】:

    这里我有一个示例,它由 2 个侦听器组成,用于使用数据和生成数据。与@SpringBootTest 一起,您可以使用@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"}) 禁用Web 服务。然后使用JUnit5@ExtendWith(SpringExtension.class),使用嵌入式kafka集群@EmbeddedKafka(topics = {"output-topic"}, partitions = 1)

    采用这个简单的服务,它在侦听器process-in-0 上接收数据,将其转换为大写并在侦听器process-out-0 上发出新数据。

    public interface KafkaListenerBinding {
        @Input("process-in-0")
        KStream<String, String> inputStream();
    
        @Output("process-out-0")
        KStream<String, String> outStream();
    }
    
    @Service
    @EnableBinding(KafkaListenerBinding.class)
    public class KafkaListenerService {
    
        @StreamListener("process-in-0")
        @SendTo("process-out-0")
        public KStream<String, String> transformToUpperCase(KStream<String, String> input) {
            input.peek((k, v) -> log.info("Received Input: {}", v));
            return input.mapValues(v -> v.toUpperCase());
        }
    }
    

    使用嵌入式 kafka 集群对其进行测试。请注意,实际的 kafka claster 不一定可用。然后就可以使用brokers: ${spring.embedded.kafka.brokers}这个属性了。

    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})
    @ExtendWith(SpringExtension.class)
    @EmbeddedKafka(topics = {"output-topic"}, partitions = 1)
    @TestPropertySource(properties = {
            "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
            "spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"
    })
    public class KafkaListenerServiceTest {
    
        @Autowired
        EmbeddedKafkaBroker embeddedKafkaBroker;
        @SpyBean
        KafkaListenerService kafkaListenerServiceSpy;
        private Consumer<String, String> consumer;
    
        @BeforeEach
        public void setUp() {
            Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("group1", "true", embeddedKafkaBroker));
            consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
            embeddedKafkaBroker.consumeFromAllEmbeddedTopics(consumer);
        }
    
        @AfterEach
        public void tearDown() {
            consumer.close();
        }
    
        @Test
        public void SimpleProcessorApplicationTest() throws ExecutionException, InterruptedException {
            Set<String> actualResultSet = new HashSet<>();
            Set<String> expectedResultSet = new HashSet<>();
            expectedResultSet.add("HELLO1");
            expectedResultSet.add("HELLO2");
    
            Map<String, Object> senderProps = producerProps(embeddedKafkaBroker);
            DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
            try {
                KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
                template.setDefaultTopic("input-topic");
    
                template.sendDefault("hello1").get();
                verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));
    
                template.sendDefault("hello2").get();
                verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));
    
                int receivedAll = 0;
                while (receivedAll < 2) {
                    ConsumerRecords<String, String> cr = getRecords(consumer);
                    receivedAll = receivedAll + cr.count();
                    cr.iterator().forEachRemaining(r -> {
                        System.out.println("result: " + r.value());
                        actualResultSet.add(r.value());
                    });
                }
    
                assertThat(actualResultSet.equals(expectedResultSet)).isTrue();
            } finally {
                pf.destroy();
            }
        }
    }
    

    并像这样配置您的 application.yml 文件,并确保不使用 schema.registry.url: not-used 启用架构注册表:

    spring:
      kafka:
        consumer:
          group-id: group-01
      cloud:
        stream:
          bindings:
            process-in-0:
              destination: input-topic
            process-out-0:
              destination: output-topic
            notification-input-channel:
              destination: pos-topic
          kafka:
            streams:
              binder:
                brokers: ${spring.embedded.kafka.brokers}
                configuration:
                  schema.registry.url: not-used
                  commit.interval.ms: 100
                  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              bindings:
                process-in-0:
                  consumer:
                    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                process-out-0:
                  producer:
                    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    ---
    
    

    【讨论】:

      猜你喜欢
      • 2018-11-10
      • 1970-01-01
      • 1970-01-01
      • 2017-09-05
      • 1970-01-01
      • 1970-01-01
      • 2015-09-06
      • 2018-04-28
      • 1970-01-01
      相关资源
      最近更新 更多