Httprunner集成Sql数据库验证

分享
藏宝库编辑 2024-9-28 20:35:37 116 0 来自 中国
Httprunner集成Sql验证


  • 开发功能点:
    1、sql语句可参数化
    2、sql效果支持JsonPath提取
    3、查询效果网络到httprunner Summary信息中
    4、定制场景哀求前与哀求后查询(如:充值接口)
  • 代码

  • 继承Runner类,重写_run_test方法
from httprunner.runner import Runnerfrom httprunner import exceptions, logger, response, utilsfrom httprunner.validator import Validatorfrom utils.httpManage.db import DbMysqlfrom httprunner.client import HttpSessionclass ARunner(Runner):    """ Running testcases. """    def __init__(self, config, http_client_session=None):        """            config =  {                        "name": "XXXX",                        "base_url": "http://127.0.0.1",                        "verify": False,                        "db_connect": {  #  db_mysql connect                              "host": "",                              "port": "",                              "user": "",                              "pwd": "",                              "database": "",                          }                }        """        super(ARunner, self).__init__(config, http_client_session)        self.config_db_connect = config.get("db_connect", {})  # config增长sql毗连信息        self.config_db_connect.update({"session_context": self.session_context})        self.sql_results = {}  # sql查询效果    def do_query_actions(self, actions, a_type):        """ 增长sql查询利用方法        Args:            actions (list):                    [{"variable_name": '', "sql": "", "json_path":""}]            a_type (str): setup_db/teardown_db        """        logger.log_debug("\n==================query {} details ==================\n".format(a_type))        with DbMysql(**self.config_db_connect) as db:            db.do_find(actions)            self.sql_results[a_type] = db.sql_results    def _run_test(self, test_dict):        """ run single teststep.        Args:            test_dict (dict): teststep info                {                    "name": "teststep description",                    "skip": "skip this test unconditionally",                    "times": 3,                    "variables": [],            # optional, override                    "request": {                        "url": "http://127.0.0.1:5000/api/users/1000",                        "method": "OST",                        "headers": {                            "Content-Type": "application/json",                            "authorization": "$authorization",                            "random": "$random"                        },                        "json": {"name": "user", "password": "123456"}                    },                    "extract": {},              # optional                    "validate": [],             # optional                    "setup_hooks": [],          # optional                    "teardown_hooks": []        # optional                    "setup_dbs": [                        {"variable_name": '', "sql": "", "json_path":""}                        ...                    ],                    "teardown_dbs":[                        {"variable_name": '', "sql": "", "json_path": ""}                        ...                    ]                }        Raises:            exceptions.ParamsError            exceptions.ValidationFailure            exceptions.ExtractFailure        """        # clear meta data first to ensure independence for each test        self.__clear_test_data()        # check skip        self._handle_skip_feature(test_dict)        # prepare        test_dict = utils.lower_test_dict_keys(test_dict)        test_variables = test_dict.get("variables", {})        self.session_context.init_test_variables(test_variables)        # teststep name        test_name = self.session_context.eval_content(test_dict.get("name", ""))        # parse test request        raw_request = test_dict.get('request', {})        parsed_test_request = self.session_context.eval_content(raw_request)        self.session_context.update_test_variables("request", parsed_test_request)        # prepend url with base_url unless it's already an absolute URL        url = parsed_test_request.pop('url')        base_url = self.session_context.eval_content(test_dict.get("base_url", ""))        parsed_url = utils.build_url(base_url, url)        # setup hooks        setup_hooks = test_dict.get("setup_hooks", [])        if setup_hooks:            self.do_hook_actions(setup_hooks, "setup")        # setup dbs  新增哀求前sql查询        setup_dbs = test_dict.get("setup_dbs", [])        if setup_dbs:            self.do_query_actions(teardown_dbs, "setup_db")        try:            method = parsed_test_request.pop('method')            parsed_test_request.setdefault("verify", self.verify)            group_name = parsed_test_request.pop("group", None)        except KeyError:            raise exceptions.ParamsError("URL or METHOD missed!")        # TODO: move method validation to json schema        valid_methods = ["GET", "HEAD", "OST", "UT", "ATCH", "DELETE", "OPTIONS"]        if method.upper() not in valid_methods:            err_msg = u"Invalid HTTP method! => {}\n".format(method)            err_msg += "Available HTTP methods: {}".format("/".join(valid_methods))            logger.log_error(err_msg)            raise exceptions.ParamsError(err_msg)        logger.log_info("{method} {url}".format(method=method, url=parsed_url))        logger.log_debug(            "request kwargs(raw): {kwargs}".format(kwargs=parsed_test_request))        # request        resp = self.http_client_session.request(            method,            parsed_url,            name=(group_name or test_name),            **parsed_test_request        )        resp_obj = response.ResponseObject(resp)        # teardown hooks        teardown_hooks = test_dict.get("teardown_hooks", [])        if teardown_hooks:            self.session_context.update_test_variables("response", resp_obj)            self.do_hook_actions(teardown_hooks, "teardown")            self.http_client_session.update_last_req_resp_record(resp_obj)        # teardown dbs  新增哀求后查询sql        teardown_dbs = test_dict.get("teardown_dbs", [])        if teardown_dbs:            self.do_query_actions(teardown_dbs, "teardown_db")        # extract        extractors = test_dict.get("extract", {})        extracted_variables_mapping = resp_obj.extract_response(extractors)        self.session_context.update_session_variables(extracted_variables_mapping)        # validate        validators = test_dict.get("validate") or test_dict.get("validators") or []        validate_script = test_dict.get("validate_script", [])        if validate_script:            validators.append({                "type": "python_script",                "script": validate_script            })        validator = Validator(self.session_context, resp_obj)        try:            validator.validate(validators)        except (exceptions.ParamsError,                exceptions.ValidationFailure, exceptions.ExtractFailure):            err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)            # log request            err_msg += "====== request details ======\n"            err_msg += "url: {}\n".format(parsed_url)            err_msg += "method: {}\n".format(method)            err_msg += "headers: {}\n".format(parsed_test_request.pop("headers", {}))            for k, v in parsed_test_request.items():                v = utils.omit_long_data(v)                err_msg += "{}: {}\n".format(k, repr(v))            err_msg += "\n"            # log response            err_msg += "====== response details ======\n"            err_msg += "status_code: {}\n".format(resp_obj.status_code)            err_msg += "headers: {}\n".format(resp_obj.headers)            err_msg += "body: {}\n".format(repr(resp_obj.text))            logger.log_error(err_msg)            raise        finally:            # get request/response data and validate results            self.meta_datas = getattr(self.http_client_session, "meta_data", {})            self.meta_datas["validators"] = validator.validation_results            self.meta_datas["sql_results"] = self.sql_results

  • 定制sql查询 pip install pymysql
# -------------------------------------# Author:       ALan# Date:         2022/4/19# Time:         13:36# -------------------------------------import pymysqlimport jsonpathfrom pymysql import OperationalErrorfrom httprunner import logger, exceptionsclass DbMysql(object):    def __init__(self, host, port, user, pwd, database, session_context=None):        # 创建毗连        try:            self.conn = pymysql.connect(host=host,  # 数据库地点                                        port=int(port),  # 端口号                                        user=user,  # sql账号                                        password=pwd,  # sql暗码                                        database=database,  # 数据库名                                        charset='utf8',                                        cursorclass=pymysql.cursors.DictCursor)        except OperationalError as err_msg:            logger.log_error(str(err_msg))        else:            # 创建游标            self.cur = self.conn.cursor()            self.session_context = session_context            self.sql_results = []    def __enter__(self):        """查询前利用"""        self.conn.commit()  # 查询前,提交变乱        return self    def __exit__(self, exc_type, exc_val, exc_tb):        """实行后关闭毗连利用"""        self.cur.close()  # 关闭游标        self.conn.close()  # 关闭毗连    def find_all_result(self, sql):        """查询sql语句全部效果"""        self.cur.execute(sql)  # 实行sql语句        return self.cur.fetchall()    def do_find(self, actions):        """        查询利用        :param actions: [{"variable_name": '', "sql": "", "json_path":""}]        :return:        """        self.sql_results = []        def log_print(req_resp_dict):            msg = "\n"            for key, value in req_resp_dict.items():                msg += "{:<16} : {}\n".format(key, repr(value))            logger.log_debug(msg)        for action in actions:            _sql = self.__eval_sql(action.get("sql", ""))  # 更换sql语句            json_path = action.get("json_path", "")            sql_value = self._extract_field_with_jsonpath(self.find_all_result(_sql), json_path)  # 提取查询数据            sql_variables_mapping = {action.get("variable_name"): sql_value}            self.session_context.update_session_variables(sql_variables_mapping)  # 更新效果变量            sql_dict = {                "sql": _sql,                "json_path": json_path,                "sql_value": sql_value            }            log_print(sql_dict)            self.sql_results.append(sql_dict)    def __eval_sql(self, sql_item):        """ evaluate sql item. """        sql_value = self.session_context.eval_content(sql_item)        return sql_value    def _extract_field_with_jsonpath(self, data, field):        """        :param data: 数据源        :param field: Jsonpath                    expression, e.g.                    1)$.code                    2) $..items. *.id        """        if field.startswith("$"):            result = jsonpath.jsonpath(data, field)            if result:                return result[0]  # 去除列表            else:                raise exceptions.ExtractFailure("\tjsonpath {} get nothing\n".format(field))        else:            return field


  • 至此已实现了底子功能点。终极是要完成Sql效果与实际数据校验,那么Validator类必要重写么?
    因原httprunner源码中已实现了Validator,可以不必要改动,仅需在api文件_add_tests方法中把Runner类改成自界说ARunner类就行。但是以上代码遗留了一个题目是在用例布局体validate中界说($参数名),必须在变量variables中界说该参数名称否则运行代码会报找不到参数错误
    例:



  • 完备用例布局体


  • 日志效果图



您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2025-3-10 00:08, Processed in 0.184222 second(s), 32 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表