【问题标题】:Spring Data with JPA 2.2 resultStream to Kotlin's FlowSpring Data with JPA 2.2 resultStream 到 Kotlin 的流
【发布时间】:2020-03-01 04:06:46
【问题描述】:

在 JPA 2.2 之前,如果我想将 ScrollableResults 发射到 Kotlin 的 Flow ,我必须这样做:

  override fun findSomeUsers(batch: Int): Flow<User> {
    return flow {
      (em.delegate as Session).sessionFactory.openSession().use { session ->
        val query = session.createQuery("select u from User u where ...")
        query.fetchSize = batch
        query.isReadOnly = true

        query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
          while (results.next()) {
            val u = results.get(0) as User
            emit(u)
          }
        }
      }
    }
  }

我必须将 EntityManager 转换为 Hibernate 的 Session

但是由于 JPA 2.2 的 Query 支持 getResultStream ,应该有一种更简洁的方法来实现这一点:

  @ExperimentalCoroutinesApi
  override fun findSomeUsers(batchSize: Int): Flow<User> {
    return channelFlow {
      em.createQuery("select u from User u where ...")
        .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
        .unwrap(javax.persistence.Query::class.java)
        .resultStream
        .asSequence()
        .map { it as User }
        .forEach { u ->
          runBlocking {
            send(u)
          }
        }
    }
  }

嗯,它运作良好,但有些可疑。

首先,为什么我不能只编码 resultStream.asSequence.map {it as User}.asFlow() ? (客户端什么也没发生就结束了)

其次,runBlocking 块很难看。 runBlocking 只能用于测试。但是我发现没有办法在代码中规避它。

有什么办法可以解决吗?

第三,它与问题无关。看来 Spring-Data-JPA 仍然不支持这种查询方法:

  @Query("select u from User u where ...") 
  @MaybeSomeQueryHint(batchSize=:batchSize)
  fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>

它加载所有用户,然后抱怨重复的行......

客户端(测试)端代码就这么简单:

  @ExperimentalCoroutinesApi
  @Test
  @Transactional
  open fun testUsers() {
    runBlocking {
      userDao.findSomeUsers(100).collectIndexed { index, u: User ->
        logger.info("[{}] {}", index , u)
      }
    }
  }

@Marko,Stream 版本运行良好:

  override fun findSomeUserStream(batchSize: Int): Stream<User> {
    return em.createQuery("select u from User u where ...")
      .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
      .unwrap(javax.persistence.Query::class.java)
      .resultStream
      .map { it as User }
  }


  @Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
  @Test
  open fun testUserStream() {
    runBlocking {
      userDao.findSomeUserStream(100).forEach { u ->
        logger.info("{}" , u)
      }
    }
  }


  // it works !!
  @Transactional
  @Test
  open fun testUserStream2() {
    runBlocking {
      userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
        logger.info("{}" , u)
      }
    }
  }

【问题讨论】:

    标签: java jpa kotlin spring-data-jpa kotlin-coroutines


    【解决方案1】:

    不要修补Stream.toSequence() 的结果,而是定义StreamFlow 的这种转换:

    fun <T> Stream<T>.asFlow() = flow {
        for (t in iterator()) {
            emit(t)
        }
    }
    

    如果您将它与此代码示例一起使用:

    suspend fun main() {
        Stream.of("a", "b")
                .asFlow()
                .collect { println(it) }
    }
    

    它会打印出来

    a
    b
    

    你的函数应该是这样的:

    override fun findSomeUsers(batchSize: Int): Flow<User> {
        return em.createQuery("select u from User u where ...")
                .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
                .unwrap(javax.persistence.Query::class.java)
                .resultStream
                .asFlow()
                .map { it as User }
    }
    

    【讨论】:

    • 好吧,客户端代码结束,什么都没有发生,没有任何输出。 (我已经添加了客户端代码)
    • 你是如何在服务器端代码中使用它的?我添加了一个例子。使用这种方法的 JPA 会话的范围并不明显,但我猜如果会话关闭太快,消费代码中应该会有异常。
    • 我还没有部署到服务器端,我只是尝试客户端测试(如上图用runBlocking包裹)。您的代码 resultStream.asFlow().map { it as User} 实际上在没有任何反应的情况下以静默方式结束(也没有例外)。
    • 这并不奇怪,你会从中得到一个Flow。你用你得到的流量做了什么?如果您尝试使用它但仍然没有输出,则意味着流是空的,这意味着底层流是空的。直接使用resultStream时,首先要确保代码能正常工作。
    • 对于任何对此问题感兴趣的人,我已经创建了一个 Spring-Data-JPA 票证,希望他们的 @Query 注释能够返回 Flowjira.spring.io/browse/DATAJPA-1624
    猜你喜欢
    • 2020-01-27
    • 2022-01-01
    • 2014-11-27
    • 1970-01-01
    • 2017-08-21
    • 2023-03-13
    • 2015-06-28
    • 1970-01-01
    • 2016-06-03
    相关资源
    最近更新 更多