【发布时间】:2019-04-28 17:52:09
【问题描述】:
我正在使用 NestJS CQRS 配方来管理两个实体之间的交互:用户和用户配置文件。该架构是一个 API Gateway NestJS 服务器 + 每个微服务(User、UserProfile 等)的 NestJS 服务器。
我已经通过 API Gateway 上的 User 和 UserProfile 模块使用他们自己的 sagas/events/commands 设置了基本交互:
- 创建用户时,会创建用户配置文件
- 当用户配置文件创建失败时,之前创建的用户被删除
详细说明:
在 User module 中,CreateUser command 引发一个 UserCreated event 被 User saga 拦截,这将触发 CreateUserProfile command(来自 UserProfile module)。
如果后者失败,UserProfileFailedToCreate event 会引发并被 UserProfile saga 拦截,这将触发 DeleteUser command(来自 User模块)。
一切正常。
如果 CreateUser 命令失败,我resolve(Promise.reject(new HttpException(error, error.status)) 向最终用户表明在用户创建过程中出现了问题。
我的问题是我无法为 CreateUserProfile 命令复制相同的行为,因为显然 HTTP 请求承诺已经从第一个命令中解决了。
所以我的问题是:如果后续命令在 saga 中失败,有什么方法可以使命令失败?我知道 HTTP 请求与 saga 触发的任何后续命令完全断开,但我想知道是否有人已经在这里玩过事件或其他东西来复制这个数据流?
我使用 CQRS 的原因之一是,除了为微服务之间的数据交互提供更简洁的代码之外,还能够在任何链接命令失败的情况下回滚存储库操作,这可以正常工作。 但我需要一种方法向最终用户表明该链遇到问题并已回滚。
UserController.ts
@Post('createUser')
async createUser(@Body() createUserDto: CreateUserDto): Promise<{user: IAuthUser, token: string}> {
const { authUser } = await this.authService.createAuthUser(createUserDto);
// this is executed after resolve() in CreateUserCommand
return {user: authUser, token: this.authService.createAccessTokenFromUser(authUser)};
}
UserService.ts
async createAuthUser(createUserDto: CreateUserDto): Promise<{authUser: IAuthUser}> {
return await this.commandBus
.execute(new CreateAuthUserCommand(createUserDto))
.catch(error => { throw new HttpException(error, error.status); });
}
CreateUserCommand.ts
async execute(command: CreateAuthUserCommand, resolve: (value?) => void) {
const { createUserDto } = command;
const createAuthUserDto: CreateAuthUserDto = {
email: createUserDto.email,
password: createUserDto.password,
phoneNumber: createUserDto.phoneNumber,
};
try {
const user = this.publisher.mergeObjectContext(
await this.client
.send<IAuthUser>({ cmd: 'createAuthUser' }, createAuthUserDto)
.toPromise()
.then((dbUser: IAuthUser) => {
const {password, passwordConfirm, ...publicUser} = Object.assign(dbUser, createUserDto);
return new AuthUser(publicUser);
}),
);
user.notifyCreated();
user.commit();
resolve(user); // <== This makes the HTTP request return its reponse
} catch (error) {
resolve(Promise.reject(error));
}
}
UserSagas.ts
authUserCreated = (event$: EventObservable<any>): Observable<ICommand> => {
return event$
.ofType(AuthUserCreatedEvent)
.pipe(
map(event => {
const createUserProfileDto: CreateUserProfileDto = {
avatarUrl: '',
firstName: event.authUser.firstName,
lastName: event.authUser.lastName,
nationality: '',
userId: event.authUser.id,
username: event.authUser.username,
};
return new CreateUserProfileCommand(createUserProfileDto);
}),
);
}
CreateUserProfileCommand.ts
async execute(command: CreateUserProfileCommand, resolve: (value?) => void) {
const { createUserProfileDto } = command;
try {
const userProfile = this.publisher.mergeObjectContext(
await this.client
.send<IUserProfile>({ cmd: 'createUserProfile' }, createUserProfileDto)
.toPromise()
.then((dbUserProfile: IUserProfile) => new UserProfile(dbUserProfile)),
);
userProfile.notifyCreated();
userProfile.commit();
resolve(userProfile);
} catch (error) {
const userProfile = this.publisher.mergeObjectContext(new UserProfile({id: createUserProfileDto.userId} as IUserProfile));
userProfile.notifyFailedToCreate();
userProfile.commit();
resolve(Promise.reject(new HttpException(error, 500)).catch(() => {}));
}
}
UserProfileSagas.ts
userProfileFailedToCreate = (event$: EventObservable<any>): Observable<ICommand> => {
return event$
.ofType(UserProfileFailedToCreateEvent)
.pipe(
map(event => {
return new DeleteAuthUserCommand(event.userProfile);
}),
);
}
DeleteUserCommand.ts
async execute(command: DeleteAuthUserCommand, resolve: (value?) => void) {
const { deleteAuthUserDto } = command;
try {
const user = this.publisher.mergeObjectContext(
await this.client
.send<IAuthUser>({ cmd: 'deleteAuthUser' }, deleteAuthUserDto)
.toPromise()
.then(() => new AuthUser({} as IAuthUser)),
);
user.notifyDeleted();
user.commit();
resolve(user);
} catch (error) {
resolve(Promise.reject(new HttpException(error, error.status)).catch(() => {}));
}
}
【问题讨论】:
-
没有实际代码,很难看出你在哪里做错了 - 也许你有一个“处理”错误的捕获,因此错误不会“链接”下来
-
您为什么不尝试先创建配置文件,然后再创建用户?换句话说,您以相反的顺序发送命令
-
@JaromandaX 我已经添加了代码。但为了确保我没有弄错:后续命令由事件触发这一事实意味着任何后续相关错误不能链接到初始 HTTP 请求,对吗?基本上我在这个问题上的观点不是某处存在捕获问题,而是技术上不可能链接后续错误,因此我正在寻求想法以其他方式完成......但也许我'我错了吗?
-
@ConstantinGalbenu 反转有什么意义?我将不得不考虑用户创建失败,然后删除之前创建的用户配置文件。另外,用户配置文件包含 userId 的外键。
-
@S.Dan 我明白了,我不确定这个框架是如何工作的,因为我从未使用过它,我使用自己的框架。那么,配置文件创建是在另一个进程/机器中运行的吗?
标签: javascript typescript domain-driven-design nestjs cqrs