【问题标题】:Using scala continuations with netty/NIO listeners使用带有 netty/NIO 侦听器的 scala 延续
【发布时间】:2012-02-29 10:20:10
【问题描述】:

我正在使用 Netty 库(来自 GitHub 的第 4 版)。它在 Scala 中运行良好,但我希望我的库能够使用持续传递样式进行异步等待。

传统上使用 Netty 你会做这样的事情(一个异步连接操作示例):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

如果您正在实现一个库(我就是这样),那么您基本上可以使用三个简单的选项来允许该库的用户在建立连接后执行操作:

  1. 只需从您的 connect 方法返回 ChannelFuture 并让用户处理它 - 这并没有提供太多来自 netty 的抽象。
  2. 将 ChannelFutureListener 作为连接方法的参数,并将其作为侦听器添加到 ChannelFuture。
  3. 将回调函数对象作为连接方法的参数,并从您创建的 ChannelFutureListener 中调用它(这将形成类似于 node.js 的回调驱动样式)

我想做的是第四个选项;我没有把它包括在上面的计数中,因为它并不简单。

我想使用 scala 分隔的延续来使库的使用有点像一个阻塞库,但它在幕后是非阻塞的:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

想象一下以相同方式实现的其他读/写操作。这样做的目的是让用户的代码看起来更像这样:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

换句话说,该程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程。

我遇到的麻烦是我不完全理解定界延续的类型是如何工作的。当我尝试以上述方式实现它时,编译器抱怨我的operationComplete 实现实际上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] 而不是Unit。我知道 scala 的 CPS 中存在某种“陷阱”,因为您必须用 @suspendable 注释 shift 方法的返回类型,它会在调用堆栈中向上传递,直到 reset,但似乎没有以任何方式与预先存在的没有分隔延续概念的 Java 库相协调。

我觉得确实有办法解决这个问题 - 如果 Swarm 可以序列化延续并通过网络将它们阻塞以在其他地方计算,那么它必须可以简单地从预先存在的 Java 类调用延续。但我无法弄清楚它是如何做到的。为了实现这一点,我是否必须在 Scala 中重写 netty 的整个部分?

【问题讨论】:

  • 我不知道如何修复 scala 的东西,但我建议反对你的想法。让我来告诉你为什么。但是让用户“不知道”你的库的异步性质,你会告诉他在监听器代码中“阻塞”调用是可以的。事实上,他甚至不知道他在一个监听器中编写了他的代码。在侦听器中进行阻塞调用可能会导致各种问题。大多数时候你会看到的问题是它“减慢”了其他 io 任务,从而限制了吞吐量。
  • 你的观点很好,但我不同意。我认为我的图书馆的用户,如果除了我之外还有其他人,可能必须了解reset 的开头,因此会明白这些调用是非阻塞的。这实际上只是一种方法 A) 更深入地了解定界延续,以及 B) 尝试以更简洁的方式编写本质上由回调驱动的代码。

标签: scala netty continuations


【解决方案1】:

当我刚开始时,我发现Scala's continuations 的这种解释非常有用。特别注意他解释shift[A, B, C]reset[B, C]的部分。添加一个虚拟的null 作为operationComplete 的最后一条语句应该会有所帮助。

顺便说一句,你需要在另一个reset 中调用retrn(),如果它可能嵌套了一个shift

编辑:这是一个工作示例

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

可能的输出:

Outside reset
operationComplete starts
This will happen after the connection is finished

【讨论】:

  • 这实际上让编译器很开心,甚至看起来工作正常。我猜的关键是您将shift 移到匿名ChannelFutureListener 之外,并使用闭包从operationComplete 内部调用延续。我不确定我是否理解为什么这行得通,而另一种方式则行不通,但我会接受的。谢谢!
  • 这是一本关于 scala 延续的很好的读物。他们应该从 scala-lang.org 页面中删除关于延续的毫无价值的示例,并将它们替换为您链接的文章。
  • @Jeremy btw,你的代码和我的不同之处在于我明确注释了某些方法的返回类型。
  • 我知道在我发布的代码中,我的代码与您的代码几乎一模一样。但由于某种原因,这不是我在真实代码中所拥有的。我在operationComplete 的实现中放错了shift 块,这导致了编译器错误。我没有意识到我可以将shift 放在外面并使用闭包来调用它。即使我错误地在我的问题中正确地写了它:)
猜你喜欢
  • 2021-07-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多