【问题标题】:graphql-java - How to use subscriptions with spring boot?graphql-java - 如何在 Spring Boot 中使用订阅?
【发布时间】:2017-10-31 09:37:53
【问题描述】:

在一个项目中,我将 graphql-javaspring boot 与 postgreSQL 数据库一起使用。现在我想使用3.0.0版本发布的subscription feature。遗憾的是,关于订阅功能应用的信息还不是很成熟。

如何使用graphql-java 和订阅实现实时功能

【问题讨论】:

  • 您找到解决方案了吗?

标签: java spring-boot graphql graphql-java


【解决方案1】:

我遇到了同样的问题,我在 lib 上添加了与 spring boot 集成。我找到了graphql-java,但是,它似乎只支持模式级别的“订阅”,它不对该功能执行任何跨国支持。这意味着您可能需要自己实现它。

请参考https://github.com/graphql-java/graphql-java/blob/master/docs/schema.rst#subscription-support

【讨论】:

    【解决方案2】:

    从最近的 graphql-java 版本开始,完全支持订阅。订阅的DataFetcher 必须返回org.reactivestreams.Publisher,graphql-java 将负责将查询函数映射到结果。

    这个功能很好 documenteda complete example 使用官方 repo 中提供的网络套接字。

    如果您有一个响应式数据源(例如,带有响应式驱动程序的 Mongo,或者可能是 R2DBC 支持的任何东西),那么您就一切就绪。只需使用@Tailable,Spring Data 就会为您提供Flux(实现Publisher),您无需执行任何其他操作。

    至于更手动的 Spring 特定实现,我无法想象使用 Spring's own event mechanism(也是一个很好的教程 here)来支持 Publisher 太难了。

    每次有传入订阅时,使用应用程序上下文创建并注册一个新侦听器:context.addApplicationListener(listener),它将发布到正确的Publisher。例如。在DataFetcher:

    // Somehow create a publisher, probably using Spring's Reactor project. Or RxJava.
    Publisher<ResultObject> publisher = ...; 
    //The listener reacts on application events and pushes new values through the publisher
    ApplicationListener listener = createListener(publisher);
    context.addApplicationListener(listener);
    return publisher;
    

    当 Web 套接字断开连接或您以某种方式知道事件流已完成时,您必须确保移除侦听器。

    请注意,我实际上没有尝试过任何这些,我只是在大声思考。

    另一种选择是直接使用 Reactor(有或没有 Spring WebFlux)。有一个使用 Reactor 和 WebSocket 的示例(通过GraphQL SPQR Spring Boot Starterhere

    您像这样创建Publisher

    //This is really just a thread-safe wrapper around Map<String, Set<FluxSink<Task>>>
    private final ConcurrentMultiRegistry<String, FluxSink<Task>> subscribers = new ConcurrentMultiRegistry<>();
    
    @GraphQLSubscription
    public Publisher<Task> taskStatusChanged(String taskId) {
        return Flux.create(subscriber -> subscribers.add(taskId, subscriber.onDispose(() -> subscribers.remove(taskId, subscriber))), FluxSink.OverflowStrategy.LATEST);
    }
    

    然后从其他地方(可能是相关的突变或反应性存储)推送新值,如下所示:

    subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
    

    例如

    @GraphQLMutation
    public Task updateTask(@GraphQLNonNull String taskId, @GraphQLNonNull Status status) {
        Task task = repo.byId(taskId); //find the task
        task.setStatus(status); //update the task
        repo.save(task); //persist the task
        //Notify all the subscribers following this task
        subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
        return task;
    }
    

    使用 SPQR Spring Starter,这就是获得与 Apollo 兼容的订阅实现所需的全部内容。

    【讨论】:

    • ConcurrentMultiRegistry 在运行多个实例的应用程序中如何以及在何处存在?每个实例是否都需要维护自己的订阅者并通知他们?
    • @raga 扩展 GraphQL 订阅非常困难,因为它总是涉及维护连接和状态。由于订阅是客户端发起的,并创建到节点的持久连接,我认为每个节点都必须维护自己的订阅者注册表,客户端连接到它。
    • 知道了,谢谢。如果这样的节点出现故障,我认为客户端需要有办法知道它的故障并重新订阅另一个正在运行的节点,对吗?
    • @raga 是的。当节点宕机时,客户端的连接会断开,客户端立即尝试重新订阅是很常见的。我希望服务器上有一个反向代理,可以透明地确保将连接定向到活动节点。
    【解决方案3】:

    记录一下:这是另一个非常好的、紧凑的示例,它实现了 GraphQL 的基本功能查询、突变和订阅:https://github.com/npalm/blog-graphql-spring-service

    【讨论】:

      猜你喜欢
      • 2018-08-06
      • 2023-01-31
      • 2022-10-12
      • 2020-01-07
      • 2018-02-25
      • 2019-07-14
      • 2020-05-16
      • 2019-11-27
      • 2019-09-27
      相关资源
      最近更新 更多