kelvinchiang

概述

开发命令行入口 main.py:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-


import argparse
import logging
import os


from gx.action.publish_cmc import publish_rpm_func
from gx.data.global_vars import GlobalVars


# Console verbosity is selected through the VERBOSE environment variable:
# VERBOSE=1 shows DEBUG records, anything else shows INFO and above.
verbose = os.environ.get('VERBOSE')
log = logging.getLogger('gxtool')

# The logger itself lets every record through; the handler level below is
# what actually filters the console output.
log.setLevel(level=logging.DEBUG)
handler = logging.StreamHandler()
if verbose == '1':
    handler.setLevel(logging.DEBUG)
else:
    handler.setLevel(logging.INFO)
str_log = '[%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s'
formatter = logging.Formatter(str_log)
handler.setFormatter(formatter)
log.addHandler(handler)

log.debug('VERBOSE=%s', verbose)


# Resolved (symlink-free) path of this script file.
cur_path = os.path.realpath(__file__)
log.debug("cur_path: %s", cur_path)

VERSION = '0.8.3-4'
RELEASE_NOTE = """
==============================================================================
date        version                   info
------------------------------------------------------------------------------
            0.6.0-1   init-dep解耦和拆分,enable-repofile、init-dep、copy-conf
==============================================================================
"""

# Publish the tool location, version and release note through the shared
# GlobalVars store so that other modules can read them.
PARAMS = GlobalVars()
PARAMS.put('gxtool_dir', cur_path)
PARAMS.put('version', VERSION)
# Drop only the leading/trailing newlines of the triple-quoted literal.
PARAMS.put('release_note', RELEASE_NOTE.strip('\n'))


def welcome():
    """Print the gxtool welcome banner to standard output."""
    # The empty first and last entries reproduce the blank lines that frame
    # the banner text.
    banner_lines = (
        "",
        "Welcome!",
        "This is the main program for gxtool function. ",
        "It provides many features, and you can use 'gxtool --help' to check. ",
        "",
        "Thank you!",
        "Copyright (c) Huawei Technologies Co., Ltd. 2012-2020. "
        "All rights reserved.",
        "",
    )
    print("\n".join(banner_lines))


def add_commands():
    """
    Build the gxtool command line, parse it and run the chosen subcommand.

    Registers every subcommand parser, parses the command line, merges the
    parsed arguments into the global ``PARAMS`` store and then calls the
    callable stored under ``func``. When no ``func`` is present (no
    subcommand selected) the welcome banner is printed instead.

    :return: None
    """
    parser = argparse.ArgumentParser(prog='gxtool',
                                     description="Develop tool for gx project")
    parser.add_argument('-v', '--version', action="version",
                        version="Version: {}".format(VERSION))
    desc = 'Subcommands for gxtool. ' \
           'Use "gxtool [subcmd] --help" for detail usage.'
    sub_parser = parser.add_subparsers(title='subcommands',
                                       description=desc,
                                       dest="subparser_name"
                                       )
    # Environment check
    add_env_parser(sub_parser)
    # Version information
    add_info_parser(sub_parser)

    # Core build commands
    add_dep_parser(sub_parser)
    add_copy_conf_parser(sub_parser)
    add_source_parser(sub_parser)

    # Publish to cmc
    add_publish_parser(sub_parser)
    add_publish_comp_parser(sub_parser)

    # Query repo information
    add_check_repo_parser(sub_parser)
    add_check_local_repo_parser(sub_parser)
    add_search_rpm_parser(sub_parser)
    add_download_rpm_parser(sub_parser)
    add_extract_rpm_parser(sub_parser)
    show_repo_rpm_parser(sub_parser)
    # Update the local repo file
    update_repo_info_parser(sub_parser)

    args = parser.parse_args()

    # Expose the parsed options to the subcommand handlers via PARAMS.
    PARAMS.update(vars(args))
    log.debug(PARAMS.get_all())
    func = PARAMS.get("func")
    log.debug('func_name: %s', func)
    if func:
        func()
    else:
        welcome()


def add_env_parser(sub_parser):
    """
    add subcommand parser for env-check

    :param sub_parser: the subcommand parser for gxtool
    :return: None
    """
    help_info = 'check essential for current env.'
    env_parser = sub_parser.add_parser('env-check',
                                       help=help_info)
    # add_commands() later looks up "func" to run the selected subcommand.
    env_parser.set_defaults(func=env_check_func)


def add_copy_conf_parser(sub_parser):
    """
    add subcommand parser for copy-conf

    Registers the ``copy-conf`` subcommand with two required options,
    ``-c/--build_conf`` and ``-d/--dest_path``, and binds ``copy_conf_func``
    as its handler.

    :param sub_parser: the subcommand parser for gxtool
    :return: None
    """
    conf_parser = sub_parser.add_parser('copy-conf',
                                        help='resolve build_conf for project')
    conf_parser.add_argument('-c', '--build_conf',
                             required=True,
                             help='project build_conf path')
    conf_parser.add_argument('-d', '--dest_path',
                             required=True,
                             help='spec file path and name')

    # add_commands() later looks up "func" to run the selected subcommand.
    conf_parser.set_defaults(func=copy_conf_func)

def main():
    """Entry point of the gxtool command line; delegates to add_commands()."""
    add_commands()


# Run the command line only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

  一个单例类 global_vars.py:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

class GlobalVars(object):
    """
    Singleton class GlobalVars to store and manage global variables.

    Every call to ``GlobalVars()`` returns the same instance, and all
    variables are kept in one class-level dict.
    """

    _instance = None
    # Class-level on purpose: the single instance (and the class itself)
    # share this one dict.
    _var_dict = {}

    def __new__(cls, *args, **kw):
        """
        Ensure that when creating a new instance, it will be the only
        instance. Otherwise return existing instance.

        :param args: accepted for call compatibility and ignored
        :param kw: accepted for call compatibility and ignored
        :return: cls._instance
        """

        if cls._instance is None:
            # object.__new__ raises TypeError when it receives extra
            # arguments from an overriding __new__, so none are forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    def get(self, var_name):
        """
        Get the global variable value from var dict.

        :param var_name: key to look up
        :return: self._var_dict[var_name] or None when the key is absent
        """

        return self._var_dict.get(var_name)

    def put(self, key, value):
        """
        Put a pair of key/value into var dict.

        :param key: name of the variable
        :param value: value to store, replacing any previous one
        :return: None
        """

        self._var_dict[key] = value

    def update(self, var_dict):
        """
        Update the _var_dict with a dict of key/values.

        :param var_dict: dict to merge in; any non-dict value is ignored
        :return: None
        """

        if isinstance(var_dict, dict):
            self._var_dict.update(var_dict)

    def get_all(self):
        """
        Return the var dict itself (not a copy).

        :return: self._var_dict
        """
        return self._var_dict

    def save(self):
        """
        Save the _var_dict in local file.

        Not implemented yet; currently does nothing.

        :return: None
        """
        pass

    def refresh(self):
        """
        init the _var_dict from local file.

        Not implemented yet; currently does nothing.

        :return: None
        """
        pass

  一个最简单的基础类 utils.py:

class BaseMethod:
    """
    Small collection of static helpers: run a shell command, read and
    write text/JSON files, and compute a file's MD5 digest.
    """

    @staticmethod
    def run_cmd(cmd):
        """
        Run a command through the shell and capture its status and output.

        A diagnostic line is printed when the command exits non-zero.

        :param cmd: command line string
        :return: tuple (retcode, output)
        """
        retcode, output = subprocess.getstatusoutput(cmd)
        if retcode != 0:
            print("RUN CMD: %s, retcode: %s, output: %s"
                  % (cmd, retcode, output))
        return retcode, output

    @staticmethod
    def write_file(filename, content):
        """
        Write text to a file as UTF-8, creating parent directories as needed.

        :param filename: path of the file to (over)write
        :param content: text to write
        :return: None
        """
        dir_name = os.path.dirname(filename)
        # dirname is '' for a bare file name, and os.makedirs('') raises,
        # so only create directories when there is a directory part.
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as tf:
            tf.write(content)

    @staticmethod
    def read_file_with_json(file_name, mode="r"):
        """
        Load and return the JSON content of a file.

        :param file_name: path of the JSON file
        :param mode: mode passed to open()
        :return: the decoded JSON body
        :raises IOError: when file_name does not exist
        """
        if not os.path.exists(file_name):
            raise IOError("No such file or directory: %s" % file_name)
        with open(file_name, mode) as fp:
            json_body = json.load(fp)
        return json_body

    @staticmethod
    def write_json_to_file(file_name, json_body, mode="w+"):
        """
        Serialize json_body and write it to file_name via a temporary file.

        The data is written to a temporary file in the target directory
        first and then moved over file_name, so the target is replaced in
        one step instead of being written in place.

        :param file_name: path of the file to (over)write
        :param json_body: data to serialize; expected to be a dict
        :param mode: mode used to open the temporary file
        :return: None
        """
        # Serialize before creating the temporary file so that a
        # serialization error does not leave a stray temp file behind.
        json_str = json.dumps(json_body, indent=4, sort_keys=True)
        # A bare file name has no directory part; use the current directory
        # so the temp file sits next to the target.
        target_dir = os.path.dirname(file_name) or '.'
        with tempfile.NamedTemporaryFile(mode, dir=target_dir,
                                         delete=False) as fp:
            fp.write(json_str)
            fp.flush()
            temp_name = fp.name
        # os.replace overwrites an existing target on every platform,
        # whereas os.rename fails on Windows when the target exists.
        os.replace(temp_name, file_name)

    @staticmethod
    def get_md5sum(file_name):
        """
        Return the hexadecimal MD5 digest of a file.

        :param file_name: path of the file to hash
        :return: hex digest string, or '' when the file cannot be read
        """
        # Local import: this helper is the only user of hashlib here.
        import hashlib

        digest = hashlib.md5()
        try:
            with open(file_name, 'rb') as fp:
                # Read in chunks so large files are not loaded at once.
                for chunk in iter(lambda: fp.read(65536), b''):
                    digest.update(chunk)
        except OSError as err:
            print("GET MD5SUM: %s, error: %s" % (file_name, err))
            return ''
        return digest.hexdigest()

  

分类:

技术点:

相关文章:

  • 2021-09-20
  • 2021-11-05
  • 2022-12-23
  • 2022-12-23
  • 2021-05-09
  • 2021-09-28
  • 2022-12-23
猜你喜欢
  • 2022-02-08
  • 2021-07-24
  • 2021-11-12
  • 2021-11-18
相关资源
相似解决方案