conf===>settings.py
\'\'\' 存放项目相关配置信息(相关路径的配置) \'\'\' import os #获取文件跟目录路径 BASE_PATH=os.path.dirname(os.path.dirname(__file__)) #转到db目录下 DB_PATH=os.path.join(BASE_PATH,\'db\') #转到user_data目录下 USER_DATA=os.path.join(DB_PATH,\'user_data\') #转到shop_data目录下 SHOP_DATA=os.path.join(DB_PATH,\'shop_data\') #日志的配置 # 定义三种日志输出格式 standard_format = \'[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]\' \ \'[%(levelname)s][%(message)s]\' # 其中name为getlogger指定的名字 simple_format = \'[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s\' id_simple_format = \'[%(levelname)s][%(asctime)s] %(message)s\' # 定义日志输出格式 logfile_dir = os.path.join(BASE_PATH, \'log\') # print(logfile_dir) logfile_name = \'ATM.log\' if not os.path.isdir(logfile_dir): os.mkdir(logfile_dir) # log文件的全路径 logfile_path = os.path.join(logfile_dir, logfile_name) LOGGING_DIC = { \'version\': 1, \'disable_existing_loggers\': False, \'formatters\': { \'standard\': { \'format\': standard_format }, \'simple\': { \'format\': simple_format }, }, \'filters\': {}, \'handlers\': { # 打印到终端的日志 \'console\': { \'level\': \'DEBUG\', \'class\': \'logging.StreamHandler\', # 打印到屏幕 \'formatter\': \'simple\' }, # 打印到文件的日志 \'default\': { \'level\': \'DEBUG\', \'class\': \'logging.handlers.RotatingFileHandler\', # 保存到文件 \'formatter\': \'standard\', \'filename\': logfile_path, # 日志文件 \'maxBytes\': 1024 * 1024 * 5, # 日志大小 5M \'backupCount\': 5, \'encoding\': \'utf-8\', }, }, \'loggers\': { # logging.getLogger(__name__)拿到的logger配置 \'\': { \'handlers\': [\'default\', \'console\'], \'level\': \'DEBUG\', \'propagate\': True, }, }, }
core====>admin.py
from interface import admin_interface def change_locked(): #给用户账户解锁 while True: username=input(\'请输入解锁账户名:\').strip() flag,msg=admin_interface.unlockuser(username) if flag: print(msg) break else: print(msg) def change_money(): while True: username=input(\'请输入修改信用额度的账户名:\').strip() creditsmoney = input(\'请输入用户额度:\').strip() flag,msg=admin_interface.change_creditsmoney(username,creditsmoney) if flag: print(msg) break else: print(msg) def add_user(): while True: username=input(\'请输入需要设置管理员权限的账户名:\').strip() flag,msg=admin_interface.add_admin(username) if flag: print(msg) break else: print(msg) def add_shop(): while True: print("=======温情提示:输入q/Q则退出添加商品=====》") shopname=input(\'请输入要添加的商品名称:\').strip() if shopname==\'q\'or shopname==\'Q\': break unitprice= input(\'请输入商品单价:\').strip() total = input(\'请输入添加商品总数:\').strip() if total.isdigit(): flag,msg=admin_interface.add_shop(shopname,unitprice,total) if flag: print(msg) continue else: print(msg) continue else: print(\'输入的数量不合法,添加失败,请重新输入\') choice_dic = { \'1\': change_locked, \'2\': change_money, \'3\': add_user, \'4\': add_shop, \'0\': exit } #管理员功能启动入口 def admin_run(): while True: print(""" ========= ~欢迎进入管理员界面~ ======== 1 用户账户解锁 2 修改额度 3 设置管理员账号 4 添加商品库存 0 退出 ================ end ================= """) choice = input(\'请输入功能编号:\').strip() if choice in choice_dic: choice_dic[choice]() else: print(\'输入的编号不存在或者不合法,请重新输入~~\') continue
core====>src.py
\'\'\' 用户视图层,存放项目核心逻辑。。。 \'\'\' #调用不同的接口模块 from interface import user_interface from interface import bank_interface from interface import shop_interface from lib import common # 全局变量,记录用户是否已登录 Login_user=None def register(): while True: username=input("请输入注册用户姓名:").strip() password = input("请输入密码:").strip() password1 = input("请再次输入密码:").strip() if password==password1: #把用户视图层得到的数据传给接口层进行处理 flag,msg=user_interface.register_interface(username,password1) if flag: print(msg) break else: print(msg) def login(): count=0 while count<=3: username = input("请输入用户名:").strip() password = input("请输入密码:").strip() flag,msg=user_interface.login_interface(username, password) if flag==\'M\': #返回值为M,密码错误 count+=1 if count==3: print(\'密码输错三次,已被锁定!\') #把锁定值传去数据处理层进行更新保存 user_interface.login_locked(username,1) break print(\'%s还有%s次机会哦~!\' %(msg,(3-count))) continue elif flag==\'L\': #返回为L账户被锁定 print(msg) break elif flag:#返回为1,登陆成功 print(msg) global Login_user Login_user=username break else: print(msg)#返回为0,用户名不存在 break @common.login_auth def check_mongey(): user_money=bank_interface.checkmongey_interface(Login_user) print("您的账户余额为:%s" %user_money) @common.login_auth def payback_money(): while True: money=input(\'请输入还款金额:\').strip() if money.isdigit(): flag,msg=bank_interface.paybackmoney_interface(Login_user,money) if flag: print(msg) break elif flag==\'N\': print(msg) break else: print(msg) continue else: print(\'请输入合法数字!\') @common.login_auth def withdraw_money(): while True: money=input(\'请输入您要提现的金额:\').strip() if money.isdigit(): flag,msg=bank_interface.withdrawmoney_interface(Login_user,money) if flag: print(msg) break else: print(msg) continue else: print(\'请输入合法数字!\') @common.login_auth def transfer_money(): while True: collectuser = input(\'请输入您要转账的账户名:\').strip() transfermoney = input(\'请输入您要转账金额:\').strip() if transfermoney.isdigit(): flag, msg = bank_interface.transfermoney_interface(Login_user,collectuser,transfermoney) if flag: print(msg) break else: print(msg) continue else: print(\'请输入合法数字!\') @common.login_auth def shoppingcart(): from db import db_handler shop_dic=db_handler.shopdata() #商品库存数据 print(shop_dic) user_dic=db_handler.userdata(Login_user) shopcar_dic=user_dic[\'shoppingcart\'] #用户购物车数据 print(shopcar_dic) print(\'商城商品展示如下:\') for shopneme,shop_list in shop_dic.items(): unit_price,total=shop_list print(f\'商品名称:{shopneme},单价:{unit_price}元,剩余库存:{total}个\') print(\'温情提示:输入q/Q则结束商品添加!\') while True: shop_name=input(\'请输入您要购买的商品名称:\').strip() if shop_name==\'q\'or shop_name==\'Q\': print(\'已退出!\') break if shop_name in shop_dic: count=input(\'请输入商品个数:\').strip() if count.isdigit(): flag,msg=shop_interface.add_shopcar(Login_user,shop_name,count) if flag: print(msg) continue else: print(msg) continue else: print("商品个数大于库存或不合法,请重新输入") else: print("商品不存在,请重新输入~!") @common.login_auth def check_shoppingcart(): usershopcar_dic=shop_interface.checkshop_interface(Login_user) all_total=0 if usershopcar_dic: for shopname,shop_list in usershopcar_dic.items(): price,count=shop_list price=float(price) count=float(count) total=float(price*count) all_total+=total print(f\'商品名称:{shopname},单价:{price}元,数量:{count}个,总价:{total}\') while True: choice=input("是否对购物车商品进行结算?(Y:结算/N:放弃):").strip() if choice==\'Y\': #调用银行结算接口进行结算 flag,msg=bank_interface.paymoney_interface(Login_user,all_total) if flag: print(msg) break else: print(msg) break if choice==\'N\': break else: print(\'您的购物车空空如也~~!\') @common.login_auth def check_paylist(): user_flow = bank_interface.paylist_interface(Login_user) if user_flow: for flow in user_flow: print(flow) else: print(\'当前用户没有流水记录!\') @common.login_auth def admin(): from core import admin #首先要判断该账号是否是管理账号 flag=user_interface.isadmin(Login_user) if flag: #返回值为1,则是管理员账号,否则不是管理员 admin.admin_run() else: print(\'非管理员账号,无法使用该功能!\') #用户choice第一次选择,使用以下功能: choice_dic={ \'1\': register, \'2\': login, \'3\': check_mongey, \'4\': payback_money, \'5\': withdraw_money, \'6\': transfer_money, \'7\': shoppingcart, \'8\': check_shoppingcart, \'9\': check_paylist, \'10\': admin, \'0\': exit } #项目启动程序 def run(): while True: print(""" ========= ~ATM + 购物车~ ======== 1 注册功能 2 登录功能 3 查看余额 4 还款功能 5 提现功能 6 转账功能 7 购物功能 8 查看购物车 9 查看流水 10 管理员功能 0 退出 ============ end ============== """) choice=input(\'请输入功能编号:\').strip() if choice in choice_dic: choice_dic[choice]() else: print(\'输入的编号不存在或者不合法,请重新输入~~\') continue
db====>db_handler.py
\'\'\' 数据处理层,用于处理相关数据(数据的存取,更新等) \'\'\' import json from conf import settings import os #查看用户信息数据,调用json模块 def userdata(username): #获取用户名的文件,需要先进行路径拼接 user_path=os.path.join(settings.USER_DATA,f\'{username}.json\') #进行校验 if os.path.exists(user_path): with open(user_path,\'r\',encoding=\'utf-8\')as f: user_dic=json.load(f) return user_dic def userdata_save(user_dic): #首先需要用户名来拼接用户的文件名,所以先从字典里取出用户名 username=user_dic.get(\'username\') user_path=os.path.join(settings.USER_DATA,f\'{username}.json\') #保存数据 with open(user_path,\'w\',encoding=\'utf-8\')as f: json.dump(user_dic,f,ensure_ascii=False) #查看商品库存信息数据,调用json模块 def shopdata(): #获取商品库存的文件路径,需要先进行路径拼接 shopstocks_path=os.path.join(settings.SHOP_DATA,\'shopstocks.json\') #进行校验 if os.path.exists(shopstocks_path): with open(shopstocks_path,\'r\',encoding=\'utf-8\')as f: shopstocks_dic=json.load(f) return shopstocks_dic def shopdata_save(shopstocks_dic): #保存数据 shopstocks_path=os.path.join(settings.SHOP_DATA,\'shopstocks.json\') with open(shopstocks_path,\'w\',encoding=\'utf-8\')as f: json.dump(shopstocks_dic,f,ensure_ascii=False)
interface====>admin_interface.py
from db import db_handler #用户账户解锁接口 def unlockuser(username): user_dic=db_handler.userdata(username) if user_dic: user_dic[\'locked\']=0 msg=f\'用户{username}解锁成功!\' db_handler.userdata_save(user_dic) return True,msg else: msg=f\'解锁用户不存在,请核实!\' return False,msg #用户信用额度修改接口 def change_creditsmoney(username,creditsmoney): user_dic=db_handler.userdata(username) if user_dic: user_dic[\'creditsmoney\']=int(creditsmoney) msg=f\'{username}账户额度修改成功,额度为:{creditsmoney}元!\' db_handler.userdata_save(user_dic) return True,msg else: msg=f\'修改账户不存在,请核实!\' return False,msg #设置管理账号接口 def add_admin(username): user_dic=db_handler.userdata(username) if user_dic: user_dic[\'is_admin\']=True msg=f\'修改{username}账号为管理员~!\' db_handler.userdata_save(user_dic) return True,msg else: msg=f\'修改账户不存在,请核实!\' return False,msg #管理员添加商品库存接口 def add_shop(shopname,price,total): shopstocks_dic=db_handler.shopdata() price=int(price) total=int(total) if shopname in shopstocks_dic: shopstocks_dic[shopname][0]=price shopstocks_dic[shopname][1]+=total msg=f\'{shopname}商品数量更新成功~~~\' db_handler.shopdata_save(shopstocks_dic) return True, msg else: shopstocks_dic[shopname]=[price,total] msg=f\'{shopname}商品添加成功~~~\' db_handler.shopdata_save(shopstocks_dic) return True,msg
interface====>bank_interface.py
\'\'\' 银行相关业务的接口 \'\'\' #查询账户余额接口 from db import db_handler from lib import common bank_logger=common.logger(log_type=\'bank\') #用户查询余额接口 def checkmongey_interface(username): user_dic=db_handler.userdata(username) return user_dic[\'money\'] #用户还款接口 def paybackmoney_interface(username,paymoney): #paymoney=300 user_dic=db_handler.userdata(username) money=float(user_dic[\'money\']) #money=15000 creditsmoney=float(user_dic[\'creditsmoney\']) #获取账号信用额度creditsmoney 20000 re_money=creditsmoney-money #re_money=500 paymoney=float(paymoney) #paymoney=300 leftmoney=creditsmoney-(money+paymoney) #leftmoney=20000-(15000+300) if money<=creditsmoney: #判断账户金额是否超过信用额度,超过则不需要还款 if paymoney<=re_money: msg=f\'用户{username}还款{paymoney}元成功,还剩{leftmoney}元未还款!\' bank_logger.info(msg) user_dic[\'user_flow\'].append(msg) user_dic[\'money\']=money+paymoney db_handler.userdata_save(user_dic) return True,msg else: msg=f\'当前用户全部还款金额为:{re_money}元,请重新输入\' return False,msg else: msg=f\'您的账户余额大于信用额度,不需要进行还款~\' return \'N\',msg #提现接口 def withdrawmoney_interface(username,withdraw_money,hand=0.005): #用户:tank提现金额:10000:手续费:0.5%(10000*0.5%) 得到10000-(10000*0.5%) user_dic=db_handler.userdata(username) withdraw_money=float(withdraw_money) #withdraw_money=10000 money=float(user_dic[\'money\']) #用户账户余额 money=10000 handmoney=withdraw_money*hand #手续费10000*0.5% actualmoney=withdraw_money-handmoney if withdraw_money<=money: msg=f\'用户{username}提现金额:{withdraw_money}元,扣除手续费:{handmoney}元,实际取款:{actualmoney}元!\' bank_logger.info(msg) user_dic[\'user_flow\'].append(msg) user_dic[\'money\']=money-withdraw_money db_handler.userdata_save(user_dic) return True,msg msg=f\'您的账户余额不足,请查询好充值!\' return False,msg #转账接口 def transfermoney_interface(transfer_user,collect_user,transfermoney): transf_user_dic=db_handler.userdata(transfer_user) collect_user_dic=db_handler.userdata(collect_user) if collect_user_dic: money=float(transf_user_dic[\'money\']) transfermoney=float(transfermoney) if transfermoney<=money: msg=f\'用户{transfer_user}成功向{collect_user}的账户,转账{transfermoney}元!\' bank_logger.info(msg) msg1=f\'用户{collect_user}收到{transfer_user}账户的转账{transfermoney}元!\' bank_logger.info(msg1) transf_user_dic[\'user_flow\'].append(msg) collect_user_dic[\'user_flow\'].append(msg1) transf_user_dic[\'money\']=money-transfermoney money=float(collect_user_dic[\'money\']) collect_user_dic[\'money\']=money+transfermoney db_handler.userdata_save(transf_user_dic) db_handler.userdata_save(collect_user_dic) return True,msg else: msg=f\'转账失败,账户余额不足,请先查询!\' return False,msg else: msg=f\'对方账户不存在,请先核实!\' return False,msg #付款接口 def paymoney_interface(username,paymoney): user_dic=db_handler.userdata(username) money=float(user_dic[\'money\']) paymoney=float(paymoney) if user_dic: if paymoney<=money: user_dic[\'money\']=money-paymoney msg=f\'用户{username}支付{paymoney}元成功,准备发货啦~\' bank_logger.info(msg) user_dic[\'shoppingcart\']={} #付款成功后,清空购物车 user_dic[\'user_flow\'].append(msg) db_handler.userdata_save(user_dic) return True,msg else: msg=f\'您的账户余额不足,请先查询!\' return False,msg #查看消费流水接口 def paylist_interface(username): user_dic=db_handler.userdata(username) return user_dic[\'user_flow\']
interface====>shop_interface.py
\'\'\' 购物商城相关业务的接口 \'\'\' from db import db_handler from lib import common shop_logger=common.logger(log_type=\'shop\') #用户添加购物车商品接口 def add_shopcar(username,shopname,count): shop_dic=db_handler.shopdata() #获取商品库存信息所有商品 """ shop_dic={"suger": [20, 50], "mac": [3000, 50], "phone": [2000, 50], "ice": [400, 500]} """ """ "shoppingcart": {"phone": [2000,1]} shopcar_dic={"phone": [2000,1]} """ user_dic=db_handler.userdata(username) #获取用户信息 shopcar_dic=user_dic[\'shoppingcart\'] #用户信息中的购物车信息 if shopname in shop_dic: total=int(shop_dic[shopname][1]) #库存数量 unit_price=float(shop_dic[shopname][0]) #商品单价 count=int(count) if count<=total: shop_dic[shopname][1]=total-count if shopname in shopcar_dic: shopcar_dic[shopname][1]+=count msg=f\'已将{shopname}添加到购物车!\' shop_logger.info(msg) db_handler.userdata_save(user_dic) db_handler.shopdata_save(shop_dic) return True, msg else: shopcar_dic[shopname]=[unit_price,count] user_dic[\'shoppingcart\']=shopcar_dic msg=f\'已将{shopname}添加到购物车!\' shop_logger.info(msg) db_handler.userdata_save(user_dic) db_handler.shopdata_save(shop_dic) return True,msg else: msg=f\'商品库存不足,还剩{total}个,请重新输入\' return False,msg #查看购物车接口 def checkshop_interface(username): user_dic=db_handler.userdata(username) usershopcar_dic=user_dic[\'shoppingcart\'] return usershopcar_dic
interface====>user_interface.py
\'\'\' 逻辑接口层,用户接口 \'\'\' from db import db_handler from lib import common user_logger=common.logger(log_type=\'user\') #注册接口,接收注册数据进行处理 def register_interface(username,password,creditsmoney=20000): #调用用户数据处理层,进行数据的校验判断 user_dic=db_handler.userdata(username) if user_dic: return False,\'您注册的用户名已存在!\' #如果不存在,则将密码进行加密后,传到数据处理层进行保存 password=common.md5password(password) money=creditsmoney #用户字典信息 user_dic={ \'username\': username, \'password\': password, \'money\': money, \'creditsmoney\': creditsmoney, \'user_flow\': [], \'shoppingcart\':{}, \'locked\':0, \'is_admin\': False, } #传到数据处理层进行保存 msg=f\'您已注册成功,您的信用默认额度为:{creditsmoney},如需更改请联系管理员!====》 请先登陆!\' user_logger.info(msg) db_handler.userdata_save(user_dic) return True,msg #登陆接口 def login_interface(username,password): #调用数据处理层处理数据,查看用户名是否存在 user_dic=db_handler.userdata(username) if user_dic: #先判断用户是否被锁定 locked=user_dic.get(\'locked\') if locked==1: #1 被锁,0 未被锁 msg=f\'您的账户已被锁定,请联系管理员解锁!\' return \'L\',msg if locked==0: #把密码进行加密 password=common.md5password(password) md_password=user_dic.get(\'password\') if password==md_password: msg=f\'你已登陆成功!\' user_logger.info(msg) return True,msg else: msg=f\'您的密码输入有误,请重新输入,\' return \'M\',msg else: msg=f\'您输入的用户名不存在或错误,请先核实!\' return False,msg #用户被锁定接口 def login_locked(username,lock): user_dic=db_handler.userdata(username) user_dic[\'locked\']=lock db_handler.userdata_save(user_dic) #判断用户账号是否是管理员账号 def isadmin(username): user_dic=db_handler.userdata(username) flag=user_dic[\'is_admin\'] return flag
lib====>common.py
\'\'\' 存放公共的使用方法 \'\'\' import hashlib from conf import settings #对密码进行处理,不加盐 def md5password(password): md_password=(hashlib.md5(password.encode(\'utf-8\'))).hexdigest() return md_password #登陆功能添加装饰器 def login_auth(func): from core import src def inner(*args, **kwargs): if src.Login_user: res = func(*args, **kwargs) return res else: print(\'请先登录后使用~!\') src.login() return inner #添加日志功能,导入日志模块 import logging.config def logger(log_type): #获取日志配置信息获取日志路径 logging.config.dictConfig(settings.LOGGING_DIC) logger=logging.getLogger(log_type) return logger
start.py
""" 程序的启动入口 """ import os import sys import core #添加环境解释器的环境变量 sys.path.append(os.path.dirname(__file__)) #引用逻辑层里面的启动程序入口,开始启动程序 from core import src if __name__==\'__main__\': src.run()