【发布时间】:2012-02-29 10:20:10
【问题描述】:
我正在使用 Netty 库(来自 GitHub 的第 4 版)。它在 Scala 中运行良好,但我希望我的库能够使用持续传递样式进行异步等待。
传统上使用 Netty 你会做这样的事情(一个异步连接操作示例):
//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete (f:ChannelFuture) = {
//here goes the code that happens when the connection is made
}
})
如果您正在实现一个库(我就是这样),那么您基本上可以使用三个简单的选项来允许该库的用户在建立连接后执行操作:
- 只需从您的 connect 方法返回 ChannelFuture 并让用户处理它 - 这并没有提供太多来自 netty 的抽象。
- 将 ChannelFutureListener 作为连接方法的参数,并将其作为侦听器添加到 ChannelFuture。
- 将回调函数对象作为连接方法的参数,并从您创建的 ChannelFutureListener 中调用它(这将形成类似于 node.js 的回调驱动样式)
我想做的是第四个选项;我没有把它包括在上面的计数中,因为它并不简单。
我想使用 scala 分隔的延续来使库的使用有点像一个阻塞库,但它在幕后是非阻塞的:
class MyLibraryClient {
def connect(remoteAddr:SocketAddress) = {
shift { retrn: (Unit => Unit) => {
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete(f:ChannelFuture) = {
retrn();
}
});
}
}
}
}
想象一下以相同方式实现的其他读/写操作。这样做的目的是让用户的代码看起来更像这样:
reset {
val conn = new MyLibraryClient();
conn.connect(new InetSocketAddress("127.0.0.1", 1337));
println("This will happen after the connection is finished");
}
换句话说,该程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程。
我遇到的麻烦是我不完全理解定界延续的类型是如何工作的。当我尝试以上述方式实现它时,编译器抱怨我的operationComplete 实现实际上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] 而不是Unit。我知道 scala 的 CPS 中存在某种“陷阱”,因为您必须用 @suspendable 注释 shift 方法的返回类型,它会在调用堆栈中向上传递,直到 reset,但似乎没有以任何方式与预先存在的没有分隔延续概念的 Java 库相协调。
我觉得确实有办法解决这个问题 - 如果 Swarm 可以序列化延续并通过网络将它们阻塞以在其他地方计算,那么它必须可以简单地从预先存在的 Java 类调用延续。但我无法弄清楚它是如何做到的。为了实现这一点,我是否必须在 Scala 中重写 netty 的整个部分?
【问题讨论】:
-
我不知道如何修复 scala 的东西,但我建议反对你的想法。让我来告诉你为什么。但是让用户“不知道”你的库的异步性质,你会告诉他在监听器代码中“阻塞”调用是可以的。事实上,他甚至不知道他在一个监听器中编写了他的代码。在侦听器中进行阻塞调用可能会导致各种问题。大多数时候你会看到的问题是它“减慢”了其他 io 任务,从而限制了吞吐量。
-
你的观点很好,但我不同意。我认为我的图书馆的用户,如果除了我之外还有其他人,可能必须了解
reset的开头,因此会明白这些调用是非阻塞的。这实际上只是一种方法 A) 更深入地了解定界延续,以及 B) 尝试以更简洁的方式编写本质上由回调驱动的代码。
标签: scala netty continuations