【发布时间】:2021-01-07 02:43:36
【问题描述】:
我想改进定义如下的任务的逻辑,这些任务是在 Python 3 中使用 Django 框架实现的:
源数据已加载到我们的系统中,Job 实体定义了完全处理源所涉及的内容, Job_instance 定义了 Job 的实例。要求如下:
- 给定源名称,找到处理源的所有作业的最新上线日期。
- 同时输出正在处理的作业的整体状态
实体关系及表结构:
- “作业” - 这是一个实体,它定义了处理从外部方获取的源所涉及的内容。一个源被分成可以处理成多个作业的部分。表结构:
| Field name | description | type |
|---|---|---|
| id | id of job | int |
| name | name of job | string |
| source | name of source | string |
- “Job_instance” - 这是我们系统中已执行的作业实例。
| Field name | description | type |
|---|---|---|
| id | id of job instance | int |
| job_id | id of job | int |
| period | date string the source has been onboard to our system | string |
| updated_timestamp | last execution time and date for the job | timestamp |
| status | the status of the instance after run has been proceeded, can be one of any, e.g: success, failed, finished, created etc | string |
当每个作业的所有job_instance运行成功时,作业的整体状态显示为“Success”,否则显示为“Failure”
我现在的逻辑:
-
从源名称和缓存的 job_ids 中获取作业列表,并将布尔值作为映射。
job_ids = Job.objects.filter(Q(name__icontains=source) ).values_list('id', flat=True) """ Combine all Job Ids into one restriction """ """ Constructing where clause to match all job ids """ ids_filter = Q() for job_id in job_ids: ids_filter = ids_filter | Q(job__id=job_id) -
从作业 ID 中获取作业实例列表,并按周期和 update_timestamp 降序排序(这是从数据库中完成的)
completed_status_filter = Q() """ Only check job instances that have been completed """ for status in [status_dict.get('Failed'), status_dict.get('Finished')]: completed_status_filter = completed_status_filter | Q( status__exact=status) """ Find Finished or Failed job instances for job ids """ """ and sort by period_code and updated timestamp by decending order """ job_instances = JobInstance.objects \ .filter(ids_filter, completed_status_filter) \ .values('id', 'job__id', 'period_code', 'update_timestamp', 'status') \ .exclude(period_code='Ad Hoc') \ .order_by('-period_code', '-update_timestamp') -
为 job_instances 映射列表创建了一个周期。
def get_period_code_instances_map(self, job_instances): period_code_instance_map = {} for instance in job_instances.iterator(): instances =period_code_instance_map.get(instance['period_code']) if not instances: existing_instances = list() period_code_instance_map[instance['period_code']] \ = existing_instances existing_instances.append(instance) else: instances.append(instance) return period_code_instance_map -
如下遍历地图:
def get_last_period_code_at_curated( self, period_code_instances_map, job_ids, status_dict): target_period = None job_ids_verification_map = dict((key, False) for key in job_ids) """ Traverse instances ran on the same period_code """ for period_code, instances in period_code_instances_map.items(): for instance in instances: job_id = instance['job__id'] """ Set to True if job has not been visited""" if not job_ids_verification_map.get(job_id): job_ids_verification_map[job_id] = True """ If job id has not been visited after traversing """ """ reset job_id verifcation map """ if any(not checked for checked in job_ids_verification_map.values()): job_ids_verification_map = dict( (key, False) for key in job_ids) else: target_period = period_code break result = JobInstanceLatestPeriodCodeForSource() if not target_period: return result """ Once period_code is found, get the timestamp from 1st instance """ """ Evaluate status by check all statuses in instances """ instances = period_code_instances_map[target_period] result.period_code = target_period result.update_timestamp = instances[0]['update_timestamp'] failed_status = status_dict.get('Failed') if any(instance['status'] == failed_status for instance in instances): result.status = 'Failed' else: result.status = 'Finished' return result一个。迭代周期内的所有作业实例以查找是否所有作业都已运行。在这里,我使用了 job_id 和 boolean map 来跟踪该期间访问过的作业。
b.如果所有作业都已在该期间运行,则找到该期间。否则,继续遍历地图。
-
找到周期后,再次从地图中迭代作业实例列表以查找整体状态。
限制:
- 由于 job_instances 的列表可能非常大(> 100 万个条目),因此我在遍历数据库中的数据时使用了迭代器来创建映射。但欢迎提出任何优化建议。
- 有没有更有效的时间/空间方法来查找周期代码。我发现我必须使用嵌套的 for 循环来遍历,这给了我 O(n2)。
提前致谢
【问题讨论】:
-
你说你有工作代码,但没有分享它——所以很难说什么。如果您正在寻找代码审查和改进建议,codereview.stackexchange.com 是一个更好的地方。如果您有工作代码,但想知道如何修复/改进其中非常具体的部分,请分享并告诉我们您尝试了什么以及需要帮助。
-
您是否考虑过在 db 中缓存源的最后成功日期?然后你就不需要遍历整个源历史,只需要最近的源相关记录(如果你发现更近的成功周期,你可以更新缓存)
-
@Grismar,感谢您的回复,我已将源代码添加到不同的步骤中,我想要的优化是在第 4 步,甚至在创建地图时。
-
@pkuderov,这种方法不适用于我的情况,我需要具有不同状态的实例来计算整体状态。谢谢
标签: python database performance