目录结构

project

  

  case#测试用例

    

    suite#测试目录

  

  logs#测试日志

  

  papi#测试类

  

  result#测试结果

  

  setting.py#配置文件

1、日志类,用于测试时日志记录 

pyapilog.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
# __author__ = 'dongjie'
__data__ = '2015-05-20'

import datetime
import logging
import os

from yunjiweidian import setting

# Maps the numeric level read from the settings module (1-6) to the logging
# module's level constants, ordered from least to most severe.
logLevel = dict(enumerate((
    logging.NOTSET,
    logging.DEBUG,
    logging.INFO,
    logging.WARNING,
    logging.ERROR,
    logging.CRITICAL,
), 1))
# Path of a 'setting.ini' file located under setting.root_dir.
# NOTE(review): not referenced anywhere else in the visible code - confirm it is still needed.
setFile = os.path.join(setting.root_dir, 'setting.ini')
# Module-level registry in which pyapilog() records the logger it configured.
loggers = {}


# Logging entry point: reads the log level and the log directory from the
# settings module and returns the configured logger.
def pyapilog(**kwargs):
    """Return the root logger, configured for file and console output.

    The log level (a key of ``logLevel``) is read from ``setting.logLevel``
    and the log directory from ``setting.logFile``.  The directory is created
    when it does not exist yet, and the log file inside it is named after the
    current date (``YYYY-MM-DD.log``).

    Handlers are attached only when the root logger has none, so calling
    this function repeatedly does not duplicate output.

    :param kwargs: accepted for backward compatibility; currently unused.
    :return: the configured root ``logging.Logger``.
    """
    log_level = setting.logLevel
    log_path = setting.logFile
    if not os.path.exists(log_path):
        try:
            # makedirs (unlike mkdir) also creates missing parent directories.
            os.makedirs(log_path)
        except OSError:
            # Another process may have created the directory in the meantime;
            # only propagate the error when it is still missing.
            if not os.path.isdir(log_path):
                raise
    log_file = os.path.join(
        log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

    logger = logging.getLogger()
    logger.setLevel(logLevel[log_level])
    if not logger.handlers:
        # File handler: writes records at the configured level to the log file.
        fh = logging.FileHandler(log_file)
        fh.setLevel(logLevel[log_level])
        # Console handler: only errors and above are echoed to the console.
        ch = logging.StreamHandler()
        ch.setLevel(logging.ERROR)
        # Both handlers share one output format.
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        logger.addHandler(fh)
        logger.addHandler(ch)
    # NOTE(review): dict(name=logger) stores the logger under the literal key
    # 'name', not under the logger's own name; kept as-is for compatibility.
    loggers.update(dict(name=logger))
    return logger

2、http测试类

httprequest.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'

from pyapilog import pyapilog
import requests
import json
import urllib

class SendHttpRequest(object):
    """Send HTTP requests to a fixed URL and log the outcome of each one.

    Every method logs the final request URL and the status code: at INFO
    level for a 200 response, at ERROR level (including the response body)
    otherwise.  When the request itself cannot be sent, the exception is
    logged and ``None`` is returned.
    """

    def __init__(self, url):
        """Remember the target ``url`` used by all request methods."""
        self.url = url

    def post(self, value=None):
        """POST to the URL with ``value`` encoded into the query string.

        :param value: mapping of query parameters, or ``None`` for none.
        :return: the response body text, or ``None`` if the request could
            not be sent.
        """
        try:
            # Let requests build the query string: it accepts None (the
            # default argument) and works on both Python 2 and 3, unlike
            # urllib.urlencode.
            req = requests.post(self.url, params=value)
        except Exception as err:
            # No response object exists here, so stop instead of falling
            # through to the status check.
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

    def post_json(self, value):
        """POST ``value`` serialized as a JSON request body.

        :param value: JSON-serializable object sent as the body.
        :return: the response body text for a 200 response; ``None`` for any
            other status code or if the request could not be sent.
        """
        head = {'content-type': 'application/json'}
        try:
            req = requests.post(self.url, data=json.dumps(value), headers=head)
            print(req.url)
        except Exception as err:
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
            return req.text
        # Unlike post() and get(), a non-200 answer yields None here.
        pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return None

    def get(self, value=None):
        """GET the URL with ``value`` passed as query parameters.

        :param value: mapping of query parameters, or ``None`` for none.
        :return: the response body text, or ``None`` if the request could
            not be sent.
        """
        try:
            req = requests.get(self.url, params=value)
        except Exception as err:
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"发送get请求: %s   服务器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"发送get请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

3、数据库操作类

databasedriver.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import pymssql
import MySQLdb
import setting
from pyapilog import pyapilog

class sqldriver(object):
    """Run a single SQL statement against SQL Server or MySQL.

    Each ``exec_*`` call opens its own connection and always closes it
    again, whether the statement succeeds or fails.  Errors are logged
    rather than raised, in which case the method returns ``None``.
    """

    def __init__(self, host, port, user, password, database):
        """Store the connection parameters; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    # Run a query against SQL Server
    def exec_mssql(self, sql):
        """Execute ``sql`` on SQL Server and return all fetched rows.

        :param sql: the SQL text to execute.
        :return: the rows from ``fetchall()`` (a warning is logged when
            there are none), or ``None`` when an error was logged.
        """
        conn = None
        try:
            conn = pymssql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset="utf8")
            cur = conn.cursor()
            if cur:
                pyapilog().info(u"执行SQL语句%s" % sql)
                cur.execute(sql)
                rows = cur.fetchall()
                if not rows:
                    pyapilog().warning(u"没有查询到数据")
                return rows
            else:
                pyapilog().error(u"数据库连接不成功")
        except Exception as e:
            pyapilog().error(e)
        finally:
            # Runs on the success path as well, so the connection is no
            # longer leaked when rows are returned.
            if conn is not None:
                conn.close()

    # Run a statement against MySQL
    def exec_mysql(self, sql):
        """Execute ``sql`` on MySQL and return the value of ``cursor.execute``.

        NOTE(review): the result rows are not fetched and nothing is
        committed here - confirm that callers only need execute()'s value.

        :param sql: the SQL text to execute.
        :return: whatever ``cursor.execute(sql)`` returns, or ``None`` when
            an error was logged.
        """
        conn = None
        try:
            conn = MySQLdb.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   passwd=self.password,
                                   db=self.database,
                                   )
            cur = conn.cursor()
            if cur:
                pyapilog().info(u"执行SQL语句%s" % sql)
                resList = cur.execute(sql)
                return resList
        except Exception as e:
            pyapilog().error(e)
        finally:
            if conn is not None:
                conn.close()

# Execute a SQL statement on the configured database and return the result
def execsql(sql):
    """Run ``sql`` with the driver selected by ``setting.DATABASE``.

    The configuration mapping supplies ``ENGINE`` ("MYSQL" or "MSSQL") and
    the connection parameters ``HOST``, ``PORT``, ``USER``, ``PWD`` and
    ``DATABASE``.

    :param sql: the SQL text to execute.
    :return: the result of the matching ``sqldriver.exec_*`` method, or
        ``None`` when the engine is not supported or an error was logged.
    """
    config = setting.DATABASE
    driver = config.get("ENGINE")
    # Engine name -> name of the sqldriver method that handles it.
    executors = {"MYSQL": "exec_mysql", "MSSQL": "exec_mssql"}
    method_name = executors.get(driver)
    if method_name is None:
        # NOTE(review): the message lists ORACLE, but no ORACLE branch exists.
        pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
        return None
    try:
        db = sqldriver(
            host=config.get("HOST"),
            port=config.get("PORT"),
            user=config.get("USER"),
            password=config.get("PWD"),
            database=config.get("DATABASE")
        )
        return getattr(db, method_name)(sql)
    except Exception as e:
        pyapilog().error(e)
        return None

 

4、解析json字符串

dataprase.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import json
import xmltodict
from pyapilog import pyapilog

# Parse a JSON string
class jsonprase(object):
    """Parse a JSON document and look up nodes with a '/'-separated path."""

    def __init__(self, json_value):
        """Parse ``json_value``; a parse error is logged, not raised.

        ``self.json_value`` holds the decoded document, or ``None`` when
        the text could not be parsed.
        """
        # Define the attribute up front so it exists even after a parse error.
        self.json_value = None
        try:
            self.json_value = json.loads(json_value)
        except ValueError as e:
            pyapilog().error(e)

    def find_json_node_by_xpath(self, xpath):
        """Return the node addressed by ``xpath`` (e.g. ``"/a/b/c"``).

        Each path segment is looked up with ``.get()``.  When the current
        node is a list, the segment is looked up in every element and the
        list of results becomes the current node.

        :param xpath: '/'-separated keys; ``"/"`` or ``""`` addresses the
            document root.
        :return: the addressed node, or ``None`` when a segment is missing.
        """
        elem = self.json_value
        stripped = xpath.strip("/")
        # An empty path means the root; splitting "" would yield the bogus key "".
        nodes = stripped.split("/") if stripped else []
        for node in nodes:
            if elem is None:
                # An earlier segment was missing; nothing left to descend into.
                return None
            try:
                elem = elem.get(node)
            except AttributeError:
                # No .get() on the current node: treat it as a list of
                # mappings and collect the key from each element.
                elem = [item.get(node) for item in elem]
        return elem

    def datalength(self, xpath="/"):
        """Return ``len()`` of the node at ``xpath``; 0 when it is missing."""
        node = self.find_json_node_by_xpath(xpath)
        return 0 if node is None else len(node)

    @property
    def json_to_xml(self):
        """The document as pretty-printed XML under a ``<root>`` element.

        Conversion errors are logged and ``None`` is returned.
        """
        try:
            root = {"root": self.json_value}
            return xmltodict.unparse(root, pretty=True)
        except Exception as e:
            pyapilog().error(e)
            return None

# Parse an XML string
class xmlprase(object):
    """Hold an XML string and convert it to JSON on demand."""

    def __init__(self, xml_value):
        """Store the raw XML text; nothing is parsed here."""
        self.xml_str = xml_value

    @property
    def xml_to_json(self):
        """The XML document as a JSON string.

        The XML is parsed with ``xmltodict`` (namespaces processed) and the
        resulting mapping is serialized with ``json.dumps``.  Errors are
        logged and ``None`` is returned.
        """
        try:
            xml_dic = xmltodict.parse(self.xml_str,
                                      encoding="utf-8",
                                      process_namespaces=True,
                                      )
            return json.dumps(xml_dic)
        except Exception as e:
            # Return explicitly: no JSON string exists after a failure.
            pyapilog().error(e)
            return None
View Code

相关文章:

  • 2021-10-23
  • 2022-12-23
  • 2021-11-20
  • 2022-12-23
  • 2021-11-13
  • 2022-12-23
  • 2021-06-06
猜你喜欢
  • 2021-10-22
  • 2022-12-23
  • 2021-08-03
  • 2022-01-18
  • 2021-12-01
  • 2022-12-23
  • 2021-08-03
相关资源
相似解决方案