[Posted]: 2021-05-19 22:03:56
[Problem description]:
I have a @StreamListener that accepts a @Payload String. To test this listener class, I wrote a JUnit test using embedded Kafka. When I run the test class, I get the following error.

Error
ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Cannot convert GenericMessage from [[B] to [java.lang.String]

If I change the @Payload parameter type from String to byte[], the message is picked up by my listener class.
Can someone help me understand the problem here? I suspect it is related to the Cloud Stream configuration.
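For context, the error means the binder handed the listener the record value as raw bytes ([B) and no message converter turned it into a String. The decode the framework would need to perform is conceptually just a charset decode of those bytes; a minimal plain-Java sketch (no Spring involved, class name is illustrative):

```java
import java.nio.charset.StandardCharsets;

public class PayloadConversionDemo {
    public static void main(String[] args) {
        // Kafka delivers the record value to the binder as raw bytes ([B).
        byte[] raw = "foo".getBytes(StandardCharsets.UTF_8);

        // A @Payload byte[] parameter receives these bytes unchanged.
        // A @Payload String parameter only works if a message converter
        // decodes them, conceptually equivalent to:
        String decoded = new String(raw, StandardCharsets.UTF_8);

        System.out.println(decoded); // foo
    }
}
```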
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {
                "input",
                "output"})
public class TestUtils {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("input");
        template.sendDefault("foo");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "input_group",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("output"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
    }
}
Here is what my application.yaml looks like.
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          input:
            consumer:
              enable-dlq: true
              dlq-name: output
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        output:
          destination: output
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        input:
          destination: input
          contentType: application/json
          partitioned: true
          group: input_group
[Question discussion]:
Tags: java apache-kafka spring-cloud-stream junit5 embedded-kafka