目录结构
project
case#测试用例
suite#测试目录
logs#测试日志
papi#测试类
result#测试结果
setting.py#配置文件
1、日志类,用于测试时日志记录
pyapilog.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
# __author__ = 'dongjie'
__data__ = '2015-05-20'
import datetime
import logging
import os
from yunjiweidian import setting
logLevel = {
1 : logging.NOTSET,
2 : logging.DEBUG,
3 : logging.INFO,
4 : logging.WARNING,
5 : logging.ERROR,
6 : logging.CRITICAL
}
setFile = os.path.join(setting.root_dir, 'setting.ini')
loggers = {}
# 定义日志方法,从配置文件读取日志等级,且定义日志输出路径
def pyapilog(**kwargs):
global loggers
log_level = setting.logLevel
log_path = setting.logFile
if os.path.exists(log_path):
log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
else:
os.mkdir(r'%s' % log_path)
log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
logger = logging.getLogger()
logger.setLevel(logLevel[log_level])
if not logger.handlers:
# 创建一个handler,用于写入日志文件
fh = logging.FileHandler(log_file)
fh.setLevel(logLevel[log_level])
# 再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 定义handler的输出格式
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# 给logger添加handler
logger.addHandler(fh)
logger.addHandler(ch)
loggers.update(dict(name=logger))
return logger
2、http测试类
httprequest.py
# -*-coding:utf-8 -*- # !/usr/bin/python __author__ = 'dongjie' __data__ = '2015-05-20' from pyapilog import pyapilog import requests import json import urllib class SendHttpRequest(object): def __init__(self, url): self.url = url # post request def post(self, value=None): params = urllib.urlencode(value) try: req = requests.post(self.url + "?%s" % params) except Exception, err: print err if req.status_code == 200: pyapilog().info(u"发送post请求: %s 服务器返回: %s" % (req.url, req.status_code)) else: pyapilog().error(u"发送post请求: %s 服务器返回: %s\n error info: %s " % (req.url, req.status_code, req.text)) return req.text def post_json(self, value): head = {'content-type': 'application/json'} try: req = requests.post(self.url, data=json.dumps(value), headers=head) print req.url except Exception, err: print err if req.status_code == 200: pyapilog().info(u"发送post请求: %s 服务器返回: %s" % (req.url, req.status_code)) return req.text else: pyapilog().error(u"发送post请求: %s 服务器返回: %s\n error info: %s " % (req.url, req.status_code, req.text)) def get(self, value=None): try: req = requests.get(self.url, params=value) except Exception, err: print err if req.status_code == 200: pyapilog().info(u"发送get请求: %s 服务器返回: %s" % (req.url, req.status_code)) else: pyapilog().error(u"发送get请求: %s 服务器返回: %s\n error info: %s " % (req.url, req.status_code, req.text)) return req.text
3、数据库操作类
databasedriver.py
# -*-coding:utf-8 -*# !/usr/bin/python __author__ = 'dongjie' __data__ = '2015-05-21' import pymssql import MySQLdb import setting from pyapilog import pyapilog class sqldriver(object): def __init__(self, host, port, user, password, database): self.host = host self.port = port self.user = user self.password = password self.database = database # 执行SQLserver查询 def exec_mssql(self, sql): try: conn = pymssql.connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, charset="utf8") cur = conn.cursor() if cur: pyapilog().info(u"执行SQL语句%s" % sql) cur.execute(sql) rows = cur.fetchall() if len(rows) == 0: pyapilog().warning(u"没有查询到数据") return rows else: pyapilog().error(u"数据库连接不成功") conn.close() except Exception, e: pyapilog().error(e) # 执行Mysql查询 def exec_mysql(self, sql): try: conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user, passwd=self.password, db=self.database, ) cur = conn.cursor() if cur: pyapilog().info(u"执行SQL语句%s" % sql) resList = cur.execute(sql) return resList except Exception, e: pyapilog().error(e) # 执行sql语句返回结果 def execsql(sql): config = setting.DATABASE driver = config.get("ENGINE") host = config.get("HOST") port = config.get("PORT") user = config.get("USER") password = config.get("PWD") database = config.get("DATABASE") if driver == "MYSQL": try: sql_result = sqldriver( host=host, port=port, user=user, password=password, database=database ).exec_mysql(sql) return sql_result except Exception, e: pyapilog().error(e) elif driver == "MSSQL": try: sql_result = sqldriver( host=host, port=port, user=user, password=password, database=database ).exec_mssql(sql) return sql_result except Exception, e: pyapilog().error(e) else: pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
4、解析json字符串
dataprase.py
# -*-coding:utf-8 -*- # !/usr/bin/python __author__ = 'dongjie' __data__ = '2015-05-21' import json import xmltodict from pyapilog import pyapilog # 解析json字符串 class jsonprase(object): def __init__(self, json_value): try: self.json_value = json.loads(json_value) except ValueError, e: pyapilog().error(e) def find_json_node_by_xpath(self, xpath): elem = self.json_value nodes = xpath.strip("/").split("/") for x in range(len(nodes)): try: elem = elem.get(nodes[x]) except AttributeError: elem = [y.get(nodes[x]) for y in elem] return elem def datalength(self, xpath="/"): return len(self.find_json_node_by_xpath(xpath)) @property def json_to_xml(self): try: root = {"root": self.json_value} xml = xmltodict.unparse(root, pretty=True) except ArithmeticError, e: pyapilog().error(e) return xml # 解析xml字符串 class xmlprase(object): def __init__(self, xml_value): self.xml_str = xml_value @property def xml_to_json(self): try: xml_dic = xmltodict.parse(self.xml_str, encoding="utf-8", process_namespaces=True, ) json_str = json.dumps(xml_dic) except Exception, e: print e return json_str