【问题标题】:How to improve the performance of traversing a large dataset如何提高遍历大型数据集的性能
【发布时间】:2021-01-07 02:43:36
【问题描述】:

我想改进定义如下的任务的逻辑,这些任务是在 Python 3 中使用 Django 框架实现的:

源数据已加载到我们的系统中,Job 实体定义了完全处理源所涉及的内容, Job_instance 定义了 Job 的实例。要求如下:

  1. 给定源名称,找到处理源的所有作业的最新上线日期。
  2. 同时输出正在处理的作业的整体状态

实体关系及表结构:

  1. “作业” - 这是一个实体,它定义了处理从外部方获取的源所涉及的内容。一个源被分成可以处理成多个作业的部分。表结构:
Field name description type
id id of job int
name name of job string
source name of source string
  1. “Job_instance” - 这是我们系统中已执行的作业实例。
Field name description type
id id of job instance int
job_id id of job int
period date string the source has been onboard to our system string
updated_timestamp last execution time and date for the job timestamp
status the status of the instance after run has been proceeded, can be one of any, e.g: success, failed, finished, created etc string

当每个作业的所有job_instance运行成功时,作业的整体状态显示为“Success”,否则显示为“Failure”

我现在的逻辑:

  1. 从源名称和缓存的 job_ids 中获取作业列表,并将布尔值作为映射。

    job_ids = Job.objects.filter(Q(name__icontains=source)
    ).values_list('id', flat=True)
    
    """ Combine all Job Ids into one restriction """
    """ Constructing where clause to match all job ids """
    ids_filter = Q()
    for job_id in job_ids:
        ids_filter = ids_filter | Q(job__id=job_id)
    
  2. 从作业 ID 中获取作业实例列表,并按周期和 update_timestamp 降序排序(这是从数据库中完成的)

    completed_status_filter = Q()
    """ Only check job instances that have been completed """
    for status in [status_dict.get('Failed'), status_dict.get('Finished')]:
        completed_status_filter = completed_status_filter | Q(
            status__exact=status)
    
    """ Find Finished or Failed job instances for job ids """
    """ and sort by period_code and updated timestamp by decending order """
    job_instances = JobInstance.objects \
        .filter(ids_filter, completed_status_filter) \
        .values('id', 'job__id', 'period_code', 'update_timestamp', 'status') \
        .exclude(period_code='Ad Hoc') \
        .order_by('-period_code', '-update_timestamp')
    
  3. 为 job_instances 映射列表创建了一个周期。

    def get_period_code_instances_map(self, job_instances):
        period_code_instance_map = {}
        for instance in job_instances.iterator():
           instances =period_code_instance_map.get(instance['period_code'])
           if not instances:
             existing_instances = list()
             period_code_instance_map[instance['period_code']] \
                = existing_instances
            existing_instances.append(instance)
           else:
            instances.append(instance)
    
       return period_code_instance_map
    
  4. 如下遍历地图:

    def get_last_period_code_at_curated(
        self, period_code_instances_map, job_ids, status_dict):
    
    target_period = None
    job_ids_verification_map = dict((key, False) for key in job_ids)
    """ Traverse instances ran on the same period_code """
    for period_code, instances in period_code_instances_map.items():
        for instance in instances:
            job_id = instance['job__id']
            """ Set to True if job has not been visited"""
            if not job_ids_verification_map.get(job_id):
                job_ids_verification_map[job_id] = True
    
        """ If job id has not been visited after traversing """
        """ reset job_id verifcation map """
        if any(not checked for checked in job_ids_verification_map.values()):
            job_ids_verification_map = dict(
                (key, False) for key in job_ids)
        else:
            target_period = period_code
            break
    
    result = JobInstanceLatestPeriodCodeForSource()
    if not target_period:
        return result
    
    """ Once period_code is found, get the timestamp from 1st instance """
    """ Evaluate status by check all statuses in instances """
    instances = period_code_instances_map[target_period]
    result.period_code = target_period
    result.update_timestamp = instances[0]['update_timestamp']
    failed_status = status_dict.get('Failed')
    if any(instance['status'] == failed_status for instance in instances):
        result.status = 'Failed'
    else:
        result.status = 'Finished'
    
    return result
    

    一个。迭代周期内的所有作业实例以查找是否所有作业都已运行。在这里,我使用了 job_id 和 boolean map 来跟踪该期间访问过的作业。

    b.如果所有作业都已在该期间运行,则找到该期间。否则,继续遍历地图。

  5. 找到周期后,再次从地图中迭代作业实例列表以查找整体状态。

限制:

  • 由于 job_instances 的列表可能非常大(> 100 万个条目),因此我在遍历数据库中的数据时使用了迭代器来创建映射。但欢迎提出任何优化建议。
  • 有没有更有效的时间/空间方法来查找周期代码。我发现我必须使用嵌套的 for 循环来遍历,这给了我 O(n2)。

提前致谢

【问题讨论】:

  • 你说你有工作代码,但没有分享它——所以很难说什么。如果您正在寻找代码审查和改进建议,codereview.stackexchange.com 是一个更好的地方。如果您有工作代码,但想知道如何修复/改进其中非常具体的部分,请分享并告诉我们您尝试了什么以及需要帮助。
  • 您是否考虑过在 db 中缓存源​​的最后成功日期?然后你就不需要遍历整个源历史,只需要最近的源相关记录(如果你发现更近的成功周期,你可以更新缓存)
  • @Grismar,感谢您的回复,我已将源代码添加到不同的步骤中,我想要的优化是在第 4 步,甚至在创建地图时。
  • @pkuderov,这种方法不适用于我的情况,我需要具有不同状态的实例来计算整体状态。谢谢

标签: python database performance


【解决方案1】:

找到了解决方案,由于处理结果集可能需要一些时间,所以我要求上游用户每 5 秒轮询一次,直到计算出结果。

【讨论】:

    猜你喜欢
    • 2015-05-12
    • 1970-01-01
    • 1970-01-01
    • 2022-01-05
    • 1970-01-01
    • 2011-04-13
    • 2012-07-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多