【发布时间】:2020-10-26 07:56:51
【问题描述】:
我需要多线程概念方面的帮助。我不太了解这个概念。你能帮我做这件事吗?
当前情景:
Job seq_num
A 1
B 1
C 2
D 2
在上表中,您可以看到我们根据dependency.seq_num 对作业进行分类,2 个作业依赖于 seq_num-1 个作业。这意味着第一个 seq_num 1 将触发,然后 2 将触发。各个 seq_num 中的所有作业都将并行运行。
Suppose Job A = 10 mins
Job B = 15 mins
所以 seq_num -1 的总完成时间是 15 分钟。 15分钟后。 seq_num =2 将开始。
理想的场景:
Job Job_Type seq_num
A independent 1
B independent 1
C A 2
D B 2
在上表中,您可以看到作业 C 依赖于作业 A,而 D 依赖于作业 B。在之前的场景中,seq_num 是在 seq_num = 1 完成后开始的。但在这种情况下,我想要不同的方法。
Job A = 10 mins
Job B = 15 mins
作业 C 依赖于 A,因此在作业 A 完成后,作业 C 将立即启动。它不应等待作业 B 的完成。 就像这个 D 依赖于 B 所以在 Job B 完成后,Job D 会立即开始。
目前我正在使用多线程概念来根据 seq_num 运行整个过程。但我想要不依赖于 seq_num 的理想场景。如何等待依赖进程直到其父进程完成? 我也在分享我的代码。请让我知道我需要在哪里更改代码以获得理想的场景。如果您需要更多信息,请告诉我。
代码:
def parallel_Execution():
logging.info("parallel_Execution..................[started]")
par_temp_loc = '/medaff/Temp/'
'''Reading the metadata file and creating as a dataframe'''
df = pd.read_csv(par_temp_loc+'metadata_file_imedical.txt', delimiter='|',error_bad_lines=False)
uni_master_job = df['Master Job Name'].unique().tolist()
print(uni_master_job)
'''getting unique execution sequence'''
logging.info("Getting the unique Execution Sequence Number!")
unique_exec_seq = df['Execution Sequence'].unique().tolist()
unique_exec_seq.sort()
print(unique_exec_seq)
num_unique_seq = len(unique_exec_seq)
logging.info("Total Number of unique sequence Number : %2d" %(num_unique_seq))
p2 = ThreadWithReturnValue(target = partial(parallel_temp2, unique_exec_seq, df ))
p2.start()
r2 = p2.join()
print(r2)
#r1 = r1.append(r2)
mail_df(r2)
'''Parallel Processing Function'''
def parallel_temp2(unique_exec_seq, df):
list_df = []
df_main4 = pd.DataFrame()
for exec_seq in unique_exec_seq:
seq_num = exec_seq
temp_df = df[df['Execution Sequence'] == exec_seq].copy()
unique_master_job = temp_df['Master Job Name'].unique().tolist()
print(unique_master_job)
#logging.info("%s Master Job Started." %(unique_master_job))
if(len(unique_master_job)>0):
num_processes = len(unique_master_job)
pool = ThreadPool(processes=num_processes)
result1 = pool.map(partial(parallel_view_creation, exec_seq, temp_df), unique_master_job)
pool.close()
pool.join()
df_main = pd.DataFrame(result1)
#print("printing df_main")
#print(df_main)
for m_job in df_main.master_job.unique():
temp_df1 = df_main[df_main['master_job'] == m_job]
status = temp_df1.status.unique()[0]
if(status == 0):
unique_master_job.remove(m_job)
pool = ThreadPool(processes=num_processes)
result2 = pool.map(partial(parallel_build_query, exec_seq, temp_df), unique_master_job)
pool.close()
pool.join()
if(result2):
df_main2 = pd.DataFrame(result2)
df_main3 = pd.concat([df_main,df_main2], sort = False)
status_df_list = df_main3['status'].unique().tolist()
print(status_df_list)
if(0 in status_df_list):
break
if(0 in status_df_list):
break
else:
df_main4 = df_main4.append(df_main3)
if(0 in status_df_list):
df_main4 = df_main4.append(df_main3)
return df_main4
代码说明:
首先,我正在读取包含有关作业和 seq_num 的所有信息的元数据文件。 然后我接受了独特的工作和独特的 seq_num。 传入 ThreadWithReturnValue 函数。
在基于 seq_num 的 Parallel_temp2 函数中,我正在触发作业。
提前致谢!!
【问题讨论】:
-
作业数量是固定的还是可以变化的?如果您总是处理 4 个作业,您可以生成 4 个线程并使用 conditions 来检查哪一个可以继续处理。如果工作数量不同,这可能会变得更加复杂
-
嗨@urban。这只是我写的例子。这次我在做 50 份工作。
-
@urban,你能帮我在我的代码上实现以下逻辑吗?
标签: python multithreading multiprocessing threadpool