【问题标题】:Python multithreading crawlerPython多线程爬虫
【发布时间】:2012-05-29 13:53:40
【问题描述】:

你好!我正在尝试用 python 编写网络爬虫。我想使用python多线程。即使在阅读了早期建议的论文和教程之后,我仍然有问题。我的代码在这里(整个源代码是here):

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()

它没有按需要工作,它在线程 1 之后没有给出任何结果,并且它执行的方式不同,有时会出现此错误:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):

我该如何解决?而且我认为这并不比 for 循环更有效。

我已尝试修复 run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()

我在不同的地方试验了 task_done() 命令,谁能解释一下区别?

【问题讨论】:

  • 第一个示例是否缺少一些缩进?看起来班级成员应该被推入一级..?
  • 你能发布一个工作示例吗?你导入了哪些模块?
  • snipt.org/ujhW9 工作源代码
  • 我很困惑。您创建 5 个线程,每个线程从队列中读取一次。然后很多 url 从前五页添加到队列中,但它们从未被读取?
  • 什么意思?它们被添加到队列中,但线程也会从该队列中读取 url。

标签: python multithreading thread-safety web-crawler


【解决方案1】:

你只在线程初始化时调用self.url = self.queue.get()。如果您想获取新的 url 以便进一步处理,则需要尝试从 while 循环内的队列中重新获取 url。

尝试将self.page = getPage(self.url) 替换为self.page = getPage(self.queue.get())。请注意,get 函数将无限期阻塞。您可能希望在一段时间后超时并添加一些方式让您的后台线程按请求正常退出(这将消除您看到的异常)。

some good examples on effbot.org 以我上面描述的方式使用 get()。

编辑 - 对您最初的 cmets 的回答:

看看the docs for task_done();对于对get() 的每次调用(不会超时),您应该调用task_done(),它会告诉对join() 的任何阻塞调用,该队列上的所有内容现在都已处理。每次对get() 的调用都会在等待新的 url 发布到队列中时阻塞(休眠)。

Edit2 - 试试这个替代的运行功能:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.

【讨论】:

  • 以及 task_done() 命令怎么样,我应该把它放在哪里,它是如何影响的?调用 task_done() 命令时,线程是否处于休眠状态并为另一个线程分配时间?如果是这样,并发在哪里?我很困惑。
  • 调用 task_done() 只会告诉其他线程可以使用队列吗?我的意思是,我从队列中获取url,然后立即调用task_done(),第二种获取url,处理,解析页面(此时我希望其他线程使用Queue,因为处理页面需要一些时间),然后我调用task_done(),有什么区别?哪个有效
  • 正如我所料,在 q.get() 终止所有线程之后立即放置 task_done()
  • 请阅读我链接的文档; task_done() 只是告诉 queue.join() 对 queue.put() 的一次调用已被处理。是对 queue.get() 的调用执行您感兴趣的阻塞。每次调用 queue.put() 时,一个阻塞在它调用 queue.get() 上的线程将唤醒并开始处理.. .
  • 我不能理解的一点是:每个创建的线程都从Queue中获取url,然后处理它,然后调用task_done(),那么真正的多线程效率在哪里,线程只是在变化地方,他们没有什么并行的,可以给速度????
猜你喜欢
  • 1970-01-01
  • 2021-07-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-10-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多