【问题标题】:Intercepting/Logging requests and Responses in GRPC在 GRPC 中拦截/记录请求和响应
【发布时间】:2018-04-19 16:28:56
【问题描述】:

我正在使用 GRPC 开发一个聊天应用程序,其中服务器从客户端接收信息并将其发送回与其连接的所有客户端。为此,我使用 Saturnism's chat-example 作为参考。我已经复制了代码,代码可以编译并运行,但服务器应该永远不会收到来自客户端的任何请求。

我的问题是:

  1. 有没有办法在 GRPC 中启用 verbos 服务器端和客户端日志记录,以查看进出哪些请求和响应以及哪些可能失败?
  2. 我将以下代码用于服务器和客户端。以下代码中可能缺少/错误的内容会导致客户端和服务器之间没有通信。

WingokuServer.java

public class WingokuServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8091)
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();

        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        server.awaitTermination();
    }

WingokuServerSideServiceImplementation:

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() {
        System.out.println("WingokuServiceImp");
    }

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.println("Server onNext: ");
                System.out.println("request from client is: "+ request.getRequestMessage());
                Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
                for (StreamObserver<Response> observer : observers) {
                    observer.onNext(response);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                observers.remove(responseObserver);
                System.out.println("Server onCompleted ");
            }
        };
    }
}

WingokuClient:

public class WingokuClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
            @Override
            public void onNext(Response response) {
                System.out.println("Client onNext");
                System.out.println("REsponse from server is: "+ response.getResponseMessage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Client onError");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Client OnComplete");
            }
        });

        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        channel.shutdown();
        System.out.println("exiting client");
    }
}

编辑:

代码没有问题。有用。我只需要将 awaitTermination 添加到客户端的通道,因为没有它只会立即关闭客户端和服务器之间的连接,甚至可能在请求从客户端传出到网络之前。这就是服务器从未收到任何请求的原因。

但是,我关于启用详细日志记录和/或向服务器端添加某种拦截器的问题仍未得到解答。所以我很期待在这里得到专家的指点。

【问题讨论】:

    标签: java android server grpc grpc-java


    【解决方案1】:

    多年后让我也回答这个问题(希望对谁会遇到同样的问题有用)。 我基本上以Shoohei的响应为例,尽量压缩它来解决。

    服务器拦截器

    public class ServerLogInterceptor implements ServerInterceptor {
    
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    
        ServerCall<ReqT, RespT> listener = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
    
            @Override
            public void sendMessage(RespT message) {
                log.debug("Sending message to cliens: {}",  message);
                super.sendMessage(message);
            }
        };
    
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(listener, headers)) {
    
            @Override
            public void onMessage(ReqT message) {
                log.debug("Received message from cliens: {}", message);
                super.onMessage(message);
            }
    
        };
    }}
    

    客户端拦截器

        public class ClientLogInterceptor  implements ClientInterceptor {
    
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next
        ) {
            return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
                @Override
                public void sendMessage(ReqT message) {
                    log.debug("Sending message to modules: {}", message);
                    super.sendMessage(message);
                }
    
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
    
                        @Override
                        public void onMessage(RespT message) {
                            log.debug("Received message from modules: {}", message);
                            super.onMessage(message);
                        }
    
                    }, headers);
                }
    
            };
        }
    
    }
    

    (我不确定我是否正确粘贴了代码,以防只是添加或删除一些括号)

    【讨论】:

      【解决方案2】:

      我找到了一种使用拦截器在服务器端和客户端记录请求和响应的方法,它使代码更清晰。 也可以使用 sleuth 进行追踪。

      请使用弹簧:

      implementation 'io.github.lognet:grpc-spring-boot-starter'
      

      服务器部分

      然后您可以使用 GRpcGlobalInterceptor 注释

      import io.grpc.Metadata;
      import io.grpc.MethodDescriptor;
      import io.grpc.ServerCall;
      import io.grpc.ServerCallHandler;
      import io.grpc.ServerInterceptor;
      import io.grpc.Status;
      import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
      
      @GRpcGlobalInterceptor
      public class GrpcInterceptor implements ServerInterceptor {
          private Logger logger = LoggerFactory.getLogger(this.getClass());
      
          public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
      
          @Override
          public <M, R> ServerCall.Listener<M> interceptCall(
                  ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
              String traceId = headers.get(TRACE_ID_KEY);
              // TODO: Add traceId to sleuth
              logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);
      
              GrpcServerCall grpcServerCall = new GrpcServerCall(call);
      
              ServerCall.Listener listener = next.startCall(grpcServerCall, headers);
      
              return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
                  @Override
                  public void onMessage(M message) {
                      logger.info("Method: {}, Message: {}", methodName, message);
                      super.onMessage(message);
                  }
              };
          }
      
          private class GrpcServerCall<M, R> extends ServerCall<M, R> {
      
              ServerCall<M, R> serverCall;
      
              protected GrpcServerCall(ServerCall<M, R> serverCall) {
                  this.serverCall = serverCall;
              }
      
              @Override
              public void request(int numMessages) {
                  serverCall.request(numMessages);
              }
      
              @Override
              public void sendHeaders(Metadata headers) {
                  serverCall.sendHeaders(headers);
              }
      
              @Override
              public void sendMessage(R message) {
                  logger.info("Method: {}, Response: {}", serverCall.getMethodDescriptor().getFullMethodName(), message);
                  serverCall.sendMessage(message);
              }
      
              @Override
              public void close(Status status, Metadata trailers) {
                  serverCall.close(status, trailers);
              }
      
              @Override
              public boolean isCancelled() {
                  return serverCall.isCancelled();
              }
      
              @Override
              public MethodDescriptor<M, R> getMethodDescriptor() {
                  return serverCall.getMethodDescriptor();
              }
          }
      
          private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {
      
              String methodName;
      
              protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener) {
                  super(listener);
                  methodName = method.getFullMethodName();
              }
          }
      }
      

      客户端部分

      拦截器:

      import io.grpc.CallOptions;
      import io.grpc.Channel;
      import io.grpc.ClientCall;
      import io.grpc.ClientInterceptor;
      import io.grpc.Metadata;
      import io.grpc.MethodDescriptor;
      import io.grpc.Status;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.stereotype.Component;
      
      import java.util.concurrent.TimeUnit;
      
      import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
      
      @Component
      public class BackendInterceptor implements ClientInterceptor {
          private Logger logger = LoggerFactory.getLogger(this.getClass());
      
          public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
      
          @Override
          public <M, R> ClientCall<M, R> interceptCall(
                  final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
              return new BackendForwardingClientCall<M, R>(method,
                      next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {
      
                  @Override
                  public void sendMessage(M message) {
                      logger.info("Method: {}, Message: {}", methodName, message);
                      super.sendMessage(message);
                  }
      
                  @Override
                  public void start(Listener<R> responseListener, Metadata headers) {
                      // TODO: Use the sleuth traceId instead of 999
                      headers.put(TRACE_ID_KEY, "999");
      
                      BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                      super.start(backendListener, headers);
                  }
              };
          }
      
          private class BackendListener<R> extends ClientCall.Listener<R> {
      
              String methodName;
              ClientCall.Listener<R> responseListener;
      
              protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
                  super();
                  this.methodName = methodName;
                  this.responseListener = responseListener;
              }
      
              @Override
              public void onMessage(R message) {
                  logger.info("Method: {}, Response: {}", methodName, message);
                  responseListener.onMessage(message);
              }
      
              @Override
              public void onHeaders(Metadata headers) {
                  responseListener.onHeaders(headers);
              }
      
              @Override
              public void onClose(Status status, Metadata trailers) {
                  responseListener.onClose(status, trailers);
              }
      
              @Override
              public void onReady() {
                  responseListener.onReady();
              }
          }
      
          private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {
      
              String methodName;
      
              protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate) {
                  super(delegate);
                  methodName = method.getFullMethodName();
              }
          }
      }
      

      将拦截器添加到通道中:

      ManagedChannel managedChannel = ManagedChannelBuilder
                      .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
      

      【讨论】:

        【解决方案3】:

        另外,如果您想打印在服务器上看到的消息内容或标题,您可以创建一个 ServerInterceptor: https://grpc.io/grpc-java/javadoc/io/grpc/ServerInterceptor.html

        您可以查看有关 ServerInterceptor 和 ClientInterceptor 工作原理的示例目录。不存在记录网络事件的预先存在的拦截器。

        【讨论】:

          【解决方案4】:

          您可以在 Netty 传输中打开帧记录。首先,创建一个名为logging.properties 的文件。在文件中放入以下内容:

          handlers=java.util.logging.ConsoleHandler
          io.grpc.netty.level=FINE
          java.util.logging.ConsoleHandler.level=FINE
          java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
          

          然后使用 jvm 标志启动 Java 二进制文件 -Djava.util.logging.config.file=logging.properties

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2018-01-07
            • 2022-07-12
            • 2023-01-22
            • 1970-01-01
            • 1970-01-01
            • 2020-10-22
            相关资源
            最近更新 更多