【问题标题】:Multiprocessing function in python scriptpython脚本中的多处理函数
【发布时间】:2017-01-19 15:46:39
【问题描述】:

我试图在我的脚本中的一个函数上使用多处理,我认为这是我的多处理结构和语法的问题。这里的目标是遍历一个列表并从 13 个不同的空间地理数据库中复制出光栅/tiff 图像。输出和输入都保存在单独的地理数据库中,以加快进程并防止崩溃。这是我的示例脚本:

import arcpy, time, os
from arcpy import env
import multiprocessing
from multiprocessing import pool

def copy_rasters_3b(P6_GDBs, x, c):
        mosaic_gdb = os.path.join('{}/mosaic_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_gdb = os.path.join('{}/final_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_tiff = os.path.join('{}/{}.tif'.format(P6_GDBs, c))
        print "---Copying Rasters Started---"
        start_time = time.time()
        arcpy.CopyRaster_management(mosaic_gdb, final_gdb, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "", "NONE")
        arcpy.CopyRaster_management(mosaic_gdb, final_tiff, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "TIFF", "NONE")
        print("--- "+ c + " Complete %s seconds ---" % (time.time() - start_time))

### Main ###
def main():
    P6_DIR= "D:/P6_SecondRun"
    P6_GDBs= "D:/P6_GDBs"
    Categories =['CRP', 'FORE', 'INR', 'IR', 'MO', 'PAS', 'TCI', 'TCT', 'TG', 'WAT', 'WLF', 'WLO', 'WLT']
    rasters = defaultdict(list)

    # Environments
    arcpy.env.overwriteOutput = True
    arcpy.env.snapRaster = "D:/A__Snap/Phase6_Snap.tif"
    arcpy.env.outputCoordinateSystem = arcpy.SpatialReference(102039) #arcpy.SpatialReference(3857)

    pool = multiprocessing.Pool(processes=5)

    for c, x in zip(Categories, range(1,14)):
        pool.map(copy_rasters_3b(P6_GDBs, x, c), 14)
        pool.close()

############### EXECUTE MAIN() ###############
if __name__ == "__main__":
    main()

此脚本在后台启动五个进程,但在前两个实例后最终失败。我正在运行 ArcMap 10.4 x64 并使用 Python27-64 位。

【问题讨论】:

    标签: python multiprocessing gis arcpy


    【解决方案1】:

    首先,您应该调用pool.map(一次),使用一个函数和一个为函数提供参数的迭代。这样做的原因是,实际的函数调用是在 子进程中进行的。相反,您实际上是直接调用该函数并将其结果传递给pool.map,这是没有用的。

    其次,您在循环内调用pool.close,然后在下一次迭代中再次调用pool.map。池对象的模式是将所有作业提交给它。然后在您完成等待所有作业完成后致电pool.close。在这种情况下,所有作业都将通过一个 .map 方法调用分配给它。

    您的代码可能如下所示:

    def copy_rasters_3b(arg_tuple):
        P6_GDBs, x, c = arg_tuple    # Break apart argument tuple
        ... # Rest same as before
    
    pool = multiprocessing.Pool(processes=5)
    pool.map(copy_rasters_3b, [(P6_GDBs, x, c) for x, c in enumerate(Categories)])
    pool.close()
    

    分解列表理解:Categories 是一个字符串列表,因此enumerate(Categories) 生成一组编号的元组(N, member),其中 N 是列表中的位置,member 是列表中的值。我们将这两者与每个结果元组中的常量值 P6_GDBs 结合起来。理解的最终结果是一个三元组列表。

    map 将在每次调用时将其可迭代参数中的单个元素提供给函数。但是,该单个元素是 3 元组,因此这里的函数需要将一个参数分解为其组成部分。

    最后,您应该删除以下导入行,因为它令人困惑且不必要。在这里,您导入名称 pool,但随后(从未使用过),您通过将 pool 作为变量名分配给它来覆盖它:

    from multiprocessing import pool
    [...]
    pool = multiprocessing.Pool(processes=5)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-07-01
      • 1970-01-01
      • 2019-05-09
      • 2019-08-14
      • 2017-11-17
      • 1970-01-01
      • 1970-01-01
      • 2015-09-11
      相关资源
      最近更新 更多