【问题标题】:Linearization in Reactor Netty (Spring Boot Webflux)Reactor Netty 中的线性化(Spring Boot Webflux)
【发布时间】:2022-06-05 06:02:43
【问题描述】:

如何保证 Reactor Netty 中请求的线性化?

理论:

鉴于:
请求 A 想写 x=2, y=0
请求 B 想读 x, y 写 x=x+2, y=y+1
请求 C 想读 x 写 y=x
所有请求都被异步处理并立即返回客户端,状态为 ACCEPTED。

例子:
按顺序发送请求 A、B、C。

示例日志输出:(请求、线程名称、x、y)
请求 A,nioEventLoopGroup-2-0,x=2,y=0
请求 C,nioEventLoopGroup-2-2,x=2,y=2
请求 B,nioEventLoopGroup-2-1,x=4,y=3

业务逻辑要求 A 之后的所有读取都看到 x=2 和 y=0。
并请求 B 查看 x=2, y=0 并设置 y=1。
并请求 C 查看 x=4 并设置 y=4。

简而言之:业务逻辑使每个下一个写操作都依赖于之前要完成的写操作。否则操作是不可逆的。

示例代码

文档:

@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {

    @Id
    private String id;

    private int data;

    public Event withNewId() {
        setId(UUID.randomUUID().toString());
        return this;
    }

}

回购:

public interface EventRepository extends ReactiveMongoRepository<Event, String> {}

控制器:

@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
    private final EventRepository repo;

    @PostMapping
    public Mono<String> create(Event event) {
        return Mono.just(event.withNewId().getId())
                   .doOnNext(id -> 
                       // do query based on some logic depending on event data
                       Mono.just(someQuery)
                           .flatMap(query ->                        
                               repo.find(query)
                                   .map(e -> event.setData(event.getData() + e.getData())))
                           .switchIfEmpty(Mono.just(event))
                           .flatMap(e -> repo.save(e))
                           .subscribeOn(Schedulers.single())
                           .subscribe());
    }

}

它不起作用,但使用subscribeOn 我试图保证线性化。这意味着并发请求 A 和 B 将始终按照服务器接收它们的顺序将其有效负载写入数据库。因此,如果另一个并发请求 C 是先读后写的复合,它将从 DB 中读取反映请求 B 而不是 A 的更改,并根据 B 写入自己的更改。

Reactor Netty 中是否有办法使用未绑定的 FIFO 队列调度执行程序,以便我可以按顺序异步处理请求?

【问题讨论】:

  • 如果您可以共享一些代码以更好地理解问题,那就太好了,但从第一眼看,您可以使用保证按顺序处理的反应器运算符。例如,concatMap 不引入任何并发。
  • @Alex我试图让回答问题变得更简单

标签: spring spring-webflux netty reactor reactor-netty


【解决方案1】:

我不认为这特别针对 Netty 或 Reactor,而是针对更广泛的主题——如何处理乱序消息传递和多次消息传递。几个问题:

  1. 客户端是否总是以相同的顺序发送相同数量的请求?由于网络问题,请求总是有可能无序到达,或者一个或多个可能丢失。
  2. 客户端是否进行重试?如果同一个请求两次到达服务器会发生什么?
  3. 如果顺序很重要,为什么客户端在发出第 n 个请求之前不等待第 n-1 个请求的结果?换句话说,为什么会有很多并发请求?

    我会尝试以这样一种方式重新设计操作,即有一个请求以所需的顺序在后端执行操作,并在必要时在此处使用并发来加速流程。

    如果不可能,例如,您不控制客户端,或者更一般地说事件(请求)到达的顺序,您必须使用每个消息语义在应用程序级逻辑上实现排序来进行排序。例如,您可以存储或缓冲消息,等待所有消息到达,当它们到达时,才使用消息中的数据以正确的顺序触发业务逻辑。这需要某种可以将消息归因于同一实体的键(身份)和一个排序键,您知道如何以正确的顺序对消息进行排序。

    编辑: 得到答案后,你绝对可以“以反应堆方式”实现它。

    
        Sinks.Many<Event> sink = Sinks.many() // you creat a 'sink' where the events will go
                .multicast()                  // broads all messages to all subscribes of the stream 
                .directBestEffort();          // additional semantics - publishing will fail if no subscribers - doesn't really matter here
    
        Flux<Event> eventFlux = sink.asFlux(); // the 'view' of the sink as a flux you can subscribe to
    
          public void run() {
            subscribeAndProcess();
            sink.tryEmitNext(new Event("A", "A", "A"));
            sink.tryEmitNext(new Event("A", "C", "C"));
            sink.tryEmitNext(new Event("A", "B", "B"));
    
            sink.tryEmitNext(new Event("B", "A", "A"));
            sink.tryEmitNext(new Event("B", "C", "C"));
            sink.tryEmitNext(new Event("B", "B", "B"));
        }
    
    
        void subscribeAndProcess() {
            eventFlux.groupBy(Event::key)
                    .flatMap(
                            groupedEvents -> groupedEvents.distinct(Event::type) // distinct to avoid duplicates
                                    .buffer(3)                                   // there are three event types, so we buffer and wait for all to arrive
                                    .flatMap(events ->                           // once all the events are there we can do the processing the way we need
                                            Mono.just(events.stream()
                                                    .sorted(Comparator.comparing(Event::type))
                                                    .map(e -> e.key + e.value)
                                                    .reduce(String::concat)
                                                    .orElse(""))
                                    )
                    )
                    .subscribe(System.out::println);
        }
        // prints values concatenated in order per key: 
        // - AAABAC
        // - BABBBC
    

    见要点:https://gist.github.com/tarczynskitomek/d9442ea679e3eed64e5a8470217ad96a

    有几个注意事项:

    • 如果给定键的所有预期事件都没有到达,您就浪费了内存缓冲 - 除非您设置了超时
    • 如何确保给定键的所有事件都转到同一个应用程序实例?
    • 您将如何从处理过程中遇到的故障中恢复?

    考虑到这一切,我会使用持久存储——比如将传入的事件保存在数据库中,并在后台进行处理——为此你不需要使用 Reactor。大多数时候,一个简单的基于 Servlets 的 Spring 应用程序会更容易维护和开发,特别是如果你以前没有函数式反应式编程的经验。

【讨论】:

  • 回答您的问题: 1. 客户端发送“标记”事件,然后是使用身份关系链接到标记的正常事件,因此第 n 个事件对 nth-1 的依赖性。 2. 不重试。翻倍是不太可能的,但(目前)无关紧要。 3.客户端或在这种情况下的上游服务只是没有时间等待并且不关心结果。我无法触及上游服务,因此是我问题中的最后一句话。我认为可能有一种反应器方式来实现缓冲和异步处理。
  • 有一种反应器方法可以进行缓冲和异步处理,但您仍然必须对其进行编码 - 它的逻辑定义了缓冲的内容和时间 - 我将使用代码示例编辑答案;-)
  • 这是对 Reactor 的探索,但考虑到您的评论和我的经验,我们从 Reactor 搬回来并让它正常工作。无论如何,感谢您的时间和精力,这很有启发性!
  • 不客气,我很高兴我能帮上忙! Reactor 是一个很棒的工具,但学习曲线陡峭,并非所有用例都适合它。
【解决方案2】:

查看提供的代码,我不会尝试在 Reactor Netty 级别处理它。

起初,有几个关于控制器实现的 cmets,因为它有多个违反反应性原则的问题。我建议花一些时间学习反应式 API,但这里有一些提示

  1. 在您订阅之前,没有任何反应。同时显式调用subscribe 是一种反模式,在创建类似于WebFlux 的框架之前应避免使用。

  2. parallel 调度程序应该用于运行非阻塞逻辑,直到你有一些阻塞代码。

  3. doOn... 是所谓的副作用运算符,不应用于构建反应流。

    @PostMapping
    public Mono<String> create(Event event) {
        // do query based on some logic depending on event data
        return repo.find(query)
                .map(e -> event.setData(event.getData() + e.getData()))
                .switchIfEmpty(Mono.just(event))
                .flatMap(e -> repo.save(e));
    }
    

    现在,由于网络故障、可能的重试等原因,以预定义的顺序处理请求可能会很棘手。如果您从未收到请求 B 或请求 C,该怎么办?你还应该坚持 Request A 吗?

    正如@ttarczynski 在他的评论中提到的,最好的选择是重新设计 API 并发送单个请求。

    如果这不是一个选项,您需要引入一些状态来“推迟”请求处理,然后根据一致性语义,在收到最后一个请求时将它们作为“批处理”处理,或者只是推迟请求 C 直到获得请求 A & B。

【讨论】:

  • 此代码示例会立即返回响应吗?请求处理必须是异步的,这意味着上游服务只是处理数据而不关心结果。该服务存储的数据稍后用于下游分析。这就是我故意使用doOnNext的原因
猜你喜欢
  • 2019-01-03
  • 2020-06-03
  • 1970-01-01
  • 1970-01-01
  • 2019-07-31
  • 2018-07-26
  • 1970-01-01
  • 2020-09-12
  • 2018-09-09
相关资源
最近更新 更多