简单封装DBUtils 和 pymysql 并实现简单的逆向工程生成class 类的py文件

  这里使用的 Python 版本是:Python 3.6.0b2。

  涉及的三方库:DBUtils、pymysql

1.ConfigurationParser

  通过调用Python内置的 xml.dom.minidom 对 xml 文件进行解析,获取xml内容。此类用于下面 BaseDao 获取数据库连接信息。

 import sys
import re
import pymysql
import xml.dom.minidom
from xml.dom.minidom import parse


class ConfigurationParser(object):
    """
    Read database connection settings from an XML configuration file.

    The file must contain a ``<generatorConfiguration>`` root whose first
    ``<jdbcConnection>`` element holds ``<property name="...">`` children.
    A property's value is taken from its ``value`` attribute when present,
    otherwise from its text content.
    """

    # Settings that must stay strings even when they consist only of digits
    # (a numeric-looking password or schema name is still text).
    _TEXT_ONLY_NAMES = ("password", "host", "user", "database")

    def __init__(self, configFilePath=None):
        """
        Remember which XML file to parse.
        - @Param: configFilePath path of the XML file; when omitted,
          ``sys.path[0] + "/config/config.xml"`` is used.
        """
        if configFilePath:
            self.__configFilePath = configFilePath
        else:
            self.__configFilePath = sys.path[0] + "/config/config.xml"

    def parseConfiguration(self):
        """
        Parse the XML file and collect the jdbcConnection properties.
        - @return dict mapping the lower-cased property name to its value.
          ``port`` and purely numeric values become ``int`` (except the
          text-only settings), UTF-8 charset spellings are normalised to
          ``"utf8"`` and the creator ``"pymysql"`` is replaced by the
          pymysql module itself.
        """
        document = xml.dom.minidom.parse(self.__configFilePath)
        configDOM = document.getElementsByTagName("generatorConfiguration")[0]
        connectionDOM = configDOM.getElementsByTagName("jdbcConnection")[0]

        jdbcConnectionDict = {}
        for node in connectionDOM.getElementsByTagName("property"):
            name = node.getAttributeNode("name").nodeValue.strip().lower()
            if node.hasAttribute("value"):
                value = node.getAttributeNode("value").nodeValue
            elif node.childNodes:
                value = node.childNodes[0].data
            else:
                # <property name="x"></property> has no text node at all.
                value = ""

            if name == "charset":
                if re.match("utf-8|UTF8", value, re.I):
                    value = "utf8"
            elif name == "creator":
                if value == "pymysql":
                    value = pymysql
            elif name == "port":
                value = int(value)
            elif name not in self._TEXT_ONLY_NAMES and re.fullmatch(r"[0-9]+", value.strip()):
                # Only convert when the WHOLE value is numeric; checking just
                # the first character would crash on values such as "1abc".
                value = int(value)
            jdbcConnectionDict[name] = value
        return jdbcConnectionDict


if __name__ == "__main__":
    print(ConfigurationParser().parseConfiguration())

  config.xml

 <?xml version="1.0" encoding="utf-8"?>
<generatorConfiguration>
<jdbcConnection>
<property name="creator">pymysql</property>
<property name="host">127.0.0.1</property>
<property name="database">rcddup</property>
<property name="port">3306</property>
<property name="user">root</property>
<property name="password">root</property>
<property name="charset">Utf-8</property>
<property name="mincached">0</property>
<property name="maxcached">10</property>
<property name="maxshared">0</property>
<property name="maxconnections">20</property>
</jdbcConnection>
</generatorConfiguration>

2.BaseDao

  BaseDao是在 DBUtils 的基础上对 pymysql 操作数据库进行了一些简单的封装。

  其中 queryUtil 用于拼接SQL语句,log4py用于控制台输出信息,page 分页对象。

  由于DBUtils基础上执行的 SQL 查询结果是一个元组类型结果,在 SQL 查询结果返回之后,利用 setattr()方法实现将 SQL 查询结果转换成想要的类对象。为了得到想要的结果,因此类对象(User)的 __str__()需要按照特定的格式重写(下文会给出User类的代码示例)。

 import pymysql
 import time
 import json

 # Third-party connection pool and the project-local helper modules.
 from DBUtils.PooledDB import PooledDB
 from configParser import ConfigurationParser
 from queryUtil import QueryUtil
 from log4py import Logger
 from page import Page

 # Module-wide list of primary-key descriptions that BaseDao fills in.
 # A ``global`` statement at module scope has no effect; it is kept as written.
 global PRIMARY_KEY_DICT_LIST
PRIMARY_KEY_DICT_LIST = []


class BaseDao(object):
    """
    Base class for database access objects built on a DBUtils PooledDB pool.

    A subclass passes its entity class to ``__init__``. The base class then
    looks up the matching table, its primary key and its columns in
    INFORMATION_SCHEMA and offers generic CRUD methods. Each result row is
    turned into an entity by assigning the name-mangled private attribute
    ``_<ClassName>__<column>`` for every column, so an entity class must keep
    its data in private attributes named after the table columns, and its
    ``__str__`` must return a JSON object of those columns.

    The SQL text is assembled by QueryUtil from the given values; validate
    untrusted input before handing it to these methods.

    - @Author RuanCheng
    - @UpdateDate 2017/5/17
    """
    __logger = None
    __parser = None             # ConfigurationParser that read the XML settings
    __poolConfigDict = None     # connection settings parsed from the XML file
    __pool = None               # connection pool, shared by every DAO
    __obj = None                # entity class
    __className = None          # entity class name
    __tableName = None          # table mapped to the entity class
    __primaryKeyDict = {}       # primary-key description of that table
    __columnList = []           # column names of that table

    def __init__(self, obj=None):
        """
        Initialise the DAO:
        - 1. create the shared connection pool on first use
        - 2. load the primary-key descriptions on first use
        - 3. resolve table, primary key and columns for ``obj``
        - @Param: obj entity class (the class itself, not an instance)
        """
        if not obj:
            raise Exception("BaseDao is missing a required parameter --> obj(class object).\nFor example [super().__init__(User)].")
        self.__logger = Logger(self.__class__)
        self.__logger.start()
        # Keep parser, settings and pool on the class so that all DAOs share
        # one pool instead of opening a new pool per DAO instance.
        if BaseDao.__pool is None:
            BaseDao.__parser = ConfigurationParser()
            BaseDao.__poolConfigDict = BaseDao.__parser.parseConfiguration()
            BaseDao.__pool = PooledDB(**BaseDao.__poolConfigDict)
        if (self.__obj is None) or (self.__obj != obj):
            if not PRIMARY_KEY_DICT_LIST:
                self.__init_primary_key_dict_list()
            self.__init_params(obj)
            self.__init_columns()
        self.__logger.end()

    ################################################# public API #################################################
    def selectAll(self):
        """
        Select every row of the table.
        - @return None for no rows, one entity for exactly one row, otherwise a list of entities
        """
        sql = QueryUtil.queryAll(self.__tableName, self.__columnList)
        return self.__executeQuery(sql)

    def selectByPrimaryKey(self, value):
        """
        Select by primary key.
        - @Param: value primary-key value
        - @return the matching entity, or None
        """
        # Test for "missing" explicitly so that a key of 0 is accepted.
        if value is None or value == "":
            raise Exception("selectByPrimaryKey() is missing a required paramter 'value'.")
        sql = QueryUtil.queryByPrimaryKey(self.__primaryKeyDict, value, self.__columnList)
        return self.__executeQuery(sql)

    def selectCount(self):
        """
        Count the rows of the table.
        - @return number of rows
        """
        sql = QueryUtil.queryCount(self.__tableName)
        return self.__execute(sql)[0][0]

    def selectAllByPage(self, page=None):
        """
        Select one page of rows.
        - @Param: page Page instance describing offset and limit
        - @return None for no rows, one entity for exactly one row, otherwise a list of entities
        """
        if (not page) or (not isinstance(page, Page)):
            raise Exception("Paramter [page] is not correct. Parameter [page] must a Page object instance. ")
        sql = QueryUtil.queryAllByPage(self.__tableName, self.__columnList, page)
        return self.__executeQuery(sql, logEnable=True)

    def insert(self, obj):
        """
        Insert an entity.
        - @Param: obj entity instance whose ``str()`` is a JSON object of its columns
        - @return number of affected rows
        """
        if (not obj) or (obj == ""):
            raise Exception("insert() is missing a required paramter 'obj'.")
        sql = QueryUtil.queryInsert(self.__primaryKeyDict, json.loads(str(obj)))
        return self.__executeUpdate(sql)

    def delete(self, obj=None):
        """
        Delete the rows matching every non-null attribute of the entity.
        - @Param: obj entity instance
        - @return number of affected rows
        """
        if (not obj) or (obj == ""):
            raise Exception("delete() is missing a required paramter 'obj'.")
        sql = QueryUtil.queryDelete(self.__primaryKeyDict, json.loads(str(obj)))
        return self.__executeUpdate(sql)

    def deleteByPrimaryKey(self, value=None):
        """
        Delete by primary key.
        - @Param: value primary-key value
        - @return number of affected rows
        """
        if value is None or value == "":
            raise Exception("deleteByPrimaryKey() is missing a required paramter 'value'.")
        sql = QueryUtil.queryDeleteByPrimaryKey(self.__primaryKeyDict, value)
        return self.__executeUpdate(sql)

    def updateByPrimaryKey(self, obj=None):
        """
        Update the non-null attributes of the entity, located by its primary key.
        - @Param: obj entity instance
        - @return number of affected rows
        """
        if (not obj) or (obj == ""):
            raise Exception("updateByPrimaryKey() is missing a required paramter 'obj'.")
        sql = QueryUtil.queryUpdateByPrimaryKey(self.__primaryKeyDict, json.loads(str(obj)))
        return self.__executeUpdate(sql)

    ################################################# internal helpers #################################################
    def __run(self, sql, logEnable, isUpdate):
        """
        Run one statement on a pooled connection.
        - @Param: sql statement text
        - @Param: logEnable whether the statement is logged
        - @Param: isUpdate True to commit and return the affected-row count,
          False to return the fetched rows as a list
        """
        if not sql:
            raise Exception("Execute method is missing a required parameter --> sql.")
        self.__logger.outSQL(sql, enable=logEnable)
        # Acquire the connection outside the try block: if this fails there
        # is nothing to roll back or close.
        conn = self.__pool.connection()
        try:
            cur = conn.cursor()
            try:
                affected = cur.execute(sql)
                rows = None if isUpdate else list(cur.fetchall())
            finally:
                cur.close()
            if isUpdate:
                # Commit only after success, never after a rollback.
                conn.commit()
                return affected
            return rows
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def __execute(self, sql="", logEnable=True):
        """
        Run a query and return the raw rows (used while initialising):
        - @Param: sql statement text
        - @Param: logEnable whether the statement is logged
        - @return list of row tuples
        """
        return self.__run(sql, logEnable, False)

    def __executeQuery(self, sql="", logEnable=True):
        """
        Run a query and map every row to an entity:
        - @Param: sql statement text
        - @Param: logEnable whether the statement is logged
        - @return None for no rows, one entity for exactly one row, otherwise a list of entities
        """
        objList = []
        for row in self.__run(sql, logEnable, False):
            entity = self.__obj()
            for col, cell in zip(self.__columnList, row):
                # Private attributes are stored under their name-mangled name.
                setattr(entity, '_%s__%s' % (self.__className, col), cell)
            objList.append(entity)
        if not objList:
            return None
        if len(objList) == 1:
            return objList[0]
        return objList

    def __executeUpdate(self, sql=None, logEnable=True):
        """
        Run a modifying statement and commit it:
        - @Param: sql statement text
        - @Param: logEnable whether the statement is logged
        - @return number of affected rows
        """
        return self.__run(sql, logEnable, True)

    def __init_params(self, obj):
        """
        Resolve class name, table name and primary key for the entity class.
        - @Param: obj entity class
        """
        self.__obj = obj
        self.__className = obj.__name__
        for pkDict in PRIMARY_KEY_DICT_LIST:
            if pkDict.get("className") == self.__className:
                self.__primaryKeyDict = pkDict
                self.__tableName = pkDict["tableName"]
                break
        else:
            # Without a table every later statement would be built against
            # the table name None, so fail with a clear message instead.
            raise Exception("No table with a primary key maps to the class '%s'." % (self.__className))

    def __init_primary_key_dict_list(self):
        """
        Load the primary-key columns of every table in the configured schema:
        - each entry is {"tableName":..., "primaryKey":..., "ordinalPosition":..., "className":...}
        """
        database = self.__poolConfigDict.get("database")
        # Join on the constraint name and keep PRIMARY KEY constraints only;
        # otherwise unique and foreign keys would be reported as well.
        sql = """
            SELECT
                c.TABLE_NAME,
                c.COLUMN_NAME,
                c.ORDINAL_POSITION
            FROM
                INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t,
                INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS c
            WHERE t.TABLE_NAME = c.TABLE_NAME
                AND t.CONSTRAINT_NAME = c.CONSTRAINT_NAME
                AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
                AND t.CONSTRAINT_TYPE = 'PRIMARY KEY'
                AND t.TABLE_SCHEMA = '%s'
                AND c.TABLE_SCHEMA = '%s'
            ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
        """ % (database, database)
        for row in self.__execute(sql, logEnable=False):
            PRIMARY_KEY_DICT_LIST.append({
                "tableName": row[0],
                "primaryKey": row[1],
                "ordinalPosition": row[2],
                "className": self.__convertToClassName(row[0]),
            })
        self.__logger.outMsg("initPrimaryKey is done.")

    def __init_columns(self):
        """
        Load the column names of the mapped table in table order.
        """
        sql = (
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_NAME = '%s' AND TABLE_SCHEMA = '%s' "
            "ORDER BY ORDINAL_POSITION"
        ) % (self.__tableName, self.__poolConfigDict["database"])
        # Assign a fresh list; appending to the class-level default would
        # leak the columns of one DAO into every other DAO.
        self.__columnList = [row[0] for row in self.__execute(sql, logEnable=False)]
        self.__logger.outMsg("init_columns is done.")

    def __convertToClassName(self, tableName):
        """
        Derive the entity class name from a table name:
        - @Param: tableName table name
        - @return the name without its "t_md_", "t_ac_" or "t_" prefix and
          without underscores, capitalised; a name without such a prefix is
          only capitalised
        """
        for prefix in ("t_md_", "t_ac_", "t_"):
            if tableName.startswith(prefix):
                # Cut the prefix off the front only; str.replace would also
                # remove the same characters in the middle of the name.
                return tableName[len(prefix):].replace("_", "").lower().capitalize()
        return tableName.capitalize()

3.简单应用 UserDao

  创建一个 UserDao,继承 BaseDao 之后调用父类初始化方法,传递一个 User 类对象给父类,我们就可以很方便地对 User 进行 CRUD 了。

 import random
 import math

 from baseDao import BaseDao
 from user import User
 from page import Page


 class UserDao(BaseDao):
     """DAO for the User entity; all behaviour comes from BaseDao."""

     def __init__(self):
         # Hand the entity class (not an instance) to BaseDao.
         super().__init__(User)
         pass


 # Created at import time; constructing it runs BaseDao.__init__.
 userDao = UserDao()

 ######################################## CRUD
 # print(userDao.selectAll())
 # user = userDao.selectByPrimaryKey(1)
 # print(user)
 # print(userDao.insert(user))
 # print(userDao.delete(user))
 # print(userDao.deleteByPrimaryKey(4))
 # user = userDao.selectByPrimaryKey(1)
 # print(userDao.updateByPrimaryKey())
 # print(userDao.update())

 ######################################## update by primary key
 # strList = list("QWERTYUI欧帕斯电饭锅和进口量自行车VB你们送人头刚回家个省份和健康的根本就可获得*VB你们从v莫妮卡了VB了")
 # for index in range(1000):
 #     user = User()
 #     user.set_id(index+1)
 #     name = ""
 #     for i in range(random.randint(3,8)):
 #         name += random.choice(strList)
 #     user.set_name(name)
 #     user.set_status(1)
 #     i += 1
 #     userDao.updateByPrimaryKey(user)

 ######################################## update
 # user = User()
 # user.set_id(2)
 # user.set_name("测试更新")
 # userDao.updateByPrimaryKey(user)

 ######################################## paged query
 # page = Page()
 # pageNum = 1
 # limit = 10
 # page.set_page(pageNum)
 # page.set_limit(limit)
 # total_count = userDao.selectCount()
 # page.set_total_count(total_count)
 # if total_count % limit == 0:
 #     total_page = total_count / limit
 # else:
 #     total_page = math.ceil(total_count / limit)
 # page.set_total_page(total_page)
 # begin = (pageNum - 1) * limit
 # for user in userDao.selectAllByPage(page):
 #     print(user)

4. User

  User 对象属性设置为私有,通过 get/set 方法访问,最后重写 __str__() 方法,用于 BaseDao 返回 User 对象,而不是一个字典对象或者字符串什么的。

 import json

 class User(object):
     """
     Entity for one row of the user table.

     The data lives in the private attributes ``__id``, ``__name`` and
     ``__status``; code outside the class reaches them through the get/set
     methods or through their name-mangled form (``_User__id`` ...).
     """

     def __init__(self):
         self.__id = None
         self.__name = None
         self.__status = None

     def get_id(self):
         return self.__id

     def set_id(self, id):
         self.__id = id

     def get_name(self):
         return self.__name

     def set_name(self, name):
         self.__name = name

     def get_status(self):
         return self.__status

     def set_status(self, status):
         self.__status = status

     def __str__(self):
         """Return the entity as a JSON object with the keys id, name and status."""
         userDict = {'id': self.__id, 'name': self.__name, 'status': self.__status}
         return json.dumps(userDict)

5.QueryUtil

  拼接 SQL 语句的工具类。

 from page import Page

 class QueryUtil(object):
     """
     Static helpers that assemble SQL statement text.

     Identifiers are wrapped in backticks and values are embedded as escaped,
     double-quoted string literals. NOTE(review): escaping is a stop-gap;
     passing the values as query parameters to the driver would be the
     robust solution, but every method here returns a finished SQL string.
     """

     # Characters that must be backslash-escaped inside a quoted SQL string.
     _ESCAPES = str.maketrans({
         "\0": "\\0",
         "\\": "\\\\",
         "\n": "\\n",
         "\r": "\\r",
         "\x1a": "\\Z",
         '"': '\\"',
         "'": "\\'",
     })

     def __init__(self):
         pass

     @staticmethod
     def _escape(value):
         """Return ``str(value)`` with quote and control characters escaped."""
         return str(value).translate(QueryUtil._ESCAPES)

     @staticmethod
     def _name(identifier):
         """Return the identifier wrapped in backticks (inner backticks doubled)."""
         return "`%s`" % (str(identifier).replace("`", "``"))

     @staticmethod
     def _isNull(value):
         """Tell whether a value counts as SQL NULL (None or the text "None")."""
         return value is None or value == "None"

     @staticmethod
     def queryColumns(columnList):
         """
         Build the comma separated, backtick-quoted column list.
         """
         return ", ".join(QueryUtil._name(col) for col in columnList)

     @staticmethod
     def queryByPrimaryKey(primaryKeyDict, value, columnList):
         """
         Build SELECT ... WHERE <primary key> = value.
         """
         return 'SELECT %s FROM %s WHERE %s="%s"' % (
             QueryUtil.queryColumns(columnList),
             QueryUtil._name(primaryKeyDict["tableName"]),
             QueryUtil._name(primaryKeyDict["primaryKey"]),
             QueryUtil._escape(value),
         )

     @staticmethod
     def queryAll(tableName, columnList):
         """
         Build SELECT of all rows.
         """
         return 'SELECT %s FROM %s' % (QueryUtil.queryColumns(columnList), QueryUtil._name(tableName))

     @staticmethod
     def queryCount(tableName):
         """
         Build SELECT COUNT(*).
         """
         return 'SELECT COUNT(*) FROM %s' % (QueryUtil._name(tableName))

     @staticmethod
     def queryAllByPage(tableName, columnList, page=None):
         """
         Build a paged SELECT using page.get_begin() and page.get_limit();
         a default Page is used when none is given.
         """
         if not page:
             page = Page()
         return 'SELECT %s FROM %s LIMIT %d,%d' % (
             QueryUtil.queryColumns(columnList),
             QueryUtil._name(tableName),
             page.get_begin(),
             page.get_limit(),
         )

     @staticmethod
     def queryInsert(primaryKeyDict, objDict):
         """
         Build INSERT for every key of objDict; null values become NULL.
         """
         columns = ",".join(QueryUtil._name(col) for col in objDict.keys())
         values = ",".join(
             "null" if QueryUtil._isNull(val) else '"%s"' % (QueryUtil._escape(val))
             for val in objDict.values()
         )
         return "INSERT INTO %s(%s) VALUES(%s)" % (
             QueryUtil._name(primaryKeyDict["tableName"]), columns, values)

     @staticmethod
     def queryDelete(primaryKeyDict, objDict):
         """
         Build DELETE matching every non-null entry of objDict.
         Raises Exception when objDict holds no non-null entry, because the
         statement would otherwise delete every row of the table.
         """
         conditions = [
             ' and %s="%s"' % (QueryUtil._name(col), QueryUtil._escape(val))
             for col, val in objDict.items()
             if not QueryUtil._isNull(val)
         ]
         if not conditions:
             raise Exception("queryDelete() needs at least one non-null attribute to build the WHERE clause.")
         return "DELETE FROM %s WHERE 1=1%s" % (
             QueryUtil._name(primaryKeyDict["tableName"]), "".join(conditions))

     @staticmethod
     def queryDeleteByPrimaryKey(primaryKeyDict, value=None):
         """
         Build DELETE ... WHERE <primary key> = value.
         """
         return 'DELETE FROM %s WHERE %s="%s"' % (
             QueryUtil._name(primaryKeyDict["tableName"]),
             QueryUtil._name(primaryKeyDict["primaryKey"]),
             QueryUtil._escape(value),
         )

     @staticmethod
     def queryUpdateByPrimaryKey(primaryKeyDict, objDict):
         """
         Build UPDATE of every non-null, non-key entry of objDict, e.g.
         UPDATE `t_user` SET `name`="test" WHERE `id`="1007"
         Raises Exception when the primary-key value or the columns to
         update are missing.
         """
         key = primaryKeyDict["primaryKey"]
         keyValue = None
         assignments = []
         for col, val in objDict.items():
             if QueryUtil._isNull(val):
                 continue
             if col == key:
                 keyValue = val
             else:
                 assignments.append('%s="%s"' % (QueryUtil._name(col), QueryUtil._escape(val)))
         if keyValue is None:
             raise Exception("queryUpdateByPrimaryKey() is missing the primary key value '%s'." % (key))
         if not assignments:
             raise Exception("queryUpdateByPrimaryKey() has no non-null attribute to update.")
         return 'UPDATE %s SET %s WHERE %s="%s"' % (
             QueryUtil._name(primaryKeyDict["tableName"]),
             ", ".join(assignments),
             QueryUtil._name(key),
             QueryUtil._escape(keyValue),
         )

6. Page

  分页对象

import json
import math


class Page(object):
    """
    Paging information: current page, totals, row offset, page size and
    the rows of the current page.
    """

    def __init__(self):
        self.__page = 1
        self.__total_page = 1
        self.__total_count = 0
        # None means "not set explicitly": the offset then follows from the
        # page number and the page size.
        self.__begin = None
        self.__limit = 10
        self.__result = []

    def get_page(self):
        return self.__page

    def set_page(self, page):
        """Set the 1-based page number; values below 1 are ignored."""
        if page >= 1:
            self.__page = page

    def get_total_page(self):
        return self.__total_page

    def set_total_page(self, total_page):
        """Set the number of pages; values below 1 are ignored."""
        if total_page >= 1:
            self.__total_page = total_page

    def get_total_count(self):
        return self.__total_count

    def set_total_count(self, total_count):
        """Set the number of rows; negative values are ignored."""
        if total_count >= 0:
            self.__total_count = total_count

    def get_begin(self):
        """
        Return the row offset: the value given to set_begin, otherwise
        (page - 1) * limit.
        """
        if self.__begin is None:
            return (self.__page - 1) * self.__limit
        return self.__begin

    def set_begin(self, begin):
        """Set the row offset explicitly; negative values are ignored."""
        if begin >= 0:
            self.__begin = begin

    def get_limit(self):
        return self.__limit

    def set_limit(self, limit):
        """Set the page size; values below 1 are ignored."""
        if limit > 0:
            self.__limit = limit

    def get_result(self):
        return self.__result

    def set_result(self, result):
        self.__result = result

    def __str__(self):
        """Return the paging information as a JSON object."""
        pageDict = {
            'page': self.__page,
            'total_page': self.__total_page,
            'total_count': self.__total_count,
            'begin': self.get_begin(),
            'limit': self.__limit,
            'result': self.__result,
        }
        return json.dumps(pageDict)

7.Logger

  简单的用于输出信息。

 import time

 class Logger(object):
     """
     Minimal console logger; every line is prefixed with the current local
     time and the ``__name__`` of the object given to the constructor.
     """

     def __init__(self, obj):
         """
         - @Param: obj object whose ``__name__`` labels the output (e.g. a class)
         """
         self.__obj = obj
         self.__start = None

     def __now(self):
         """Return the current local time as "YYYY-mm-dd HH:MM:SS"."""
         return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

     def start(self):
         """Remember the start time for the next end() call."""
         self.__start = time.time()

     def end(self):
         """
         Print the time elapsed since start() in milliseconds
         (0 when start() was never called).
         """
         if self.__start is None:
             elapsedMs = 0
         else:
             # time.time() counts seconds; the message reports milliseconds.
             elapsedMs = (time.time() - self.__start) * 1000
         print("%s >>> [%s] Finished [Time consuming %dms]" % (self.__now(), self.__obj.__name__, elapsedMs))

     def outSQL(self, msg, enable=True):
         """
         Print a SQL statement:
         - @Param: msg SQL statement
         - @Param: enable output switch
         """
         if enable:
             print("%s >>> [%s] [SQL] %s" % (self.__now(), self.__obj.__name__, msg))

     def outMsg(self, msg, enable=True):
         """
         Print a message:
         - @Param: msg message text
         - @Param: enable output switch
         """
         if enable:
             print("%s >>> [%s] [Msg] %s" % (self.__now(), self.__obj.__name__, msg))

8.Generator

  为了便于创建 user.py 文件,此处提供了自动生成方法,只需要在配置文件中简单地配置数据库连接信息以及要生成的表,即可生成对应的 py 类文件。

  目前只实现了类对象文件的创建。

 import sys
import re
import pymysql
import time
import os
import xml.dom.minidom
from xml.dom.minidom import parse

# ``global`` statements at module scope have no effect; kept as written.
global _pythonPath
global _daoPath
global _servicePath
global _controllerPath


class Generator(object):
    """
    Entry point of the code generator.
    @param configFilePath path of the generator XML file; a relative path is
           resolved against sys.path[0], no path selects the default file
    """
    def __init__(self, configFilePath=None):
        if not configFilePath:
            parser = ConfigurationParser()
        else:
            if not os.path.isabs(configFilePath):
                configFilePath = os.path.normpath(os.path.join(sys.path[0], configFilePath))
            parser = ConfigurationParser(configFilePath)
        self.__configDict = parser.parseConfiguration()

    def run(self):
        """
        Read the column names from the database and write the entity files.
        """
        fieldDict = DBOperator(self.__configDict).queryFieldDict()
        PythonGenarator(self.__configDict, fieldDict).run()
        # DaoGenarator(self.__configDict).run()
        # ServiceGenarator(self.__configDict).run()
        # ControllerGenarator(self.__configDict).run()


class PythonGenarator(object):
    """
    Writes one entity class file per entry of configDict["pythonPathList"].
    @param configDict configuration dictionary
    @param fieldDict  maps the file name without ".py" to the list of column names
    """
    def __init__(self, configDict, fieldDict):
        self.__configDict = configDict
        self.__fieldDict = fieldDict
        self.__content = ""

    def run(self):
        """
        Generate every entity file.
        """
        for filePath in self.__configDict["pythonPathList"]:
            targetDir = os.path.dirname(filePath)
            if targetDir:
                os.makedirs(targetDir, exist_ok=True)
            # The file name without ".py" is the key into fieldDict.
            fileName = os.path.basename(filePath).split(".py")[0]
            ClassName = fileName.capitalize()
            # Start from an empty buffer so that a file never contains the
            # classes generated before it.
            self.__content = ""
            with open(filePath, "w", encoding="utf-8") as file:
                self.writeImport(file)
                self.writeHeader(file, ClassName)
                self.writeInit(file, fileName, ClassName)
                tableDictString = self.writeGetSet(file, fileName)
                self.writeStr(file, fileName, tableDictString)
                file.write(self.__content)
            print("Generator --> %s"%(filePath))

    def writeImport(self, file, importList = None):
        """
        Append the import section.
        """
        self.__content += "import json\n\n\n"

    def writeHeader(self, file, className, superClass = None):
        """
        Append the class header (class ClassName(object):).
        """
        self.__content += "class %s(%s):\n\n" % (className, superClass if superClass else "object")

    def writeInit(self, file, fileName, className):
        """
        Append __init__, which sets one private attribute per column to None.
        """
        fields = self.__fieldDict[fileName]
        lines = ["    def __init__(self):"]
        lines.extend("        self.__%s = None" % (field) for field in fields)
        if not fields:
            # A body is required even when the table has no columns.
            lines.append("        pass")
        self.__content += "\n".join(lines) + "\n\n"

    def writeGetSet(self, file, fileName):
        """
        Append get_xxx()/set_xxx() for every column.
        @return tableDictString the "'col':self.__col,..." text used by writeStr
        """
        template = (
            "    def get_%(field)s(self):\n"
            "        return self.__%(field)s\n\n"
            "    def set_%(field)s(self, %(field)s):\n"
            "        self.__%(field)s = %(field)s\n\n"
        )
        fields = self.__fieldDict[fileName]
        for field in fields:
            self.__content += template % ({"field": field})
        return ",".join("'%s':self.__%s" % (field, field) for field in fields)

    def writeStr(self, file, fileName, tableDictString):
        """
        Append __str__, which returns the attributes as a JSON object.
        """
        tableDictString = "{" + tableDictString + "}"
        self.__content += (
            "    def __str__(self):\n"
            "        %sDict = %s\n"
            "        return json.dumps(%sDict)\n"
        ) % (fileName, tableDictString, fileName)


class DaoGenarator(object):
    """
    Placeholder for the dao file generator (not implemented).
    @param configDict configuration dictionary
    """
    def __init__(self, configDict):
        self.__configDict = configDict

    def run(self):
        pass


class ServiceGenarator(object):
    """
    Placeholder for the service file generator (not implemented).
    @param configDict configuration dictionary
    """
    def __init__(self, configDict):
        self.__configDict = configDict

    def run(self):
        pass


class ControllerGenarator(object):
    """
    Placeholder for the controller file generator (not implemented).
    @param configDict configuration dictionary
    """
    def __init__(self, configDict):
        self.__configDict = configDict

    def run(self):
        pass


class ConfigurationParser(object):
    """
    Parses the generator XML file.
    @return (parseConfiguration) configDict with the keys jdbcConnectionDict,
            tableList, pythonPathList, daoPathList, servicePathList and
            controllerPathList
    """
    def __init__(self, configFilePath=None):
        if configFilePath:
            self.__configFilePath = configFilePath
        else:
            self.__configFilePath = sys.path[0] + "/config/generatorConfig.xml"
        # Needed for every configuration file, not only for the default one.
        self.__generatorBasePath = sys.path[0] + "/src/"

    def parseConfiguration(self):
        """
        Parse the XML file and return the connection settings, the tables to
        generate and the target file paths.
        """
        document = xml.dom.minidom.parse(self.__configFilePath)
        configDOM = document.getElementsByTagName("generatorConfiguration")[0]
        propertyNodes = configDOM.getElementsByTagName("jdbcConnection")[0].getElementsByTagName("property")

        # Target directory of each generator, from its targetPath attribute.
        _pythonPath = self.__targetPathOf(configDOM, "pythonGenerator")
        _servicePath = self.__targetPathOf(configDOM, "serviceGenerator")
        _daoPath = self.__targetPathOf(configDOM, "daoGenerator")
        _controllerPath = self.__targetPathOf(configDOM, "controllerGenerator")

        jdbcConnectionDict = {"host":None,"user":None,"password":None,"port":3306,"database":None,"charset":"utf8"}
        for node in propertyNodes:
            name = node.getAttributeNode("name").nodeValue.strip().lower()
            if node.hasAttribute("value"):
                value = node.getAttributeNode("value").nodeValue
            elif node.childNodes:
                value = node.childNodes[0].data
            else:
                value = ""
            if name == "charset":
                if re.match("utf-8|UTF8", value, re.I):
                    # Keep the default "utf8" for every UTF-8 spelling.
                    continue
            elif name == "port":
                value = int(value)
            jdbcConnectionDict[name] = value

        tableList = []
        pythonPathList = []
        daoPathList = []
        servicePathList = []
        controllerPathList = []
        for tableDOM in configDOM.getElementsByTagName("table"):
            name = tableDOM.getAttribute("name").strip().lower()
            # getAttribute returns "" for a missing attribute.
            alias = tableDOM.getAttribute("alias").strip().lower()
            prefix = alias if alias else name
            tableList.append({"tableName": name, "alias": alias})
            pythonPathList.append("%s/%s.py" %(_pythonPath, prefix))
            daoPathList.append("%s/%sDao.py" %(_daoPath, prefix))
            servicePathList.append("%s/%sService.py" %(_servicePath, prefix))
            controllerPathList.append("%s/%sController.py" %(_controllerPath, prefix))

        return {
            "jdbcConnectionDict":jdbcConnectionDict,
            "tableList":tableList,
            "pythonPathList":pythonPathList,
            "daoPathList":daoPathList,
            "servicePathList":servicePathList,
            "controllerPathList":controllerPathList
        }

    def __targetPathOf(self, configDOM, tagName):
        """Return the target directory configured on the first <tagName> element."""
        node = configDOM.getElementsByTagName(tagName)[0]
        return self.__getGeneratorPath(node.getAttributeNode("targetPath").nodeValue)

    def __getGeneratorPath(self, targetPath):
        """Turn a dotted target path into a directory below the base path."""
        return self.__generatorBasePath + targetPath.replace(".","/")


class DBOperator(object):
    """
    Reads table metadata from the database.
    @param configDict configuration dictionary from ConfigurationParser
    """
    def __init__(self, configDict=None):
        if configDict is None:
            raise Exception("Error in DBOperator >>> jdbcConnectionDict is None")
        self.__configDict = configDict

    def queryFieldDict(self):
        """
        Read the column names of every configured table.
        @return fieldDict maps the alias (or the table name when no alias is
                set) to the list of lower-cased column names in table order
        """
        fieldDict = {}
        jdbcConnectionDict = self.__configDict["jdbcConnectionDict"]
        # Values are passed as query parameters instead of being formatted
        # into the statement text.
        sql = """SELECT COLUMN_NAME as name, DATA_TYPE as type
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ORDINAL_POSITION
            """
        conn = pymysql.Connect(**jdbcConnectionDict)
        try:
            for table in self.__configDict["tableList"]:
                tableName = table["tableName"]
                # Use the same key the file name is derived from.
                fieldKey = table["alias"] if table["alias"] else tableName
                cursor = conn.cursor()
                try:
                    cursor.execute(sql, (jdbcConnectionDict["database"], tableName))
                    results = cursor.fetchall()
                finally:
                    cursor.close()
                fieldDict[fieldKey] = [result[0].lower() for result in results]
        finally:
            conn.close()
        return fieldDict


if __name__ == "__main__":
    Generator().run()

  generatorConfig.xml

 <?xml version="1.0" encoding="utf-8"?>
<generatorConfiguration>
<jdbcConnection>
<property name="host">127.0.0.1</property>
<property name="database">rcddup</property>
<property name="port">3306</property>
<property name="user">root</property>
<property name="password">root</property>
<property name="charset">UTF-8</property>
</jdbcConnection>
<!-- targetPath 文件生成路径 -->
<pythonGenerator targetPath="cn.rcddup.entity"></pythonGenerator>
<daoGenerator targetPath="cn.rcddup.dao"></daoGenerator>
<serviceGenerator targetPath="cn.rcddup.service"></serviceGenerator>
<controllerGenerator targetPath="cn.rcddup.controller"> </controllerGenerator>
<!-- name:数据库表名,alias:生成的 class 类名 -->
<table name="t_user" alias="User" ></table>
</generatorConfiguration>

  到这里,最近一段时间的 python 学习成果就介绍完了,有兴趣的可以加群:626787819。如果你是小白,可以来这里询问;如果你是大牛,希望不要嫌弃我们小白,一起交流学习。

  本程序代码在 github 上可以下载,下载地址:https://github.com/ruancheng77/baseDao
  

  创建于:2017-05-20

TABLE_SCHEMA='%s'
上一篇:【bzoj2073】[POI2004]PRZ 状态压缩dp


下一篇:mysql 主从单库单表同步 binlog-do-db replicate-do-db