【问题标题】:Embarassingly Parallel DB Update Using Python (PostGIS/PostgreSQL)使用 Python (PostGIS/PostgreSQL) 进行令人尴尬的并行数据库更新
【发布时间】:2011-09-20 14:32:53
【问题描述】:

我需要更新空间数据库中的每条记录,其中我有一个点数据集覆盖多边形数据集。对于每个点要素,我想分配一个键以将其与它所在的多边形要素相关联。因此,如果我的点“纽约市”位于多边形 USA 内,并且对于美国多边形“GID = 1”,我将为我的点纽约市分配“gid_fkey = 1”。

为此,我创建了以下查询。

procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

目前,我从另一个查询中获取 cityID 信息,该查询只选择 gid_fkey 为 NULL 的所有 cityID。本质上,我只需要遍历这些并运行前面显示的查询。由于查询仅依赖于另一个表中的静态信息,理论上所有这些过程都可以一次运行。我已经实现了下面的线程过程,但我似乎无法迁移到多处理

import psycopg2, pprint, threading, time, Queue

queue = Queue.Queue()
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()

getGID = 'SELECT cityID FROM city'
pyCursor1.execute(getGID)
gidList = pyCursor1.fetchall()

class threadClass(threading.Thread):

def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

def run(self):

        while True:
            gid = self.queue.get()

            procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

            pyCursor2 = pyConn.cursor()                         
            pyCursor2.execute(procQuery)

            print gid[0]                    
            print 'Done'

def main():

    for i in range(4):
        t = threadClass(queue)
        t.setDaemon(True)
        t.start()

        for gid in gidList:
            queue.put(gid)

    queue.join()

main()

我什至不确定多线程是否是最佳的,但它肯定比一个接一个地更快。

我将使用的机器有四个内核(四核)和一个没有 GUI、PostgreSQL、PostGIS 和 Python 的最小 Linux 操作系统,如果这会有所不同的话。

我需要进行哪些更改才能启用这个极其简单的多处理任务?

【问题讨论】:

    标签: python postgresql multiprocessing postgis


    【解决方案1】:

    好的,这是对我自己帖子的回答。干得好=D

    在我的系统上从单核线程到四核多处理,速度提高了大约 150%。

    import multiprocessing, time, psycopg2
    
    class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
    
    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return
    
    
    class Task(object):
    def __init__(self, a):
        self.a = a
    
    def __call__(self):        
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()
    
            procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
    
        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a
    
        return self.a
    
    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'
    
    if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()
    
    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()
    
    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
    temp = pyCursorX.fetchall()    
    num_job = temp[0]
    num_jobs = num_job[0]
    
    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
    cityIdListTuple = pyCursorX.fetchall()    
    
    cityIdList = []
    
    for x in cityIdListTuple:
        cityIdList.append(x[0])
    
    
    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i - 1]))
    
    for i in xrange(num_consumers):
        tasks.put(None)
    
    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1
    

    现在我在这里发布了另一个问题:

    Create DB connection and maintain on multiple processes (multiprocessing)

    希望我们可以摆脱一些开销,让这个婴儿更快地加速。

    【讨论】:

    • 嘿@ene,如果这解决了您的问题,最好将其标记为已回答:)
    • 是的,这很奇怪,因为当我发布此问题时,我只是访客用户或其他用户,我无法将自己的问题标记为正确。您可以看到缩略图图像没有与我的一起更新。欢迎提出解决方法的建议
    • 哦,是的.. 问题是您在另一个(未注册的)用户 (*.com/users/954992/ene) 下发布了您的问题,而您现在正在使用注册用户 (*.com/users/965035/ene) 进行回复。如您所见,它们上的ID是不同的。这可能会有所帮助:meta.stackexchange.com/questions/74024/…
    • 永远不要使用 Mutli 进程运行 IO 操作,始终使用异步或多线程
    【解决方案2】:

    在普通 SQL 中,可以执行以下操作:

    UPDATE city ci
    SET gid_fkey = co.gid 
    FROM country co 
    WHERE ST_within(ci.the_geom , co.the_geom) 
    AND ci.city_id = _some_parameter_
            ;
    

    如果一个城市适合多个国家/地区(导致同一目标行多次更新),则可能会出现问题,但您的数据可能并非如此。

    【讨论】: