【问题标题】:Unsubscribe observer in Async grpc stub - Java / Kotlin取消订阅异步 grpc 存根中的观察者 - Java / Kotlin
【发布时间】:2019-08-08 17:30:09
【问题描述】:

我有一个异步存根,我在其中添加了一个观察者:

            val obs =  object: StreamObserver<Hallo> {

                override fun onNext(value: Hallo) {

                    streamSuccess(value)
                }

                override fun onError(t: Throwable?) {

                    nonSuccess(t?.message ?: "Unknow error")
                }

                override fun onCompleted() {

                    Log.d("Info", "completed")
                    completed()
                }
            }

我希望能够从异步存根中删除这个观察者,这样我就可以在客户端取消流式传输。

正如 github 问题中所说:https://github.com/grpc/grpc-java/issues/3095

我尝试保留观察者的局部变量,以便客户端以后可以这样做:

observer?.onError(Status.CANCELLED.cause)

那没用。

我还尝试从抽象类创建自己的类:ClientCallStreamObserver

class CancellableStreamObserver<TResponse>(val next:(value:TResponse)->Unit, val onError:(t:Throwable)-> Unit, val onCompleted:(()->Unit), val onCanceledHandler: (()->Unit)? = null) : ClientCallStreamObserver<TResponse>() {
        override fun isReady(): Boolean {
            return  true
        }

        override fun setOnReadyHandler(onReadyHandler: Runnable?) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun disableAutoInboundFlowControl() {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun cancel(message: String?, cause: Throwable?) {

            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun request(count: Int) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun setMessageCompression(enable: Boolean) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun onNext(value: TResponse) {
            next(value)
        }

        override fun onError(t: Throwable) {
            if (t is StatusException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            if (t is StatusRuntimeException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            this.onError(t)
        }

        override fun onCompleted() {
            onCompleted()
        }
    }

所以稍后我可以打电话:

        observer?.cancel("Cancelled for the user",Status.CANCELLED.cause)

那也没用。

我知道它不起作用的方式,这是因为如果用户再次添加新的观察者,我会得到重复的响应,就好像旧的观察者还活着一样。

我知道我可以使用channel.shutdownNow() 关闭频道。但我觉得这太激进了。

谢谢

【问题讨论】:

    标签: kotlin grpc grpc-java


    【解决方案1】:

    来自引用的https://github.com/grpc/grpc-java/issues/3095

    对于异步,您可以使用 ClientCallStreamObserver.cancel(),方法是将 返回的 StreamObserver 转换为 ClientCallStreamObserver 或实现让传入的 StreamObserver 实现 ClientResponseObserver。

    (强调)

    grpc-java 将实现适当的方法,而不是您的实例。所以模式是:

    stub.foo(req, object: ClientResponseObserver<Hallo> {
        override fun beforeStart(respObs: ClientCallStreamObserver<Hallo>) {
            // save respObs for later
        }
        override fun onNext(value: Hallo) {
            streamSuccess(value)
        }
        override fun onError(t: Throwable?) {
            nonSuccess(t?.message ?: "Unknow error")
        }
        override fun onCompleted() {
            Log.d("Info", "completed")
            completed()
        }
    });
    
    // -or- (for streaming calls only)
    
    val obs = ...;
    val respObs = stub.foo(obs) as (ClientCallStreamObserver<Hallo>);
    respObs.onNext(req);
    // save respObs for later
    

    请注意,两种情况下的respObs 是相同的。使用ClientResponseObserver 主要用于当有流式传输并希望在响应观察者中取消以避免线程竞争时。

    【讨论】:

    • 整洁!它完美地工作。我想我没有遵循错误的提示:) 我认为只有一件事: val respObs = (ClientCallStreamObserver) stub.foo(req, obs);应该是: val respObs = stub.foo(req, obs) as (ClientCallStreamObserver) 非常感谢!
    • 已修复。我不知道 Kotlin :-)
    • 也许我错了,但我发现第二个选项不起作用。 stub.foo(req,obs) 不返回任何内容,因此 Kotlin 在日志中默默地抱怨它无法将 Unit(就像 Kotlin 中的 void )转换为(ClientCallStreamObserver)。我必须做第一种方法,但添加请求: object: ClientResponseObserver { 。这可能不是最好的方法,但似乎很有效。
    • @xarly,啊,你是对的。第二个是一种解决方案,但仅适用于流式 RPC。我更新了答案以提及这一点。
    • 嗯,我认为这不对(至少在我的特定情况下)。我正在使用流式 rpc 并且 stub.foo(req, obs) 不返回任何内容。我也不能做 stub.foo(obs),生成的唯一方法有两个参数;请求和观察者。
    猜你喜欢
    • 2016-07-10
    • 1970-01-01
    • 2021-06-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多