cheflone

最近拜读瑞安·米切尔的书关于并行抓取问题有很通俗的介绍:

  “网页抓去的速度很快,起码通常比雇佣几十个实习生手动网上复制数据要快很多。当然随着技术的不断进步和享乐适应,人们还是在某个时刻觉得‘不够快’,于是把目光转向分布式计算。

 和其他领域不同的是,网页抓取不能单纯依靠‘给问题增加更多进程’来提升速度,虽然运行一个process很快,但是两个进程未必能让速度提升一倍,而当运行三个乃更多时,可能你的所有请求都会被远程服务器封杀,因为他认为你是在恶意攻击。”

然而,某些场景里使用网页并行抓取或者并行线程(thread)/进程仍然有些好处:

 

1.从多个数据源(多个远程服务器),而不只是在一个数据源收集数据。

2.在已经收集到的数据上执行更加复杂/执行时间更长的操作(例如图像分析或者OCR处理)。

3.从大型web服务器收集数据,如果你已经付费,或者创建多个连接是使用协议允许的行为。

 

一.pythom3.x 使用的是_thread,而thread模块已经废弃

1.1下面用_thread实现一个小例子

 

 1 import time
 2 import  _thread
 3 def print. time(threadName, delay, iterations):
 4 
 5     start = int(time . time())
 6 
 7     for i in range(0 ,iterations):
 8 
 9         time .sleep(delay)
10 
11         seconds_ elapsed = str(int(time.time()) - start)
12 
13         print ("[] []". format(seconds_ elapsed, threadName))
14     
15 try:
16     _thread.start_new_thread(print_time, (\'Fizz\', 3, 33))
17     _thread.start_new_thread(print_time, (\'Buzz\',5,20))
18     _thread.start_new_thread(print_time,(\'Counter\',1,100))
19 except:
20 
21     print (\'Error:unable to start_thread\')
22 
23 while 1:
24 
25 pass                

上面的例子开启了三个线程,分别3,5,1秒打印不同次数的名字

 

1.2 threading 模块

 

 _thread是python相当底层的模块,虽然可以有详细的管理操作,但是苦于没有高级函数,使用起来不太方便。

threadign模块是一个高级接口,更加便捷使用线程,与此同时也体现了_thread模块的特性。

在threading与os模块里,分别调用了current_thread()和getpid() 该函数来获取当前的线程/进程号,能有更好的执行效果。

 1 import os
 2 import threading
 3 from threading import Thread
 4 
 5 import time
 6 
 7 userlist = ["黎明", "张学友", "刘德华"]
 8 
 9 
10 def work(n):
11     print(f"开始给{n}打电话,进程号:{os.getpid()},线程号是:{threading.current_thread()}")  # 这里加f 是字符串格式化,作用类似于format
12     time.sleep(3)
13     print(f"给{n}打电话结束。进程号:{os.getpid()}线程号是:{threading.current_thread()}")
14 
15 
16 if __name__ == \'__main__\':
17     # 单线程
18     # for d in userlist:
19     #     work(d)
20     plist = []
21     # # 循环创建多进程部门,类似增加
22     # for d in userlist:
23     #     # 进程对象
24     #     p = Process(target=work, args=(d,))
25     #     # 生成进程
26     #     p.start()
27     #     # 生成的进程加入列表
28     #     plist.append(p)
29     #
30     # # 阻塞终止进程的执行
31     # [i.join() for i in plist]
32 
33     # 多线程类似部门增加人手
34     for d in userlist: 
35         # 利用for来生成线程对象,然后再为它们传入你想要传入的数据(可以不一样)
36         p = Thread(target=work, args=(d,))
37         # 生成线程
38         p.start()
39         # 生成的线程加入列表
40         plist.append(p)
41 
42     # 阻塞终止线程的执行(join():创建的都是子线程。如果不加join().主线程运行完,程序就结束了,作用就是等待子线程运行完毕。)
43     [i.join() for i in plist]

你也可以单个的创建,threading的优势是创建的线程其他线程无法访问的线程局部数据(local thread data),这样的优势对于爬虫尤其明显,它们抓取不同的网站,那么每个线程都可以

专注于自己要抓取的目标。

import threading

def  crawler(url):
       data = threading.local()
       data.visited = []
       #爬去目标网站

threading.Thread(target=crawler, args=(\'http://www.douban.com\')).start()

这样就解决了线程之间的共享对象导致竞争条件问题。目标很明显:不需要共享就不要共享(使用local data),为了安全的共享,就需要用到Queue模块(后面会有示例)

threading的保姆职责甚至可以达到高度定制:isAlive函数的默认行为查看是否有线程仍处于活跃状态。当一个线程崩溃或者重启后才会返回True:

threading.Thread(target=crawler)

t.start()

while True:
         time.sleep(1)
         if not t.isAlive:

t = Thread(target=crawler, args=(d,))

t.start()

这样可以达到简单的监控方法。

 

2021-04-01

19:36:13

1.3import threading

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import  os
import time

user_list = [\'树花\', \'欧阳娜娜\', \'迪丽热巴\', \'鞠婧祎\', \'宋祖儿\']

user_list1 = [\'树花\', \'欧阳娜娜\', \'迪丽热巴\', \'鞠婧祎\', \'宋祖儿\']


def work(n):
    print(f"开始给{n}打电话,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+\'\n\')
    time.sleep(2)
    print(f"给{n}打电话结束,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+\'\n\')

def work1(n):
    print(f"开始给{n}打电话,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+\'\n\')
    time.sleep(2)
    print(f"给{n}打电话结束,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+\'\n\')


if __name__ == \'__main__\':
    \'\'\'
    创建线程池的时候规定三个线程,根据指派任务为三种情况:
    1.= 三个任务  理论上同时执行三个任务
    2.> 三个任务  先执行前三个任务,在这其中根据完成速度先后继续分配线程给后面的任务
    3.< 三个任务  同时执行分配的任务,无任务线程等待阻塞 
    \'\'\'
    # 创建线程池(进程池)
with ThreadPoolExecutor(max_workers=3) as pool:
    # pool = ProcessPoolExecutor(max_workers=3)
    # 循环指派任务和参数
    [pool.submit(work,user) for user in user_list]
    # [pool.submit(work1,[user]) for user in user_list1]

    # 关闭线程池(进程池)

    pool.shutdown(wait=True)
print(\'主线程完成\')



1.3.1

      如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数,直接调用result函数结果。

 

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
    time.sleep(2)
    return \'finished\'

def test_result(future):
    print(future.result())

if __name__ == "__main__":

    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
        future = threadPool.submit(test, i,i+1)
        future.add_done_callback(test_result) # 回调函数
        print(future.result())

    threadPool.shutdown(wait=True)
    print(\'main finished\')

 

1.3.2 map()

from concurrent.futures import ThreadPoolExecutor

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
#     time.sleep(2)
    return \'*******\'

if __name__ == "__main__":

    args1 = [1,2]  #假设1,2分别代表一个url
    args2 = [3,4]

    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_") as threadPool:

        threadPool.map(test, args1,args2) # 这是运行一次test的参数,众所周知map可以让test执行多次,一个[]代表参数,这里会请求1,3和2,4,执行两次。

        threadPool.shutdown(wait=True)

执行结果:

test__0 threading is printed 1, 3
test__1 threading is printed 2, 4


    上面程序使用 map() 方法来启动 4个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来
才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
    通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 test() 函数,
但最后收集的 test() 函数的执行结果,依然与传入参数的结果保持一致。

2021-04-03

00:01:27

 

1.4 利用队列达到线程之间安全通信

 

这一个面向对象的简单爬虫,使用了上面提到的线程池和Queue。

import os
import threading
import time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import requests
from lxml import etree


class quibai():
    def __init__(self):
        self.base_url = "https://www.qiushibaike.com/imgrank/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36", }
        self.queue_list_url = Queue()
        self.queue_parse_url = Queue()
        self.queue_content = Queue()

    def list_url(self):
        # return [self.base_url.format(i) for i in range(1,14)]
        for i in range(1, 14):
            url_s = self.base_url.format(i)
            self.queue_list_url.put(url_s)
            # print(self.queue_list_url.get())

    def get_imformations(self):
        while True:
            url = self.queue_list_url.get()

            response = requests.get(url=url, headers=self.headers)
            time.sleep(1)
            # return response.content.decode(\'utf-8\')
            self.queue_parse_url.put(response.content.decode(\'utf-8\'))
            self.queue_list_url.task_done()

    def analysis(self):
        while True:
            resp = self.queue_parse_url.get()
            res = etree.HTML(resp)
            item = {}
            picture = res.xpath(\'//div[@class="author clearfix"]//img/@src\')
            item["picture"] = ["https:" + i for i in picture]
            name = res.xpath(\'//div[@class="author clearfix"]//h2/text()\')
            item["name"] = [n.replace("\n", "") for n in name]
            # for n in name:

            title = res.xpath(\'//div[@class="content"]/span/text()\')
            item["title"] = [t.replace("\n", "") for t in title]
            photo = res.xpath(\'//div[@class="thumb"]/a/img/@src\')
            item["photo"] = ["https:" + i for i in photo]
            item["Fabulous"] = res.xpath(\'//span[@class="stats-vote"]/i/text()\')
            item["comment"] = res.xpath(\'//span[@class="stats-comments"]//i/text()\')

            gender = res.xpath(\'//div[@class="author clearfix"]/div/@class\')
            for i in range(len(gender)):
                re = gender[i].split(" ")[1]
                if re == "womenIcon":
                    gender[i] = ""
                else:
                    gender[i] = ""
            item["gender"] = gender
            info = {}
            # # print(item[\'picture\'])
            for name, gender, picture, title, photo, Fabulous, comment in zip(item[\'name\'], item[\'gender\'],
                                                                              item[\'picture\'], item[\'title\'],
                                                                              item[\'photo\'], item[\'Fabulous\'],
                                                                              item[\'comment\']):
                info[\'name\'] = name
                info[\'gender\'] = gender
                info[\'picture\'] = picture
                info[\'title\'] = title
                info[\'photo\'] = photo
                info[\'Fabulous\'] = Fabulous
                info[\'comment\'] = comment
            # return name, gender, picture, title, photo, Fabulous, comment
            self.queue_content.put(info)
            self.queue_parse_url.task_done()

    def save_data(self):
        if not os.path.exists("story_info"):
            os.mkdir("story_info")
        s = 1
        while True:
            item_list = self.queue_content.get()
            with open("story_info/test_{}.txt".format(s), \'w\', encoding=\'utf-8\') as f:
                f.writelines(str(item_list))
            s += 1
            self.queue_content.task_done()

    def go(self):
        """
        1.设置基础信息,链接数据库
        2.获取url列表
        3。获取请求
        4。get信息
        5.save data from \'def get\' to mysql
        6.改造成线程
        :return:
        """
        # li_url = self.list_url()
        # print(li_url)
        # for j in li_url:
        # resp = self.get_imformations(j)
        # print(resp)
        # name, gender, picture, title, photo, Fabulous, comment =self.analysis(resp)
        # print(self.analysis(resp))
        thread_list = []
        # 在下面同时循环建立线程对象
        
         for j in range(3):
             t1 = threading.Thread(target=self.list_url)
             thread_list.append(t1)
        
             t2 = threading.Thread(target=self.get_imformations)
        
             thread_list.append(t2)
             t3 = threading.Thread(target=self.analysis)
        
             thread_list.append(t3)
             t4 = threading.Thread(target=self.save_data)
        
             thread_list.append(t4)
         for t in thread_list:
             t.setDaemon(True)  # 把子线程设置成守护线程,该线程不signifisant,当主线程结束,子线程结束
             t.start()
             print(t)
         for q in [self.queue_list_url, self.queue_parse_url, self.queue_content]:
             q.join()  # 让主线程队列先阻塞等待,当所有子线程完成后再完成。
             print(q)
         print("主线程结束")

        if __name__ == \'__main__\':
    start_time=time.time()
    star = quibai()
    star.go()
    end_time=time.time()
    print(\'执行完毕,花了:{}的时间\'.format(end_time-start_time))

 

1.4.1  锁与线程同步

 

我上面的例子使用的了队列来实现了线程的同步和安全,而锁也可以实现共享资源的同步访问,特别在一些场景下有十分重要的应用,例如io操作、数据库查询,在博客总能看到大佬们对我这种的小白的调侃告诫 “毕加索”。 这里就不写

分类:

技术点:

相关文章: