【问题标题】:Error with asynchronous request in DRF(DRF 中的异步请求错误)
【发布时间】:2018-05-16 14:04:02
【问题描述】:

我需要向两个服务发送请求。代码如下所示:

async def post1(data):
    """POST ``data`` as JSON to the first service and return the decoded JSON reply.

    ``aiohttp.request`` returns an asynchronous context manager rather than an
    awaitable, so it has to be entered with ``async with``; awaiting it
    directly raises ``TypeError: object _SessionRequestContextManager can't be
    used in 'await' expression``. Leaving the ``async with`` block releases the
    response, so no explicit ``close()`` call is needed.
    """
    # NOTE: 'http://' is the placeholder URL from the original post.
    async with aiohttp.request('post', 'http://', json=data) as response:
        return await response.json()

async def get2():
    """GET the second service and return the decoded JSON reply.

    ``aiohttp.request`` returns an asynchronous context manager rather than an
    awaitable, so it has to be entered with ``async with``; awaiting it
    directly raises ``TypeError: object _SessionRequestContextManager can't be
    used in 'await' expression``. Leaving the ``async with`` block releases the
    response, so no explicit ``close()`` call is needed.
    """
    # NOTE: 'http://' is the placeholder URL from the original post.
    async with aiohttp.request('get', 'http://') as response:
        return await response.json()

async def asynchronous(parameters):
    """Run both service requests concurrently and return their JSON bodies.

    ``parameters['data']`` is forwarded to ``post1``. The returned list holds
    the ``post1`` result first and the ``get2`` result second, because
    ``asyncio.gather`` reports results in the order its awaitables were given.
    """
    bodies = await asyncio.gather(post1(parameters['data']), get2())
    return list(bodies)

如果我在本地运行代码,就可以了。代码如下所示:

if __name__ == "__main__":
    # Drive the coroutine to completion on the default event loop, then
    # close the loop before printing what the two services returned.
    ioloop = asyncio.get_event_loop()
    result = ioloop.run_until_complete(asynchronous({'data': data}))
    ioloop.close()
    print(result)

我得到了正确的结果。但是如果我尝试从 DRF 方法执行代码,就会出现错误:

TypeError: object _SessionRequestContextManager can't be used in 'await' expression

我运行的示例代码:

 .....
 class MyViewSet(GenericAPIView):
    def post(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        ......

        ioloop = asyncio.get_event_loop()
        result = ioloop.run_until_complete(asynchronous(serializer.data)) # <<<<< error here
        ioloop.close()

        ......
        return Response(serializer.data, status=status.HTTP_201_CREATED)

请告诉我可能是什么问题?

【问题讨论】:

    标签: python django asynchronous django-rest-framework python-asyncio


    【解决方案1】:

    aiohttp.request 返回的对象不能等待,它必须用作异步上下文管理器。这段代码:

    # Failing form: the object returned by aiohttp.request() is awaited
    # directly, which is what raises the TypeError.
    response = await aiohttp.request('post', 'http://', json=data)
    json_response = await response.json()
    response.close()
    

    需要改为:

    # Corrected form: aiohttp.request() is used as an asynchronous context
    # manager and the response is read inside the block.
    async with aiohttp.request('post', 'http://', json=data) as response:
        json_response = await response.json()
    

    有关更多使用示例,请参阅documentation

    也许您在运行 DRF 的服务器上有不同的 aiohttp 版本,这就是它在本地工作并在 DRF 下失败的原因。

    【讨论】:

      【解决方案2】:

      试试https://github.com/Skorpyon/aiorestframework

      创建一些用于身份验证的中间件:

      import re
      from xml.etree import ElementTree as etree
      from json.decoder import JSONDecodeError
      from multidict import MultiDict, MultiDictProxy
      
      from aiohttp import web
      from aiohttp.hdrs import (
          METH_POST, METH_PUT, METH_PATCH, METH_DELETE
      )
      
      from aiorestframework import exceptions
      from aiorestframework import serializers
      from aiorestframework import Response
      from aiorestframework.views import BaseViewSet
      from aiorestframework.permissions import set_permissions, AllowAny
      from aiorestframework.app import APIApplication
      
      from my_project.settings import Settings  # Generated by aiohttp-devtools
      
      
      # Matches an Authorization header value of the form "BEARER <token>":
      # optional surrounding whitespace, the literal BEARER (no flags are passed,
      # so the match is case-sensitive), zero to three whitespace characters,
      # then exactly 64 non-whitespace characters captured as group 1.
      TOKEN_RE = re.compile(r'^\s*BEARER\s{,3}(\S{64})\s*$')
      
      async def token_authentication(app, handler):
          """
          Authorization middleware factory.

          The returned handler takes the token from the ``Authorization`` request
          header (matched against ``TOKEN_RE``) or, when that header is absent,
          from the ``authorization_token`` query parameter. Without a usable token
          the request is passed on with ``request.user`` set to ``AnonymousUser()``.
          Otherwise the token is used to select from ``auth`` through
          ``app['tnt']`` (Tarantool): an empty result raises
          ``exceptions.AuthenticationFailed``, while a hit is bound to a fresh
          ``User`` that is attached to the request as ``request.user``.
          """
          async def middleware_handler(request):
              token = None
              if 'authorization' in request.headers:
                  # A header that does not match TOKEN_RE leaves the token unset;
                  # the query parameter is not consulted in that case.
                  found = TOKEN_RE.match(request.headers['authorization'])
                  if found:
                      token = found[1]
              elif 'authorization_token' in request.query:
                  token = request.query['authorization_token']

              if token is None:
                  request.user = AnonymousUser()
                  return await handler(request)

              # Look the token up; no rows means the token is unknown.
              reply = await app['tnt'].select('auth', [token])
              rows = reply.body
              if not rows:
                  raise exceptions.AuthenticationFailed()

              # The first row carries the data bound to the User instance.
              record = rows[0]
              user = User()
              user.bind_cached_tarantool(record)
              request.user = user

              return await handler(request)
          return middleware_handler
      

      对于从请求中提取数据:

      # HTTP methods for which request_data_handler inspects the request body.
      DATA_METHODS = [METH_POST, METH_PUT, METH_PATCH, METH_DELETE]
      # Content types decoded with request.json().
      JSON_CONTENT = ['application/json', ]
      # Content types parsed with the ElementTree XML parser.
      XML_CONTENT = ['application/xml', ]
      # Content types read with request.post().
      FORM_CONTENT = ['application/x-www-form-urlencoded', 'multipart/form-data']
      
      async def request_data_handler(app, handler):
          """
          Request ``.data`` middleware factory.

          For requests whose method is in ``DATA_METHODS`` and that carry a body,
          the returned handler decodes the body according to its content type:
          JSON via ``request.json()``, XML via ElementTree, form data via
          ``request.post()``. Undecodable JSON or XML raises
          ``exceptions.ParseError``. Whatever was extracted (or an empty
          ``MultiDictProxy`` when nothing was) is attached as ``request.data``
          before the wrapped handler is called.
          """
          async def middleware_handler(request):
              payload = None
              if request.method in DATA_METHODS and request.has_body:
                  content_type = request.content_type
                  if content_type in JSON_CONTENT:
                      try:
                          payload = await request.json()
                      except JSONDecodeError:
                          raise exceptions.ParseError()
                  elif content_type in XML_CONTENT:
                      # Prefer the charset declared by the request; otherwise
                      # fall back to the configured default.
                      encoding = request.charset
                      if encoding is None:
                          encoding = api_settings.DEFAULT_CHARSET
                      xml_parser = etree.XMLParser(encoding=encoding)
                      try:
                          body_text = await request.text()
                          payload = etree.XML(body_text, parser=xml_parser)
                      except (etree.ParseError, ValueError) as exc:
                          raise exceptions.ParseError(
                              detail='XML parse error - %s' % str(exc))
                  elif content_type in FORM_CONTENT:
                      payload = await request.post()

              if payload is None:
                  # Nothing was extracted: expose an empty read-only multidict.
                  payload = MultiDictProxy(MultiDict())

              request.data = payload

              return await handler(request)
          return middleware_handler
      

      创建几个序列化器:

      class UserRegisterSerializer(s.Serializer):
          """Validate the fields submitted when registering a new user."""
          email = s.EmailField(max_length=256)
          password = s.CharField(min_length=8, max_length=64)
          first_name = s.CharField(min_length=2, max_length=64)
          # Optional; may be blank and defaults to an empty string.
          middle_name = s.CharField(default='', min_length=2, max_length=64,
                                    required=False, allow_blank=True)
          last_name = s.CharField(min_length=2, max_length=64)
          # Optional; may be blank and defaults to an empty string.
          phone = s.CharField(max_length=32, required=False,
                              allow_blank=True, default='')

          async def register_user(self, app):
              """Register a new ``User`` from ``self.validated_data`` and return it.

              ``app`` is passed through to ``User.register_user``. An ``Error``
              raised there is handed to ``resolve_db_exception`` together with
              this serializer.
              NOTE(review): ``User``, ``Error`` and ``resolve_db_exception`` are
              defined outside this snippet; presumably ``resolve_db_exception``
              re-raises as an API error -- confirm.
              """
              user = User()
              data = self.validated_data
              try:
                  await user.register_user(data, app)
              except Error as e:
                  resolve_db_exception(e, self)
              # Aligned with the method body; the original line was indented one
              # column short, which is an IndentationError.
              return user
      

      再定义一些 ViewSet。其他 ViewSet 可以通过 bindings['custom'] 嵌套进来:

      class UserViewSet(BaseViewSet):
          """User endpoints: fetch and update a user profile.

          ``bindings`` maps handler names to HTTP methods; the ``custom`` section
          additionally binds ``report`` to a nested ``UserReportViewSet``.
          """
          name = 'user'
          lookup_url_kwarg = '{user_id:[0-9a-f]{32}}'
          permission_classes = [AllowAny]
          bindings = {
              'list': {
                  'retrieve': 'get',
                  'update': 'put',
              },
              'custom': {
                  'list': {
                      'set_status': 'post',
                      'create_new_sip_password': 'post',
                      'get_registration_domain': 'get',
                      'report': UserReportViewSet,
                  },
              },
          }

          @staticmethod
          async def resolve_sip_host(data, user, app):
              """Add the resolved ``sip_host`` for ``user`` to ``data`` in place."""
              data.update(sip_host=await resolve_registration_switch(user, app))

          async def retrieve(self, request):
              """Load the user named by ``user_id`` in the URL and return the profile."""
              user = User()
              await user.load_from_db(request.match_info['user_id'], request.app)

              profile = user_ser.UserProfileSerializer(instance=user).data
              await self.resolve_sip_host(profile, user, request.app)
              return Response(data=profile)

          @atomic
          @set_permissions([AuthenticatedOnly, IsCompanyMember, CompanyIsEnabled])
          async def update(self, request):
              """Validate ``request.data``, update the requesting user, return the profile."""
              incoming = user_ser.UserProfileSerializer(data=request.data)
              incoming.is_valid(raise_exception=True)
              user = await incoming.update_user(user_id=request.user.id,
                                                app=request.app)

              # Serialize the updated user again for the response body.
              profile = user_ser.UserProfileSerializer(instance=user).data
              await self.resolve_sip_host(profile, user, request.app)

              return Response(data=profile)
      

      注册视图集并运行应用程序:

      def setup_routes(app: APIApplication):
          """Register the API viewsets and the root redirect on ``app``'s router."""
          router = app.router
          # Viewsets: authentication under /auth, users under /user.
          router.register_viewset('/auth', auth_vws.AuthViewSet())
          router.register_viewset('/user', user_vws.UserViewSet())
          # Any method on the root path is handled by swagger_redirect.
          home = router.add_resource('/', name='home_redirect')
          home.add_route('*', swagger_redirect)
      
      
      def create_api_app():
          """Build the API application, wire its lifecycle hooks and routes, and return it.

          Middlewares are installed in the order sentry, token authentication,
          request-data extraction.
          NOTE(review): ``settings``, ``get_sentry_middleware`` and ``startup``
          are not defined in this snippet (only ``Settings`` is imported) --
          confirm where they come from.
          """
          sentry = get_sentry_middleware(settings.SENTRY_CONNECT_STRING, settings.SENTRY_ENVIRONMENT)
          middlewares = [sentry, token_authentication, request_data_handler]
          # 10 * 1024**2 = 10485760, i.e. a 10 MiB limit if the unit is bytes.
          api_app = APIApplication(name='api', middlewares=middlewares,
                                   client_max_size=10*(1024**2))

          api_app.on_startup.append(startup.startup_api)
          api_app.on_shutdown.append(startup.shutdown_api)
          api_app.on_cleanup.append(startup.cleanup_api)

          setup_routes(api_app)
          # The caller passes the result to web.run_app(); without this return
          # it would receive None.
          return api_app
      
      if __name__ == '__main__':
          # Build the API application and serve it with aiohttp's runner.
          web.run_app(create_api_app())
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2013-03-20
        • 1970-01-01
        • 2018-07-12
        • 2015-08-30
        • 2019-05-13
        • 2018-02-18
        • 2020-04-13
        • 1970-01-01
        相关资源
        最近更新 更多