【问题标题】:Generating UUID in WebFlux在 WebFlux 中生成 UUID
【发布时间】:2021-12-13 20:42:17
【问题描述】:

假设我在实现BeforeConvertCall<Object>的类中有这样一个方法

 @Override
    public Publisher<Object> onBeforeConvert(Object o, SqlIdentifier sqlIdentifier) {
        
        return Mono.fromCallable(() -> enhance(o)); (A)
        //
        return Mono.just(o)
        .map(this::enhance); (B)

    }

private Object enhance(Object o) {

    if (o instanceof Foo && ((Foo) o).getId() == null) {
        ((Foo) o).setId(UUID.randomUUID().toString());
    } 
    return o;
}
  1. 此代码是否阻塞?我的意思是应该担心吗?
  2. UUIDThreadLocal 包装在反应堆中是否有意义?
  3. (A) 和 (B) 调用之间有什么区别吗?

我试过也把这个耗时操作的增强方法里面放了:

for (int i = 0; i < Integer.MAX_VALUE; i++) {
        String id = UUID.randomUUID().toString();
    }

并发出 3 个并发请求。 4 个reactor-http-nio 线程均在可运行状态下生成。首先锁定SecureRandom 然后休息3:

- locked <0x0000000600c4b510> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x0000000600c4b4b0> (a sun.nio.ch.KQueueSelectorImpl)
  1. 为什么其余 3 个线程没有因为 SecureRandom 实例上的锁定而被阻塞?

我尝试将相同的 for 循环放入 doNext() 钩子内的主管道中,突然休息线程按预期被阻塞。

  1. 为什么他们在这里被阻止,但在onBeforeConvert() 中采用相同的方法却没有? 此外,当我尝试使用 ThreadLocal 结束通话时,它没有帮助。

java.lang.Thread.State: BLOCKED(在对象监视器上)在 sun.security.provider.SecureRandom.engineNextBytes(java.base@11.0.10/SecureRandom.java:216)

  • 等待锁定(sun.security.provider.SecureRandom)
  1. 我希望所有 4 个线程都有自己的 SecureRandom.. 实例? 是否可能只有主线程拥有该实例,并将工作填充到没有SecureRandom 的线程本地的工作线程?

最后一个实验:

 @Override
    public Publisher<Object> onBeforeConvert(Object o, SqlIdentifier sqlIdentifier) {
        
        return Mono.fromCallable(() -> enhance(o))
        .map(this::enhance)
        .subscribeOn(Schedulers.boundedElastic()); (C)

    }

在添加行 (C) 并运行 4 个并发请求后,boundedElastic-2,3,4 确实被阻止了。

  1. 为什么前面的方法reactor-http-nio没有被屏蔽,却被屏蔽了?

【问题讨论】:

    标签: java multithreading spring-webflux project-reactor


    【解决方案1】:
    1. 是的,以您显示此代码的方式 sn-p - 在这里,UUID 生成是阻塞的。

    2. 没有

    3. 直到subscribeOn 未被调用,这两者之间没有显着差异。 just 会立即针对所有订阅者进行评估,而fromCallable 在订阅时会针对每个订阅者单独评估。 在此处查看更多信息:link

    最好的办法是使用:

    return Mono.fromCallable(() -> enhance(o))
            .subscribeOn(Schedulers.boundedElastic());
    

    通过这种方法,您可以将阻塞逻辑委托给单独的线程池 Schedulers.boundedElastic(),因此主反应器线程池不会被阻塞,并且不会影响整个应用程序

    【讨论】:

      猜你喜欢
      • 2016-10-23
      • 2017-06-25
      • 2013-11-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-18
      • 2014-11-30
      • 2011-03-15
      相关资源
      最近更新 更多