【问题标题】:How to add pagination for custom Apiview result data in DRF如何在 DRF 中为自定义 Apiview 结果数据添加分页
【发布时间】:2018-05-21 15:49:32
【问题描述】:

我在 DRF 3 中创建了一个 ListView。由于数据库结构复杂,我编写了一个自定义查询,即原始查询来获取数据。但是由于数据量很大,加载它需要很多时间。所以我需要实现分页。下面是代码。

class SubnetList(APIView):

    def get(self, request, format=None):
        subnet_list = get_subnet_details(None)
        return Response(subnet_list, status=status.HTTP_200_OK)

    def get_subnet_details(subnet_id=None):
        if subnet_id:
            subnets = Subnet.objects.filter(id=subnet_id).values()
        if not subnets:
            raise Http404    
        else:
            subnets = Subnet.objects.values()

        subnet_list = []
        subnet_option_query = ' SELECT  ' \
                              ' options_value.id as value_id,' \
                              ' options_value.content,' \
                              ' options_option.id,' \
                              ' options_option.name' \
                              ' FROM subnets_subnetoption' \
                              ' INNER JOIN subnets_subnet ON (subnets_subnetoption.subnet_id = subnets_subnet.id)' \
                              ' INNER JOIN options_value ON (subnets_subnetoption.value_id = options_value.id)' \
                              ' INNER JOIN options_option ON (options_value.option_id = options_option.id)' \
                              ' INNER JOIN options_scope ON (options_option.scope_id = options_scope.id)' \
                              ' WHERE subnets_subnetoption.subnet_id = %s '

        try:
            for subnet in subnets:
                subnet_dictionary = collections.OrderedDict()
                subnet_dictionary['subnet_id'] = str(subnet['id'])
                subnet_dictionary['name'] = str(subnet['name'])
                subnet_dictionary['base_address'] = str(subnet['base_address'])
                subnet_dictionary['bcast_address'] = str(subnet['bcast_address'])
                subnet_dictionary['bits'] = str(subnet['bits'])
                subnet_dictionary['begin'] = str(subnet['base_address'])
                subnet_dictionary['end'] = str(subnet['bcast_address'])
                subnet_dictionary['parent_id'] = str(subnet['parent_id'])
                subnet_dictionary['vlan_common_name'] = str(subnet['vlan_common_name'])
                subnet_dictionary['admin'] = str(subnet['admin'])
                subnet_dictionary['responsible'] = str(subnet['responsible'])
                subnet_dictionary['comments'] = str(subnet['comments'])

                super_subnet = 'True' if subnet['is_physical'] else 'False'
                subnet_dictionary['is_physical'] = super_subnet
                options = SubnetOption.objects.raw(subnet_option_query % (subnet['id']))

                physical_attributes = collections.OrderedDict()
                range_attributes = collections.OrderedDict()
                gatewaydevice_attributes = collections.OrderedDict()
                securityzone_attributes = collections.OrderedDict()
                for option in options:
                    if option.name.lower() == 'provider':
                        subnet_dictionary['provider'] = str(option.value_id)
                    elif option.name.lower() == 'customer':
                        subnet_dictionary['customer'] = str(option.value_id)
                    elif option.name == 'Allocated by RIPE/ARIN/APNIC':
                        subnet_dictionary['allocated_by'] = str(option.value_id)

                    if super_subnet == 'True':
                        if option.name.lower() == 'vlan usage':
                            physical_attributes['vlan_usage'] = str(option.value_id)
                        elif option.name.lower() == 'environment':
                            physical_attributes['environment'] = str(option.value_id)
                        elif option.name.lower() == 'status':
                            physical_attributes['status'] = str(option.value_id)
                        elif option.name.lower() == 'vrf':
                            physical_attributes['vrf'] = str(option.value_id)
                        elif option.name.lower() == 'dns on demand status':
                            physical_attributes['dns_on_demand_status'] = str(option.value_id)

                        dmz = 'True' if subnet['dmz'] else 'False'
                        physical_attributes['dmz'] = dmz            
        except ValueError as e:
            print(e)
            pass
        return subnet_list

【问题讨论】:

  • 缩进错误,我尝试修复它。当它是一个空列表时,返回subnet_list 有什么意义。也调用get_subnet_details(None) 没有意义。这意味着永远不会传递真正的subnet_id,也不需要传递None

标签: django python-3.x django-rest-framework


【解决方案1】:

传入更多参数(如果需要,您可以将它们设为可选) - 例如 pagepagesize。然后使用它们来限制查询。对于 MySQL,它将类似于 LIMIT (pagesize * page),page。但是,具体情况会因数据库引擎而异。

所以,使用 Django ORM。如果没有看到模型,我不能肯定地说,但我相当肯定你可以将查询重写为简单的 Django ORM 查询:

SubnetOption.objects.filter(subnet=subnet['id'])

并且各种 INNER JOIN 应该自动通过。

然后,您可以在查询末尾使用范围行 [page*pagesize:(page+1)*pagesize],Django 会将其转换为 LIMIT 或您的数据库引擎的等价物。

【讨论】:

  • 感谢您的回复顺便说一句,我在 url 中添加了额外的参数 ?page= ,然后我在请求元参数中得到了相同的结果。该值用于查询中的范围行。从而解决了我的问题:)
【解决方案2】:

首先,在您的应用中创建一个名为 pagination.py 的文件。然后复制这些代码:

class LargeResultsSetPagination(PageNumberPagination):
    page_size = 1000
    page_size_query_param = 'page_size'
    max_page_size = 10000

class StandardResultsSetPagination(PageNumberPagination):
    page_size = 100
    page_size_query_param = 'page_size'
    max_page_size = 1000

之后,您可以使用这些类,如下例所示:

class BillingRecordsView(generics.ListAPIView):
    queryset = Billing.objects.all()
    serializer_class = BillingRecordsSerializer
    pagination_class = LargeResultsSetPagination

【讨论】:

  • 他使用的是 API 视图,而不是 ListAPIView。这不适用于他的情况。
猜你喜欢
  • 2018-06-12
  • 2021-11-25
  • 2020-11-07
  • 2017-08-26
  • 1970-01-01
  • 2020-03-22
  • 1970-01-01
  • 2017-07-27
  • 2015-09-21
相关资源
最近更新 更多