程序代碼 1.main ''' 程序入口 ''' import os,sys #建立一個目錄 basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #把目錄追加到我的路徑里 sys.path.append(basedir) #導入我的主程序 from zy3.src import service if __name__ == '__main__': #運行我的service主程序 service.execute() 2.setting """ 配置文件,,數(shù)據(jù)庫的一些配置信息 """ #當前登陸用戶的權(quán)限列表 user_permission_list = [] #當前登陸用戶的基本信息 user_info = {} #數(shù)據(jù)庫的配置信息 pyMysql_Connect_Dict = { "host":'127.0.0.1', "port":3306, "user":'root', "passwd":'自己建', "db":'自己建', "charset":'utf8' } 3.type_permission """ 查看用戶權(quán)限、為某個角色分配權(quán)限 """ from 自己建.src.repository.permission import Permission from 自己建.src.repository.user_info import UserInfoRepository from 自己建.src.repository.user_type import UserTypeRepository from 自己建.src.repository.user_type_to_permission import UserTypePermission user= UserInfoRepository() #操作用戶信息的表的類 user_type = UserTypeRepository() #操作角色表的類 permission = Permission() #操作權(quán)限表的類 type_permission = UserTypePermission() #操作權(quán)限和角色對應關(guān)系的類 def look_type(): look= input("請輸入你查看的用戶:") username =user.fetch_by_user(look) if username: print("該用戶 %s 存在"%username) permissions=type_permission.get_permission(username['user_type_id']) for i in permissions: print(i["caption"]) else: print("沒有該用戶") def add_per(): #從permission表中獲取所有權(quán)限列表 types=permission.fetch_all() #從user_type表中獲取所有角色列表 pers=user_type.all_caption() print("{:>8}目前可以管理的角色信息如下".format("")) for i in types: print(i["nid"],i["caption"]) type_id=input("請輸入添加權(quán)限角色的id") # 在打印的時候在前面 print("{:>8}目前可以管理的角色信息如下".format("")) for i in pers: print(i["nid"],i["caption"]) per_id = input("請輸入需要分配權(quán)限的id:") type_permission.add(user_type_id=type_id, permission_id=per_id) print("{:>8}權(quán)限分配成功".format('')) 4.user """ 添加/刪除用戶 """ from 自己建.src.repository.user_info import UserInfoRepository from 自己建.src.repository.user_type import UserTypeRepository from 自己建.src.utils.MD5 import pwd_md5 from 自己建.src.repository import user_info ''' 實現(xiàn)向user_info表中添加和刪除用戶 ''' def add_user(): obj_user = UserInfoRepository() obj_type = UserTypeRepository() types = obj_type.all_caption() while True: print('{:>8}請輸入以下信息創(chuàng)建用戶'.format('')) name = input('請輸入用戶名:') password = input('請輸入密碼:') pwd = pwd_md5(password) print('{:>8}角色列表如下:'.format('')) for i in types: print(i['nid'], i['caption']) typeid = input('請選擇角色id:') username = obj_user.exist(name) if username: print('\033[36;1m該用戶已存在\033[0m') continue else: obj_user.add(username=name, passwd=pwd, user_type_id=typeid) print('{:>8}成功創(chuàng)建用戶'.format('')) break def del_user(): obj = UserInfoRepository() name = input('請輸入需要刪除的用戶名:') obj.dele(name) print('{:>8}成功刪除用戶'.format('')) 5.user_type """ 添加/刪除角色 """ from 自己建.src.repository.user_type import UserTypeRepository type = UserTypeRepository() def add_type(): caption = input('請輸入你要創(chuàng)建的角色:') type.add(caption) print('角色創(chuàng)建成功') def del_type(): caption = input('請輸入你要刪除的角色:') type.del_data(caption) print('刪除成功!') 6.permission """ 操作權(quán)限表的類 導入路連接數(shù)據(jù)庫和關(guān)閉數(shù)據(jù)庫的操作 """ from 自己建.src.utils.db_connection import DbConnect try: class Permission: # 初始化對象 def __init__(self): self.db_conn=DbConnect() def add(self,**kwargs): conn =self.db_conn.connect() #連接數(shù)據(jù)庫 sql="""insert into permission(%s) VALUES (%s)"""#在mysql表里插入字段名和數(shù)據(jù) key_list=[] #建立一個列表存放key value_list=[]#建立一個列表存放value for k in kwargs:#for循環(huán)遍歷kwargs key_list.append(k)#把key存放在key_list中 value_list.append("%%(%s)s"%k)#把value存放在value_list中 sql=sql%(",".join(key_list),",".join(value_list)) #通過分隔符去分別插入到兩創(chuàng)建的行列表中 conn.execute(sql, kwargs)# 提交數(shù)據(jù)運行語句 self.db_conn.close()#關(guān)閉連接 def fetch_all(self): conn = self.db_conn.connect() #與數(shù)據(jù)庫建立連接 sql="""select * from permission""" #在MySQL的pemission中查詢所有角色權(quán)限信息 conn.execute(sql) #提交數(shù)據(jù)運行語句 result=conn.fetchall() #獲取所有角色的權(quán)限 self.db_conn.close() #關(guān)閉連接 return result #返回獲取的所有數(shù)據(jù) except Exception as e: print(e) 7.user_info from 自己建.src.utils.db_connection import DbConnect class UserInfoRepository: def __init__(self): self.db_conn = DbConnect() def add(self, **kwargs): ''' 新增用戶 :param kwargs: :return: ''' cursor = self.db_conn.connect() sql = """ insert into user_info(%s) values(%s)""" key_list = [] value_list = [] for k, v in kwargs.items(): key_list.append(k) value_list.append('%%(%s)s' % k) sql = sql % (','.join(key_list), ','.join(value_list)) cursor.execute(sql, kwargs) self.db_conn.close() def dele(self, name): ''' 根據(jù)用戶名刪除用戶 :param name: :return: ''' cursor = self.db_conn.connect() sql = "delete from user_info WHERE username=%s" cursor.execute(sql, name) self.db_conn.close() def fetch_by_user_pwd(self, username, password): """ 根據(jù)用戶名密碼獲取賬戶信息、角色類型 :param username: :param password: :return: """ cursor = self.db_conn.connect() sql = """ select user_info.nid as nid, user_info.username as username, user_info. user_type_id as user_type_id, user_type.caption as user_type_caption from user_info left join user_type on user_info.user_type_id=user_type.nid where user_info.username=%s and user_info.passwd=%s """ cursor.execute(sql, [username, password, ]) result = cursor.fetchone() self.db_conn.close() return result def fetch_by_user(self, username): """ 僅根據(jù)用戶名獲取賬戶信息,、角色類型 :param username: :param password: :return: """ cursor = self.db_conn.connect() sql = """ select user_info.nid as nid, user_info.username as username, user_info. user_type_id as user_type_id, user_type.caption as user_type_caption from user_info left join user_type on user_info.user_type_id=user_type.nid where user_info.username=%s """ cursor.execute(sql, username) result = cursor.fetchone() self.db_conn.close() return result def exist(self, username): global users ''' 根據(jù)用戶名判斷用戶是否存在 :param username: :return: ''' sql = "select 1 from user_info where username=%s" cursor = self.db_conn.connect() cursor.execute(sql, [username,]) result = cursor.fetchone() self.db_conn.close() return result def Change_password(self,password,username): sql = """ UPDATE user_info set passwd="%s" where username="%s" """%(password,username) cursor = self.db_conn.connect() a=cursor.execute(sql) result = cursor.fetchone() self.db_conn.close() return result 8.user_type from 自己建.src.utils.db_connection import DbConnect try: class UserTypeRepository: def __init__(self): #創(chuàng)建連接,,建立游標 self.db_conn = DbConnect() #添加角色 def add(self,caption): #建立連接,,游標 conn = self.db_conn.connect() #創(chuàng)建插入表結(jié)構(gòu) sql = """insert into user_type (caption) VALUE (%s)""" #執(zhí)行插入操作 conn.execute(sql, [caption,]) #關(guān)閉連接 self.db_conn.close() #刪除角色 def del_data(self,caption): #建立游標和連接 conn = self.db_conn.connect() #刪除語句 sql = """delete from user_type where caption=%s""" #執(zhí)行刪除語句 conn.execute(sql, [caption,]) #關(guān)閉連接 self.db_conn.close() def all_caption(self): conn = self.db_conn.connect() sql = """select * from user_type """ conn.execute(sql) data = conn.fetchall() self.db_conn.close() return data except Exception as e: print(e) 9.user_type_to_permission from 自己建.src.utils.db_connection import DbConnect try: class UserTypePermission: def __init__(self): #建立數(shù)據(jù)庫,,游標 self.db_conn = DbConnect() def add(self,**kwargs): #連接數(shù)據(jù)庫,游標 cursor = self.db_conn.connect() sql = """ insert into user_type_to_permission(%s) values(%s)""" #key列表存放插入user_type_id_permission的字節(jié) key_list = [] #value列表存放插入user_type_id_permission的字節(jié)的值 value_list = [] for k in kwargs: key_list.append(k) value_list.append('%%(%s)s' % k) sql = sql % (','.join(key_list), ','.join(value_list)) cursor.execute(sql, kwargs) self.db_conn.close() #根據(jù)用戶角色的id獲取所有權(quán)限 def get_permission(self, user_type_id): #建立連接,,游標 cursor = self.db_conn.connect() #查詢權(quán)限表的所有數(shù)據(jù)和外連接權(quán)限和角色對應關(guān)系表用戶角色ID sql = """select * from user_type_to_permission left join permission on user_type_to_permission.permission_id = permission.nid where user_type_to_permission.user_type_id = %s """ #執(zhí)行mysql語句和用戶角色Id cursor.execute(sql,user_type_id) #獲取所有的數(shù)據(jù) result = cursor.fetchall() #關(guān)閉連接 self.db_conn.close() #返回我獲取的數(shù)據(jù) return result except Exception as e: print(e) 10.db_connection ''' 連接和關(guān)閉數(shù)據(jù)庫 ''' import pymysql from 自己建.config import settings class DbConnect: def __init__(self): #導入我的數(shù)據(jù)庫配置信息 self.__conn_dict = settings.pyMysql_Connect_Dict self.conn = None self.cursor = None #建立連接數(shù)據(jù)庫以及當前游標 def connect(self,cursor=pymysql.cursors.DictCursor): self.conn = pymysql.connect(**self.__conn_dict) self.cursor = self.conn.cursor(cursor=cursor) return self.cursor #關(guān)閉連接 def close(self): #提交事務(wù) self.conn.commit() #關(guān)閉游標 self.cursor.close() #關(guān)閉連接 self.conn.close() 11.MD5#加密 import hashlib def pwd_md5(pwd): obj = hashlib.md5() obj.update(pwd.encode('utf-8')) pwd_MD5 = obj.hexdigest() return pwd_MD5
12.service """ from 自己建.src.auth import user from 自己建.src.repository.user_info import UserInfoRepository from 自己建.src.repository.user_type_to_permission import UserTypePermission from 自己建.src.repository.user_type import UserTypeRepository from 自己建.config import settings from 自己建.src.utils.MD5 import pwd_md5 import importlib user_info = UserInfoRepository() type = UserTypeRepository() type_permission = UserTypePermission() def login(): while True: username = input('請輸入用戶名:') password = input('請輸入密碼:') pwd = pwd_md5(password) userinfo = user_info.fetch_by_user_pwd(username,pwd) if not userinfo: print('\033[34;1m用戶名或密碼輸入錯誤,,請重新輸入!\033[0m') continue permission_list = type_permission.get_permission(userinfo['user_type_id']) settings.user_permission_list = permission_list settings.user_info = user_info return True def register(): user.add_user() def change_pwd(): while True: user_name = input('你要修改哪個用戶的密碼:') name = user_name user_name = user_info.exist(user_name) if not user_name: print('\033[36;1m沒有該用戶\033[0m') continue else: password = input('請輸入密碼:') pwd = pwd_md5(password) # print(user_name) user_info.Change_password(pwd,name) print('修改成功') break def all_information(): while True: print('請選擇你要進行的權(quán)限:') for i, item in enumerate(settings.user_permission_list, 1): print('\t',i, item['caption']) msg = int(input('請輸入您選擇的權(quán)限序號:')) permission = settings.user_permission_list[msg-1] modules = permission['module'] func_name = permission['func'] m = importlib.import_module(modules) func = getattr(m,func_name) func() def execute(): while True: print('\033[36;1m歡迎來到權(quán)限管理系統(tǒng):請您選擇:1.登陸 2.注冊 3.修改密碼\033[0m') dic={'1':login,'2':register,'3':change_pwd} msg = input('登陸請按1,,注冊請按2,,修改密碼請按3:') if msg not in dic.keys(): print('\033[34;1m對不起沒有該選項,請重新輸入您的選擇!\033[0m') continue func = dic[msg] say = func() if say: all_information()
具體代碼沒全按照上一篇隨筆的結(jié)構(gòu)圖來編寫 后續(xù)會有更新
|
|