【发布时间】:2021-10-09 03:39:06
【问题描述】:
我尝试在 FastAPI 上实现简单的 graphql 订阅。
根据文档,但它不起作用
import asyncio
import graphene
from fastapi import FastAPI
from starlette.graphql import GraphQLApp
from graphql.execution.executors.asyncio import AsyncioExecutor
from starlette.websockets import WebSocket
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
async def resolve_hello(self,info,name):
return "Hello " + name
class Subscription(graphene.ObjectType):
count = graphene.Int(upto=graphene.Int())
async def subscribe_count(root, info, upto=3):
for i in range(upto):
yield i
await asyncio.sleep(1)
app = FastAPI()
schema = graphene.Schema(query=Query, subscription=Subscription)
app.add_route("/", GraphQLApp(schema=schema, executor_class=AsyncioExecutor))
我搜索了一下,发现我可能需要为 Sanic 或 Aiohttp 实现订阅服务器
我试了,还是不行
from graphql_ws.websockets_lib import WsLibSubscriptionServer
subscription_server = WsLibSubscriptionServer(schema)
@app.websocket("/subscriptions")
async def subscriptions(websocket: WebSocket):
await subscription_server.handle(websocket)
return websocket
错误接收:
return self.ws.open is False
AttributeError: 'WebSocket' object has no attribute 'open'
我做错了什么以及如何解决?谢谢。
【问题讨论】:
标签: fastapi graphql-subscriptions