go2sleep

2年前的实验室项目需要对exploit-db进行爬虫,这里回顾一下知识。

基本思路,使用urllib.request访问exploit-db,使用BeautifulSoup对Response进行解析,然后将提取出的内容存储至Mysql中。

urllib

写这个demo的时候Python2还没有废弃,这里将代码移植至Python3中。

由于exploit-db中漏洞页面的url是https://www.exploit-db.com/exploits/ + eid的方式构成的,因此遍历eid即可爬取所有的漏洞。

构造Request与网页访问
Request文档

class urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)

headers可以在构造函数中指定,也可以通过add_header方法进行添加

urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)

urlopen()函数既可以接收url,也可以接收Request。urlopen()将返回一个字节对象,需要我们自行处理编码。

def spider(spider_url):
    # 构造request
    user_agent = random.choice(ua_list)
    spider_request = request.Request(spider_url)
    spider_request.add_header(\'User-Agent\', user_agent)

    spider_response = request.urlopen(spider_request, timeout=30)

    html = spider_response.read().decode(\'utf-8\')

异常处理
urllib.request中的异常
爬取过程中遇到的一些异常

  • URLError //页面不存在
  • socket.timeout //read()超时
  • UnicodeDecodeError //目标页面是pdf,decode(\'utf-8\')错误
def spider(spider_url):
    # 构造request
    user_agent = random.choice(ua_list)
    spider_request = request.Request(spider_url)
    spider_request.add_header(\'User-Agent\', user_agent)

    try:
        spider_response = request.urlopen(spider_request, timeout=30)
    except error.URLError as e:
        return \'error, URLError\'

    # noinspection PyBroadException
    try:
        html = spider_response.read().decode(\'utf-8\')
    except socket.timeout as e:
        return \'error, socket.timeout\'
    except UnicodeDecodeError as e:
        return \'error, UnicodeDecodeError\'
    except Exception as e:
        return \'error, Exception: %s\' % e

    return html

BeautifulSoup

exploit-db在这段时间也更新了页面,之前写的解析函数已经无法运行。
BeautifulSoup的安装和详细使用方法可以参考官方文档,这里对使用的函数进行说明:
BeautifulSoup通过将html/xml文件转变成一个BeautifulSoup对象,然后根据该对象提供的一些方法对html/xml进行查找和修改。

BeautifulSoup可以通过.访问标签,通过[]访问属性,通过find()和find_all()选择需要的标签,然后提取其中的信息。

Chrome提供的检查工具可以很容易确定元素的位置,分析html中需要的标签的位置,然后选择合适的过滤器。

def bs4html(html):
    # 实现对html的解析
    soup = BeautifulSoup(html, \'html.parser\')
    for div in soup.find_all(\'div\', class_=\'col-sm-12 col-md-6 col-lg-3 d-flex align-items-stretch\'):
        for h in div.find_all(\'div\', class_=\'col-6 text-center\'):
            print(h.h4.get_text().strip() + h.h6.get_text().strip())
        for s in div.find_all(\'div\', class_=\'stats h5 text-center\'):
            if s.strong.string.strip() == \'EDB Verified:\':
                if s.i[\'class\'] == [\'mdi\', \'mdi-24px\', \'mdi-check\']:
                    print(\'EDB Verified: Yes\')
                else:
                    print(\'EDB Verified: No\')
            elif s.strong.string.strip() == \'Exploit:\':
                print(s.strong.string.strip() + s.a[\'href\'])
            else:
                if s.find(\'a\') is None:
                    print(s.strong.string.strip())
                else:
                    print(s.strong.string.strip() + s.a[\'href\'])

数据库存储

ORM也就将数据库映射成对象,然后使用对象的方式操作SQL语句,这里使用SQLalchemy框架。
需要实现两个类,一个类用于和数据库通信,完成增删改查等操作,另一个类是映射类,将数据库中的表与之形成映射。
数据库中的表

class DBPoc(Base):
    __tablename__ = \'exp_poc_info\'

    id = Column(Integer, primary_key=True)
    eid = Column(Integer)
    cve = Column(String)

    title = Column(String)
    author = Column(String)
    published_time = Column(String)

    verified = Column(String)

    platform = Column(String)
    exploit_type = Column(String)
    exploit_url = Column(String)
    exploit_app = Column(String)

    def __init__(self, eid, cve,
                 title, author, published_time, verified,
                 platform, exploit_type, exploit_url, exploit_app):
        self.eid = eid
        self.cve = cve

        self.title = title
        self.author = author
        self.published_time = published_time
        self.verified = verified

        self.platform = platform
        self.exploit_type = exploit_type
        self.exploit_url = exploit_url
        self.exploit_app = exploit_app

与数据库的通信
create_engine()与数据库进行连接,而具体的增删改查需要使用session进行操作

class DBEngine(object):
    def __init__(self):
        #
        self.engine = create_engine(\'sqlite:///exploit_db.sqlite\', echo=False)
        db_session = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        self.session = db_session()

    def close_db(self):
        #
        self.session.close()

    # interface
    # lower words connected with \'_\'
    def add_poc(self, Poc):
        # 添加poc
        self.session.add(Poc)
        self.session.commit()

    def del_poc(self, eid):
        # 删除poc
        poc = self.session.query(DBPoc).filter(DBPoc.eid == eid).first()
        try:
            self.session.delete(poc)
            self.session.commit()
        except Exception as e:
            print(e)

    def is_eid_exist(self, eid):
        # exist True
        # not exist False
        if self.session.query(DBPoc).filter(DBPoc.eid == eid).first() is None:
            return False
        else:
            return True

    def view_all_poc(self):
        print(\'DBPoc:\')
        all_poc = self.session.query(DBPoc)
        for poc in all_poc:
            print(poc)

完整的代码见github

分类:

技术点:

相关文章:

  • 2022-02-22
  • 2022-12-23
  • 2022-01-19
  • 2021-11-28
  • 2021-12-29
  • 2021-11-29
  • 2021-04-02
  • 2021-11-23
猜你喜欢
  • 2022-01-19
  • 2021-12-29
  • 2022-12-23
  • 2022-01-21
  • 2022-02-05
  • 2021-12-02
  • 2022-02-21
相关资源
相似解决方案