从最近的 graphql-java 版本开始,完全支持订阅。订阅的DataFetcher 必须返回org.reactivestreams.Publisher,graphql-java 将负责将查询函数映射到结果。
这个功能很好 documented 和 a complete example 使用官方 repo 中提供的网络套接字。
如果您有一个响应式数据源(例如,带有响应式驱动程序的 Mongo,或者可能是 R2DBC 支持的任何东西),那么您就一切就绪。只需使用@Tailable,Spring Data 就会为您提供Flux(实现Publisher),您无需执行任何其他操作。
至于更手动的 Spring 特定实现,我无法想象使用 Spring's own event mechanism(也是一个很好的教程 here)来支持 Publisher 太难了。
每次有传入订阅时,使用应用程序上下文创建并注册一个新侦听器:context.addApplicationListener(listener),它将发布到正确的Publisher。例如。在DataFetcher:
// Somehow create a publisher, probably using Spring's Reactor project. Or RxJava.
Publisher<ResultObject> publisher = ...;
//The listener reacts on application events and pushes new values through the publisher
ApplicationListener listener = createListener(publisher);
context.addApplicationListener(listener);
return publisher;
当 Web 套接字断开连接或您以某种方式知道事件流已完成时,您必须确保移除侦听器。
请注意,我实际上没有尝试过任何这些,我只是在大声思考。
另一种选择是直接使用 Reactor(有或没有 Spring WebFlux)。有一个使用 Reactor 和 WebSocket 的示例(通过GraphQL SPQR Spring Boot Starter)here。
您像这样创建Publisher:
//This is really just a thread-safe wrapper around Map<String, Set<FluxSink<Task>>>
private final ConcurrentMultiRegistry<String, FluxSink<Task>> subscribers = new ConcurrentMultiRegistry<>();
@GraphQLSubscription
public Publisher<Task> taskStatusChanged(String taskId) {
return Flux.create(subscriber -> subscribers.add(taskId, subscriber.onDispose(() -> subscribers.remove(taskId, subscriber))), FluxSink.OverflowStrategy.LATEST);
}
然后从其他地方(可能是相关的突变或反应性存储)推送新值,如下所示:
subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
例如
@GraphQLMutation
public Task updateTask(@GraphQLNonNull String taskId, @GraphQLNonNull Status status) {
Task task = repo.byId(taskId); //find the task
task.setStatus(status); //update the task
repo.save(task); //persist the task
//Notify all the subscribers following this task
subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
return task;
}
使用 SPQR Spring Starter,这就是获得与 Apollo 兼容的订阅实现所需的全部内容。