流量截取与入库

我们需要先定义和初始化流量截取后存储的表结构,这里我们采用 peewee 这个 ORM 库,进行操作。
更多操作可参考:
http://docs.peewee-orm.com/en/latest/peewee/quickstart.html

from peewee import *
import datetime
//可以考虑通过 docker 快速搭建 Mysql 服务
db = MySQLDatabase("mitmproxy", host="172.17.0.2", port=3306, user="root", passwd="xxxx")
db.connect()

class HttpRecords(Model):
    //定义了三个字段,id, http 请求记录,时间戳
    id = BigAutoField(primary_key=True)
    httprecord = TextField()
    timestamps = DateTimeField(default=datetime.datetime.utcnow)

    class Meta:
        database = db

接下来编写我们的插件脚本,用于拦截请求,并且写入到我们定义好的数据库表中,如下:

//关注接口(get 请求)
get_url = "https://frodo.douban.com/api/v2/user/91807076/following"

//发表说说的接口(post 请求)
post_url = "https://frodo.douban.com/api/v2/status/create_status?loc_id=118282"
httprecord = {}
url_list = [get_url, post_url]

def parser_data(query):
    data = {}
    for key, value in query.items():
        data[key] = value
    return data

class HttpRecord:
    @concurrent
    def request(self, flow: http.HTTPFlow):
        //这里可以根据自身业务需求,拦截特定域名下的请求(这里为了方便演示,特地指定了两个接口地址)
        if flow.request.pretty_url.startswith(get_url):
            httprecord['method'] = flow.request.method
            httprecord['scheme'] = flow.request.scheme
            httprecord['url'] = flow.request.pretty_url
            httprecord['request_headers'] = {}
            for item in flow.request.headers:
                httprecord['request_headers'][item] = flow.request.headers[item]
            httprecord['get_data'] = parser_data(flow.request.query)
            httprecord['post_data'] = parser_data(flow.request.urlencoded_form)

    @concurrent
    def response(self, flow: http.HTTPFlow):
        if flow.request.pretty_url.startswith(get_url):
            httprecord['status_code'] = flow.response.status_code
            httprecord['response_headers'] = {}
            for item in flow.response.headers:
                httprecord['response_headers'][item] = flow.response.headers[item]
            httprecord['response_content'] = flow.response.get_text()

            # 插入数据库
            record = HttpRecords(httprecord=httprecord)
            record.save()

addons = [
    HttpRecord()
]

检查数据库表,看是否插入成功,如下:

流量回放进行接口测试

从数据库中查询请求记录,并按不同请求方法,e.g. get/post 进行分类;
通过 request 网络请求库,进行重新回放请求接口;
引入 Pytest 测试框架,加入断言,进行组织测试用例的执行,具体代码可参考如下:

import demjson
import requests
from replay.httpmodel import HttpRecords
import pytest

class TestInterface:
    get_http = []
    post_http = []
    def setup_class(self):
        # 从数据库获取流量记录(前置处理操作)
        self.httprecords = HttpRecords.select()
        for item in self.httprecords:
            data = demjson.decode(item.httprecord)
            if data['method'] == "GET":
                self.get_http.append(data)
            elif data['method'] == "POST":
                self.post_http.append(data)
            else:
                ...
        # 初始化请求 session
        self.session = requests.session()

    def testReplayGet(self):
        """
         测试回放 Get 请求
        """
        for i in range(len(self.get_http)):
            res = self.session.get(url=self.get_http[i]['url'], headers=self.get_http[i]['request_headers'], data=self.get_http[i]['get_data'])
            //这里主要是断言了响应状态码,在实际业务中,我们还需要断言返回格式及校验核心字段。
            assert res.status_code == 200

    def testRelayPost(self):
        """
        测试回放 Post 请求
        """
        for i in range(len(self.post_http)):
            res = self.session.post(url=self.post_http[i]['url'], headers=self.post_http[i]['request_headers'], data=self.post_http[i]['post_data'])
            assert res.status_code == 200

if __name__ == '__main__':
    pytest.main(["--html=report.html --self-contained-html", "interfacereplay.py"])

查看测试报告
get 和 post 请求分别回放测试成功:

MitmProxy使用:流量劫持与入库,流量回放

 

分类:

技术点:

相关文章: