【问题标题】:Why is a suspending repository function executed but a non-suspending one not?为什么执行暂停的存储库功能但不执行非暂停的存储库功能?
【发布时间】:2021-12-18 20:49:06
【问题描述】:

我有这个服务方式:

@Transactional
override suspend fun deleteByCarId(carId: Long) {
  routeRepository.deleteByCarId(carId)
  routePlanRepository.deleteByCarId(carId)
  carRepository.deleteById(carId)
}

路线计划(中)和汽车(最后一行)被删除,但routeRepository上的删除没有被执行。

interface RouteRepository : CoroutineCrudRepository<Route, Long> {
  fun deleteByCarId(carId: Long)
  // ...
}
interface RoutePlanRepository : CoroutineCrudRepository<RoutePlan, Long> {
  suspend fun deleteByCarId(carId: Long)
  // ...
}

所以,我发现这可能是因为RouteRepository 在删除方法上错过了suspend,但有人可以解释一下,为什么这很重要?


编辑 1

通过进一步考虑更一般的情况,我认为创建Flow 的非暂停存储库函数似乎不一定标记为suspend。但我不明白为什么。所有其他方法似乎都需要suspend - 但为什么呢?通常我可以从协程运行一个非挂起函数并执行它(当然我们不应该,因为它可能会阻塞线程 - 对吧?)。

我的假设是创建Flow 不需要是可暂停的,因为创建它的操作很快,并且只有它的订阅最终会执行查询。

在上面的示例中,没有订阅Flow,因为该方法返回Unit - 这就是不执行删除的原因?

但是,我仍然不明白,为什么用suspend 标记存储库方法会改变行为。现在查询的创建本身就是一个异步操作,因此它(删除查询的创建)成为整个请求处理链的一部分?

但我希望在挂起函数中创建查询不会自动订阅?谁能解释一下?

编辑 2

我创建这个问题是因为我认为应该更好地记录什么是可能的,什么不是:https://github.com/spring-projects/spring-data-commons/issues/2503

文档目前声明delete 需要suspend,但未明确提及:

对于返回值,从 Reactive 到 Coroutines API 的转换如下:

fun handler(): Mono&lt;Void&gt; 变为 suspend fun handler()

fun handler(): Mono&lt;T&gt; 变为 suspend fun handler(): Tsuspend fun handler(): T? 取决于 Mono 是否可以为空(具有更多静态类型的优势)

fun handler(): Flux&lt;T&gt; 变为 fun handler(): Flow&lt;T&gt;

https://docs.spring.io/spring-data/r2dbc/docs/current/reference/html/#kotlin.coroutines.reactive

【问题讨论】:

  • 您的意思是所有这些deleteByCarId() 函数实际上都返回流吗?因为从您的示例中它们返回Unit,而不是流。
  • 在示例中他们返回Unit,我不明白为什么执行暂停但没有暂停?通过进一步思考,我发现理解它对流也有帮助。我也不明白带有Unit return 的挂起函数在 r2dbc 中的行为。它是否在引擎盖下创建了一个流程(不返回它)?

标签: kotlin spring-webflux kotlin-coroutines project-reactor spring-data-r2dbc


【解决方案1】:

我不完全理解这里发生了什么,但我相信你误解了这个案例。挂起函数或流中没有内置魔法,例如,如果在挂起函数中创建流,它们会自动变热。但是Spring Data提供了一个魔法,与CoroutineCrudRepository相关。

最近,许多数据库/Web 服务库遵循用户创建接口并且库提供该接口的实现的模式。这意味着库读取界面并尝试猜测用户的预期结果。

问题是:非挂起删除功能没有太大意义。您的预期行为是什么?它应该阻塞线程吗?使用协程时非常不鼓励这样做。应该是异步的,所以在后台启动删除操作,立即返回?但是如果函数没有返回JobDeferredFlow或类似的,如何访问这个异步任务呢?

我的猜测是该库与您的函数定义混淆,它不知道您期望什么,因此它提供了一些“奇怪”或不完整的实现。也许它异步安排删除操作。或者它只是什么都不做。我同意@João Dias 的观点,它可能会抛出错误或至少生成警告。

无论如何,我认为解决您的问题的正确方法是使此功能suspend。从一开始就应该是suspend

更新

在 cmets 中进行了一些讨论之后,问题的作者似乎很困惑,该函数是 suspend 或它返回例如是什么意思? Flow 以及它们如何相互连接。

这都与我们希望如何等待一些长时间运行的操作有关。我们可以同步执行,这意味着函数在操作完成时返回,也可以异步执行,因此函数立即返回并在后台执行操作。在 Kotlin 中有两个同步选项:阻塞(经典)和挂起(协程)。第一个阻塞线程,我们希望通过使用协程和挂起函数来避免这种情况。

有更多的异步选项,它们分为两个子类别:接收一次数据(回调,CompletableFutureDeferredMono)或接收值流(回调,FlowFlux)。在数据库的情况下,值流可以被解释为被搜索时的实体流(我相信 Spring Data 就是这样工作的)或数据库中数据的变化流(Android Room?)。

最终,这是您决定要使用哪种执行策略,您可以通过更改函数的定义来选择它,例如:

  • suspend fun getData(): Data - 通过挂起同步接收数据一次。
  • fun getData(): Deferred&lt;Data&gt; - 一次异步接收数据(我们也可以使用未来,Mono 等)。
  • fun getData(): Flow&lt;Data&gt; - 接收数据流(或者:Flux 等)。
  • fun getData(): Data - 通过阻塞线程同步接收数据 - 应该避免
  • suspend fun getData(): Flow&lt;Data&gt; - 不清楚预期的行为是什么,因为它似乎同时是同步的 (suspend) 和异步的 (Flow) - 应该避免(对于暂停函数返回相同期货、DeferredMonoFlux 等)。

删除操作有点不同,因为它不返回任何数据,但思路是一样的。我们可以同步执行(挂起函数)或异步执行(非挂起返回Deferred&lt;Void&gt;Mono&lt;Void&gt;等)。

最后一点:我并不是说上述所有函数定义都适用于 Spring Data。我在这里说的大多是抽象的,但正如我所说,Spring Data 试图通过查看您的函数定义来猜测预期的行为是什么,我相信它或多或少应该像我描述的那样工作。

【讨论】:

  • 有趣,但我认为它记录在 r2dbc 参考 17.5.1 Coroutines support is enabled when kotlinx-coroutines-core, kotlinx-coroutines-reactive and kotlinx-coroutines-reactor dependencies are in the classpath 和 esp 17.5.3 文档 CorutineCrudRepository => docs.spring.io/spring-data/r2dbc/docs/current/reference/html/…
  • 这也是相关的,但我还是不明白Invoking a custom implementation method propagates the Coroutines invocation to the actual implementation method if the custom method is suspend-able without requiring the implementation method to return a reactive type such as Mono or Flux.。删除方法返回 void 或删除计数。所以我仍然认为它缺乏文档,并将在下周将其发布在 r2dbs repo 中。
  • 啊,对不起,我说这不是官方的时完全错了。我发现了错误的 GitHub 存储库 :facepalm:。关于您的第二条评论:我绝对不是 Spring Data 方面的专家,但这个引用的片段是关于自定义实现的,您在这里使用查询方法。另外,如果我错了,请纠正我,但反应式存储库中的删除操作不会返回void,而是Mono&lt;Void&gt;。从技术上讲,对删除操作使用非挂起的同步函数并且不阻塞线程是不可能的。我们只能阻塞、挂起或异步。
  • 那为什么find-operations 不能是suspending 呢?本来明白添加suspend可以解决问题,但不明白为什么。我在 spring-data 上创建了一个问题,因为这应该更详细地记录github.com/spring-projects/spring-data-commons/issues/2503
  • 可能是因为Flows collect 操作正在暂停功能
【解决方案2】:

我没有使用 spring 或您使用的依赖项的经验,但我一直在使用 android 的 Room ORM,我认为您使用的依赖项以类似的方式工作。

本质上,这些 ORM 会为您生成代码,并且在根本上它们正在执行以下类型的操作:

...
db.connect()
val cursor = db.execSQL("DELETE FROM $table WHERE $condition")
db.commit()
...

我说的对吗?

在 Room 中,开发者可能希望以不同的方式消费:

  • 简单的旧阻塞方式:按原样生成函数而不做任何花哨的事情
  • 与以前相同,但将其标记为 suspend,因此我们知道这是一项长期运行的操作
  • 使用LiveData&lt;T&gt; 包装器。 (类似于流)
  • 使用Deferred&lt;T&gt; 类型(好像用户想要启动async {} 任务

... 或任何其他类型的绑定;那可能是CompleteableFutureRxJava……你明白了。

因此,当您指示时,您使用的工具可能只是将 db 操作包装到 suspend 函数中,并且生成的代码最终看起来像:

suspend fun deleteById(...) {
  db.connect()
  val cursor = db.execSQL(...)
  db.commit()
  return cursor.getFirst()
}

这可以解释为什么将函数标记为suspend 只是有效...但是Flow&lt;T&gt; 奇怪的行为呢?

嗯,这取决于您如何创建流程。

你可以像这样创建一个Flow:(注意它不是阻塞函数!)

fun deleteById(...) = flow {
  db.connect()
  val cursor = db.execSQL(...)
  db.commit()
  emit(cursor.getFirst())
}

在这里,我们只是使用了一个流程构建器,只要流程是collected,就会执行该操作。

但是,如果 ORM 在您传递 suspend 关键字时尝试做到最好呢?

我想它看起来像下面这样:

suspend fun deleteById(...) {
  db.connect()
  val cursor = db.execSQL(...)
  db.commit()
  return flowOf(cursor.getFirst())
}

所以这一次,代码在被调用时被执行,但结果被包装在Flow 中,所以无论您是否订阅给定的Flow&lt;T&gt;,都已经执行了什么数据库操作

【讨论】:

    【解决方案3】:

    我觉得奇怪的是,当您在扩展 CoroutineCrudRepository 的接口中定义非挂起函数时没有警告甚至编译错误。

    您可能知道,挂起函数是协程的核心概念。 suspend 关键字意味着这个函数可以是阻塞的,并且可以在以后暂停和恢复。当您需要执行长时间运行的操作但您不希望线程被阻塞等待它完成时,这非常有用。数据库调用只是这些长时间运行操作的一个示例,这就是为什么所有CoroutineCrudRepository 方法都被定义为挂起函数的原因。您可以在https://github.com/konrad-kaminski/spring-kotlin-coroutine/blob/master/spring-data-mongodb-kotlin-coroutine/src/main/kotlin/org/springframework/data/repository/coroutine/CoroutineCrudRepository.kt 的源代码中查看这一点。

    我仍然觉得奇怪的是,当您执行以下操作时没有警告甚至编译错误,但编译器可能无法确定只应将挂起函数添加到这样的接口:

    interface RouteRepository : CoroutineCrudRepository<Route, Long> {
      fun deleteByCarId(carId: Long)
      // ...
    }
    

    【讨论】:

    • 请注意,并非所有存储库都需要暂停。那些返回Flow 的人不会。请参阅github.com/spring-projects/spring-data-commons/blob/main/src/… 原因是,它们只有在实际执行阻塞操作时才应该挂起。否则,该方法只会构建一个流程(其本身可以暂停/阻塞)您的回答没有回答我的问题,即为什么服务不将非暂停调用挂钩到流程中,从而不执行它。
    • Flow 的好点,我忘了。感谢您的提示;)但除此之外,所有其他方法都需要暂停。
    猜你喜欢
    • 2017-01-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-07
    • 1970-01-01
    • 1970-01-01
    • 2021-07-18
    相关资源
    最近更新 更多