【发布时间】:2020-07-22 18:11:54
【问题描述】:
几天前我问了一个问题,关于从 kafka.send() 方法中存根未来的响应。 @kriegaex here 正确回答并解释了这一点 虽然我遇到了另一个问题,但我如何测试这个未来响应的 onSuccess 和 onFailure 回调。这是正在测试的代码。
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
public class KakfaService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final LogService logService;
public KakfaService(KafkaTemplate kafkaTemplate, LogService logService){
this.kafkaTemplate = kafkaTemplate;
this.logService = logService;
}
public void sendMessage(String topicName, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LogDto logDto = new LogDto();
logDto.setStatus(StatusEnum.SUCCESS);
logService.create(logDto)
}
@Override
public void onFailure(Throwable ex) {
LogDto logDto = new LogDto();
logDto.setStatus(StatusEnum.FAILED);
logService.create(logDto)
}
});
}
}
这是测试代码
import com…….KafkaService
import com…….LogService
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback
import org.springframework.util.concurrent.SettableListenableFuture
import spock.lang.Specification
public class kafaServiceTest extends Specification {
private KafkaTemplate<String, String> kafkaTemplate;
private KafkaService kafaService;
private SendResult<String, String> sendResult;
private SettableListenableFuture<SendResult<?, ?>> future;
private RecordMetadata recordMetadata
private String topicName
private String message
def setup() {
topicName = "test.topic"
message = "test message"
sendResult = Mock(SendResult.class);
future = new SettableListenableFuture<>();
recordMetadata = new RecordMetadata(new TopicPartition(topicName, 1), 1L, 0L, 0L, 0L, 0, 0);
kafkaTemplate = Mock(KafkaTemplate.class)
logService = Mock(LogService.class)
kafkaSservice = new KafkaSservice(kafkaTemplate, logService);
}
def "Test success send message method"() {
given:
sendResult.getRecordMetadata() >> recordMetadata
ListenableFutureCallback listenableFutureCallback = Mock(ListenableFutureCallback.class);
listenableFutureCallback.onFailure(Mock(Throwable.class))
future.addCallback(listenableFutureCallback)
when:
kafkaService.sendMessage(topicName, message)
then:
1 * kafkaTemplate.send(_ as String, _ as String) >> future
// test success of failed callbacks
}
}
我已经尝试了以下文章,但无济于事,我可能对这个工具的使用有误解。
- https://stackoverflow.com/a/56677098/13258992
- https://stackoverflow.com/a/56677098/13258992
- https://www.baeldung.com/mockito-callbacks
- https://www.javatips.net/api/org.springframework.util.concurrent.successcallback
更新:部分工作
我能够通过使用在回调中点击 onSuccess 和 onFailure
future.set(sendResult) 和 future.setException(new Throwable()) 分别(感谢 @GarryRussell 回答 here)。但问题是验证onSuccess 和onFailure 方法的行为。对于示例,我有一个日志对象实体,我在其中保存状态(成功或失败),对此行为的断言始终返回 true。这是成功场景的更新测试代码。
def "Test success send message method"() {
given:
sendResult.getRecordMetadata() >> recordMetadata
future.set(sendResult)
when:
kafkaService.sendMessage(topicName, message)
then:
1 * kafkaTemplate.send(_ as String, _ as String) >> future
1 * logService.create(_) >> {arguments ->
final LogDto logDto = arguments.get(0)
// this assert below should fail
assert logDto.getStatus() == LogStatus.FAILED
}
}
我观察到的另一件事是,当我运行代码协变时,onSuccess 和 onFailure 回调方法的右花括号上仍然有一个红色代码指示。
【问题讨论】:
-
您似乎没有在任何地方注入您的日志服务模拟。因此应用程序与它没有连接,因此无法调用它。您的示例服务类也不包含使用日志服务的任何线索。那么服务在哪里发挥作用呢?就像我在上一个问题中解释的那样,您没有提供MCVE,所以我无法重现您的问题。有很多代码,好吧。但是具有讽刺意味的是,您遇到问题的部分丢失了。如果您愿意更新问题并通知我,我会再看一下。 :-)
-
同样在您的第二个变体中,
kafkaStreamGateway突然来自哪里?是否与前面示例中的kafkaService相同? MCVE 意味着您创建一个独立的示例,在应用程序上下文之外单独运行它,以检查您是否忘记了任何依赖项,然后将其发布在此处。否则很容易忽略一些东西。作为替代方案,您可以将 MCVE 推送到 GitHub(包括 Maven 或 Gradle 构建配置),然后在此处发布链接。那会更好。 -
嗨@kriegaex,我更新了这个问题,我在 setup() 方法中注入了
logService。 kafkaStreamGateway 是即时测试的另一个包装函数。我现在从示例中删除它:) -
是的,但我仍然看不到日志服务或它在您的应用程序类中的使用方式。
-
嗨@kriegaex 我更新了正在测试的类,抱歉,如果这里有很多更改,该类还有其他依赖项和逻辑,但我试图只提供主要功能。目前我正在测试它的主要功能是使用带有回调的 kafkatemplate 发送。
标签: unit-testing groovy spock spring-kafka