【问题标题】:Can I pass reactor Context into grpc clientInterceptor implicitly?我可以将反应器上下文隐式传递给 grpc clientInterceptor 吗?
【发布时间】:2021-06-27 01:07:19
【问题描述】:

这是我想在高层次上做的事情

  1. 在 WebFilter 中捕获一些 http 标头
  2. 在Controller方法中,我进行了grpc调用
  3. 我想将 http 标头作为 grpc 元数据标头传播

目前,我的工作实现是

  1. WebFilter 捕获 http 标头并写入反应器上下文
  2. Controller 方法提取反应器上下文并将其传递给 Grpc ClientInterceptor
  3. Grpc ClientInterceptor 从 Context 中提取 http 标头并注入到 Grpc 元数据标头中

但我想避免让 Controller 方法做任何工作(上面的第 2 步)。

这是一个实现,但正在寻找一种将 http 标头获取到 Grpc 元数据的方法,而无需从 Controller 方法中显式传递它们。

网络过滤器

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  return chain.filter(exchange)
    .contextWrite(Context.of("my-header", "header-value"));
}

控制器方法

@GetMapping
public Mono<String> testHeaderPropagation() throws Exception {
  return Mono.deferContextual(reactorContext -> {
    Response response = grpcStub
      .withInterceptors(new GrpcClientInterceptor(reactorContext))
      .call(request);
    return Mono.just(response.getMessage());
  });
}

GrpcClientInterceptor

public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Metadata.Key < String > key =
          Metadata.Key.of("filter-context", Metadata.ASCII_STRING_MARSHALLER);
      headers.put(key, context.get("filter-context"));

      delegate().start(responseListener, headers);
    }
  };
}

我想简化我的 Controller 方法(删除反应器上下文的显式传入到 clientInterceptor)

grpcStub.call(request)

我相信 Spring Sleuth 有办法做到这一点,但不确定如何调整它的方法。 我错过了什么聪明的东西?

编辑

我推动包含最少控制器方法代码的版本的原因是因为其他开发人员将编写控制器和方法。如果可能的话,我想建立一个不需要额外布线的模式,否则有可能有人忘记做或做错了。

编辑

后续问题。我没有让控制器方法将 Context 传递给 clientInterceptor,而是尝试在 Grpc ClientInterceptor 中获取 Context,但这似乎不起作用。

这是我尝试做的事情

public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);

  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {

      Mono.deferContextual(context -> {
        Metadata.Key < String > key =
            Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
        headers.put(key, context.get("CONTEXT-HEADER"));

        delegate().start(responseListener, headers);

        return Mono.empty();
      }).subscribe();


    }
  };
}

但我得到一个错误

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.NoSuchElementException: 上下文不包含键 CONTEXT-HEADER

试图了解反应器管道为何在此处中断

【问题讨论】:

  • 是的,您的第二次尝试无法进行,因为Context 是在订阅时初始化的。这里调用subscribe 时没有提供上下文,所以它是空的。
  • post启发,我想出了一个解决方案。

标签: java grpc project-reactor grpc-java


【解决方案1】:

我想出了另一个不需要 WebFilter 或反应器上下文的解决方案。但我仍然更喜欢某种方式来进行“全面自动传播”。

在这个解决方案中,我只是将注入到控制器方法中的 ServerWebExchange 传递给客户端拦截器,该拦截器将读取标头。

@GetMapping
public Mono<String> testHeaderPropagation(ServerWebExchange exchange) throws Exception {
  MyGrpcStub grpcStubWithInterceptor = attachMetadata(grpcStub, exchange);
  Response response = grpcStub
    .call(request);
  return Mono.just(response.getMessage());
}

public static <S extends AbstractStub<S>> S attachMetadata(S stub, ServerWebExchange exchange) {
  return stub.withInterceptors(new GrpcClientInterceptor(exchange));
}

然后是我的新 GrpcClientInterceptor

public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  HttpHeaders httpHeaders = exchange.getRequest().getHeaders();

  final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Metadata.Key < String > key =
          Metadata.Key.of("my-header", Metadata.ASCII_STRING_MARSHALLER);
      headers.put(key, httpHeaders.getFirst("my-header"));

      delegate().start(responseListener, headers);
    }
  };
}

【讨论】:

    【解决方案2】:

    grpc 和 Reactor 互不了解,因此 someone 必须翻译两者之间的上下文信息。您当前的方法是推荐的方法,即使考虑到 Sleuth(因为Context 的全面自动传播是一个混合包,对性能影响很大)。这是精确定位您的确切需求,万无一失,所以我会保留它。

    【讨论】:

    • 为了论证,您将如何创建“上下文的全面自动传播”?您能否提供一些代码以便我进行评估? ps-我找到了你的博客。很不错!也许你可以创建一个关于这个(以及它为什么不好)的博客条目
    • 我不确定这是否可能。从 grpc 到 reactor 的转换发生在控制器中,因此除了为您的开发人员提供帮助之外,您无能为力。另外,我怀疑您的代码可能存在严重错误:grpcStub.call 阻塞了吗?
    • 这个blog 似乎是解决此类问题的关键
    猜你喜欢
    • 1970-01-01
    • 2020-07-29
    • 1970-01-01
    • 2015-07-30
    • 2021-12-09
    • 1970-01-01
    • 2013-10-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多