【发布时间】:2022-06-05 06:02:43
【问题描述】:
如何保证 Reactor Netty 中请求的线性化?
理论:
鉴于:
请求 A 想写 x=2, y=0
请求 B 想读 x, y 写 x=x+2, y=y+1
请求 C 想读 x 写 y=x
所有请求都被异步处理并立即返回客户端,状态为 ACCEPTED。
例子:
按顺序发送请求 A、B、C。
示例日志输出:(请求、线程名称、x、y)
请求 A,nioEventLoopGroup-2-0,x=2,y=0
请求 C,nioEventLoopGroup-2-2,x=2,y=2
请求 B,nioEventLoopGroup-2-1,x=4,y=3
业务逻辑要求 A 之后的所有读取都看到 x=2 和 y=0。
并请求 B 查看 x=2, y=0 并设置 y=1。
并请求 C 查看 x=4 并设置 y=4。
简而言之:业务逻辑使每个下一个写操作都依赖于之前要完成的写操作。否则操作是不可逆的。
示例代码
文档:
@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
@Id
private String id;
private int data;
public Event withNewId() {
setId(UUID.randomUUID().toString());
return this;
}
}
回购:
public interface EventRepository extends ReactiveMongoRepository<Event, String> {}
控制器:
@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
private final EventRepository repo;
@PostMapping
public Mono<String> create(Event event) {
return Mono.just(event.withNewId().getId())
.doOnNext(id ->
// do query based on some logic depending on event data
Mono.just(someQuery)
.flatMap(query ->
repo.find(query)
.map(e -> event.setData(event.getData() + e.getData())))
.switchIfEmpty(Mono.just(event))
.flatMap(e -> repo.save(e))
.subscribeOn(Schedulers.single())
.subscribe());
}
}
它不起作用,但使用subscribeOn 我试图保证线性化。这意味着并发请求 A 和 B 将始终按照服务器接收它们的顺序将其有效负载写入数据库。因此,如果另一个并发请求 C 是先读后写的复合,它将从 DB 中读取反映请求 B 而不是 A 的更改,并根据 B 写入自己的更改。
Reactor Netty 中是否有办法使用未绑定的 FIFO 队列调度执行程序,以便我可以按顺序异步处理请求?
【问题讨论】:
-
如果您可以共享一些代码以更好地理解问题,那就太好了,但从第一眼看,您可以使用保证按顺序处理的反应器运算符。例如,
concatMap不引入任何并发。 -
@Alex我试图让回答问题变得更简单
标签: spring spring-webflux netty reactor reactor-netty