[Posted]: 2019-02-25 10:44:31
[Question]:
Given this saga:
@Saga
@Getter
@Slf4j
public class TasksForStateSaga {

    @Autowired
    transient CommandGateway commandGateway;
    @Autowired
    transient EventBus eventBus;
    @Autowired
    transient TaskService taskService;

    Map<String, TaskStatus> tasks = new HashMap<>();
    ApplicationState applicationState;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(ApplicationStateChangedEvent event) {
        applicationState = event.getNewState();
        log.info("Planning tasks for application {} in state {}", event.getId(), applicationState);
        taskService.getTasksByState(applicationState).stream()
                .map(task -> ScheduleTaskCommand.builder()
                        .applicationId(event.getId())
                        .taskId(IdentifierFactory.getInstance().generateIdentifier())
                        .targetState(applicationState)
                        .taskName(task.getTaskName())
                        .build())
                .peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
                .forEach(command -> commandGateway.send(command));
    }

    @SagaEventHandler(associationProperty = "applicationId")
    public void on(TaskFinishedEvent event) {
        tasks.replace(event.getTaskId(), FINISHED);
        long notFinished = getUnfinishedCount();
        log.info("Task {} has just finished, ready {} of {}", event.getTaskName(), tasks.size() - notFinished, tasks.size());
        if (notFinished == 0) {
            log.info("All tasks for application {}.{} finished, ending this saga", event.getApplicationId(), applicationState);
            eventBus.publish(GenericEventMessage.asEventMessage(
                    TaskForStateDoneEvent.builder()
                            .applicationId(event.getApplicationId())
                            .state(applicationState)
                            .build()
            ));
            SagaLifecycle.end();
        }
    }

    private long getUnfinishedCount() {
        return tasks.values().stream()
                .filter(state -> !FINISHED.equals(state))
                .count();
    }
}
And this Spock test, which covers the first method:
class TasksForStateSagaTest extends Specification {

    SagaTestFixture sagaFixture

    def setup() {
        sagaFixture = new SagaTestFixture<>(TasksForStateSaga)
    }

    def 'should schedule task for the application state'() {
        given:
        def applicationId = '1'
        def taskService = Mock(TaskService)
        def tasks = [
                ApplicationStateAwareTaskDefinition.builder().taskName('task1').build(),
                ApplicationStateAwareTaskDefinition.builder().taskName('task2').build(),
        ]
        sagaFixture.registerResource(taskService)
        sagaFixture.givenAggregate(applicationId)

        when:
        sagaFixture
                .whenPublishingA(new ApplicationStateChangedEvent(id: applicationId, newState: ApplicationState.NEW))
                .expectActiveSagas(1)
                .expectDispatchedCommandsMatching(payloadsMatching(
                        exactSequenceOf(
                                equalTo(new ScheduleTaskCommand(applicationId: applicationId, targetState: ApplicationState.NEW, taskName: 'task1'),
                                        new IgnoreField(ScheduleTaskCommand, 'taskId')),
                                equalTo(new ScheduleTaskCommand(applicationId: applicationId, targetState: ApplicationState.NEW, taskName: 'task2'),
                                        new IgnoreField(ScheduleTaskCommand, 'taskId')),
                                andNoMore()
                        )
                ))

        then:
        1 * taskService.getTasksByState(ApplicationState.NEW) >> tasks
    }
}
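But I don't actually know how to test the second method, which relies on the saga's internal state.
Can anyone suggest how to set the saga's internal state through SagaTestFixture? More broadly, is this a good way to implement such a saga, or do I have a conceptual problem that prevents me from easily testing the method that ends the saga?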
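The @StartSaga method sets the internal state: it generates a taskId for each task and puts it in the map.
The @EndSaga method (here the TaskFinishedEvent handler, which calls SagaLifecycle.end()) reads the map and checks that all tasks are finished before publishing the TaskForStateDoneEvent.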
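To make the two steps above concrete, here is a minimal plain-Java sketch of the same bookkeeping, with no Axon types. The class name TaskProgress, its Status enum, and the injectable id supplier are all hypothetical and only for illustration; the saga above gets its ids from IdentifierFactory instead. It shows why the second step is hard to test in isolation: the map can only be checked against ids that the first step generated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the saga's task bookkeeping; no Axon types involved.
class TaskProgress {

    enum Status { SCHEDULED, FINISHED }

    private final Map<String, Status> tasks = new LinkedHashMap<>();
    // Assumption for illustration: ids come from an injectable supplier.
    private final Supplier<String> idSupplier;

    TaskProgress(Supplier<String> idSupplier) {
        this.idSupplier = idSupplier;
    }

    // Mirrors the @StartSaga handler: one generated id per task, marked SCHEDULED.
    List<String> schedule(List<String> taskNames) {
        List<String> ids = new ArrayList<>();
        for (String ignored : taskNames) {
            String id = idSupplier.get();
            tasks.put(id, Status.SCHEDULED);
            ids.add(id);
        }
        return ids;
    }

    // Mirrors the TaskFinishedEvent handler: marks the task FINISHED and
    // returns true once no unfinished task remains.
    boolean finish(String taskId) {
        tasks.replace(taskId, Status.FINISHED);
        return unfinishedCount() == 0;
    }

    long unfinishedCount() {
        return tasks.values().stream()
                .filter(status -> status != Status.FINISHED)
                .count();
    }
}
```

With a supplier that hands out "t1" and then "t2", schedule returns those two ids, finish("t1") returns false, and finish("t2") returns true. In this sketch the state is never set directly; it is reached by replaying the first step with predictable ids.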
Thanks!
[Comments]:
Tags: axon