【问题标题】:Unwrapping the Functions without parameters in scala with Futures使用 Futures 在 scala 中展开没有参数的函数
【发布时间】:2025-12-06 01:15:02
【问题描述】:

我正在尝试创建一个转换链来定义给定函数的可能转换:

type Transformation[T] = T => Future[T]

def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] = {
}


val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
val t3: Transformation[Int] = t =>
    if (t > 2) Future.successful(t * t)
    else Future.failed(new NoSuchElementException)

val tc = transformationChain(Seq(t1, t2, t2, t3))
val tc2 = transformationChain(Seq(t2, t2, t2))
val tc3 = transformationChain(Seq(t2, t3, t1))

println(Await.result(tc(2), 5.seconds))  // 16
println(Await.result(tc2(2), 5.seconds)) // throw NoSuchElementException
println(Await.result(tc3(2), 5.seconds)) // 4

问题是我不明白如何在“transformationChain”方法中解开这些函数,以通过循环或递归调用它们将结果发送到链中的每个下一个函数。

【问题讨论】:

  • 在给定t2的定义的情况下,为什么他们不应该都返回Future.failed(NoSuchElementException)
  • 如果所有函数都返回 NoSuchElementException 而没有任何数值结果,则 transformationChain 的结果应该返回 Transformation[Int] = _ => Future.failed(new NoSuchElementException) 在其他情况下它应该返回一些没有数值的结果例外。
  • 所以在其他情况下,它应该“忽略”所有抛出 NoSuchElementException 的转换并继续将“上一个结果”传递给下一个?
  • 完全一样
  • 不只是 reduceflatMap 应该完成这项工作吗?

标签: scala functional-programming concurrent.futures


【解决方案1】:

你所描述的变换A => F[B]的函数)通常被称为Kleisli箭头。

Cats 库有一个数据类型,这使得对这些函数的操作更容易。例如,它有一个方法 andThen,它允许组合这些函数:

import cats.data.Kleisli
import cats.implicits._

val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)

Kleisli(t1).andThen(Kleisli(t2))

唯一的问题是,您的其中一个转换可能会返回失败的未来,这会使整个链条短路。我们可以使用 recoverWith 修复它。

所以最后 transformationChain 可能看起来像:

def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] =
    t =>
      chain
         //wrap the function in Kleisli and then use replace failed futures with succeeded
         //future, that are passing value over
        .map(Kleisli(_).recoverWith {
          case _ => Kleisli(x => Future.successful(x))
        })
        .reduce(_ andThen _) //combine all elements with andThen
        .apply(t)

它适用于案例 1 和 3,但对于案例 2 则失败,因为它只会返回传递的值。

println(Await.result(tc(2), 5.seconds)) // 16
println(Await.result(tc3(2), 5.seconds)) // 4
println(Await.result(tc2(2), 5.seconds)) // 2

【讨论】:

    【解决方案2】:
    import scala.concurrent.{ ExecutionContext, Future }
    import scala.util.Try
    
    import ExecutionContext.Implicits.global
    
    object Transformations {
      type Transformation[T] = T => Future[T]
    
      private object DummyException extends Exception
      private val notReallyAFailedFuture: Future[Throwable] = Future.failed(DummyException)
    
      def transformationChain[T](chain: Seq[Transformation[T]])(implicit ectx: ExecutionContext): Transformation[T] = t =>
        if (chain.nonEmpty) {
          val initialFut = Future.successful(t)
          // resultFut will succeed if any of the transformations in the chain succeeded
          // lastFailure will fail if all of the transformations succeeded, otherwise it has the last failure
          val (resultFut: Future[T], lastFailure: Future[Throwable]) =
            chain.foldLeft((Future.failed[T](DummyException), notReallyAFailedFuture)) { (acc, v) =>
              val thisResult = acc._1.recoverWith {
                case _ => initialFut
              }.flatMap(v)
              val lastFailure = thisResult.failed.recoverWith { case _ => acc._2 }
              (thisResult.recoverWith { case _ => acc._1 }, lastFailure)
            }
          resultFut.recoverWith {
            case _ =>
              lastFailure.flatMap(Future.failed)
          }
        } else Future.successful(t)   // What to do with an empty chain is unspecified
    
      def main(args: Array[String]): Unit = {
        import scala.concurrent.Await
        import scala.concurrent.duration._
    
        val t1: Transformation[Int] = t => Future.successful(t + t)
        val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
        val t3: Transformation[Int] = t =>
          if (t > 2) Future.successful(t * t)
          else Future.failed(new NoSuchElementException)
    
        val tc1 = transformationChain(Seq(t1, t2, t2, t3))
        val tc2 = transformationChain(Seq(t2, t2, t2))
        val tc3 = transformationChain(Seq(t2, t3, t1))
    
        println(Try(Await.result(tc1(2), 5.seconds)))
        println(Try(Await.result(tc2(2), 5.seconds)))
        println(Try(Await.result(tc3(2), 5.seconds)))
      }
    }
    

    此实现假定:

    • 如果多次转换失败,返回最后一次失败
    • 如果链为空,则假设进行身份转换

    transformationChain 现在确实需要一个隐式ExecutionContext 来调度转换期货之间的“胶水”功能。在 Scala 2.13+ 中,scala.concurrent.ExecutionContext.parasitic 上下文实际上是执行这些快速转换的一个很好的选择(并且基本上没有其他用处)。

    为了让所有printlns 都能执行,我将Await.results 包装在Try 中。

    为了简洁起见,有一些使用失败的Future 来表示没有结果。

    【讨论】: