Files
my_py_tools/app.py
2025-10-18 21:32:31 +08:00

697 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, request, jsonify
import sqlite3
import time
import json
import uf30_buildMap_class
import uf20_buildMap_class
import uf20_asset_buildMap_class
import re
from flask_cors import CORS
import importResources
import public
from t2sdk.t2sdk_demo_syn import execMdbSql
import t2sdk.t2sdk_demo_syn as t2Syn
from importlib import reload # Python 3 推荐方式
import sys
# NOTE(review): reloading ``sys`` is a Python 2-era idiom; it is kept unchanged
# here because removing it has not been verified as safe for this project.
reload(sys)
# Extra search path for the directory that holds the t2sdk .pyd files
# (may be absolute or relative).
sys.path.append(r"F:\sescode_review\toolset\hstools\py-hstools\t2sdk")
# Added 2025-08-27: runtime settings are read from a JSON configuration file.
config_file = "config.json"  # path of the JSON configuration file
config = public.load_config(config_file)
if config is None:
    print("config.json配置文件读取失败")
    # Everything below indexes into ``config``; stop here with a clear error
    # instead of failing later with "'NoneType' object is not subscriptable".
    raise RuntimeError(f"failed to load configuration from {config_file!r}")
app = Flask(__name__)
# 1. Initialise the CORS extension right after the application is created.
cors = CORS(app, resources={
    r"/*": {
        "origins": "*",  # allow every origin
        "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],  # explicit HTTP methods
        "allow_headers": ["Content-Type", "Authorization"],  # permitted request headers
    }
})
# 2. Attach permissive CORS headers to every outgoing response.
@app.after_request
def after_request(response):
    """Set the wildcard CORS headers on ``response`` and return it.

    The headers are assigned rather than appended: appending would emit a
    second copy of each header whenever the response already carries them
    (for instance a response built by a handler that adds the same headers
    itself), and a repeated ``Access-Control-Allow-Origin`` header is rejected
    by browsers.

    Args:
        response: the response object about to be sent.

    Returns:
        The same response object, with the three CORS headers set.
    """
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization'
    response.headers['Access-Control-Allow-Methods'] = 'GET,POST,PUT,DELETE,OPTIONS'
    return response
# 3. Catch-all OPTIONS route used to answer cross-origin preflight requests.
@app.route('/', defaults={'path': ''}, methods=['OPTIONS'])
@app.route('/<path:path>', methods=['OPTIONS'])
def options_handler(path):
    """Answer an OPTIONS request for ``path`` with CORS headers attached.

    Args:
        path: the requested path (empty string for ``/``); it is not inspected.

    Returns:
        A ``jsonify()`` response carrying the three CORS headers.
    """
    preflight = jsonify()
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Headers', 'Content-Type,Authorization'),
        ('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS'),
    )
    for header_name, header_value in cors_headers:
        preflight.headers.add(header_name, header_value)
    return preflight
# TODO: later, support setting the project directories etc. from the configuration file.
# Each call-map builder is constructed with its project path from ``config``
# and initialised immediately; the statement order is kept as-is.
uft30build = uf30_buildMap_class.BuildMap(projectPath=config['uft30_path'])
uft30build.init_func_call_map()
uf20Build = uf20_buildMap_class.UF20BuildMap(projectPath=config['uf20_path'])
uf20Build.init_func_call_map()
uf20AssetBuild = uf20_asset_buildMap_class.UF20AssetBuildMap(projectPath=config['uf20_asset_path'])
uf20AssetBuild.init_func_call_map()
# Shared importer instance used by the /importResources endpoint.
importresource = importResources.ImportSources()
# app = Flask(__name__)
# CORS(app) # allows cross-origin access from every domain by default
app.config['JSON_AS_ASCII'] = False # disable ASCII-only JSON so Chinese text is displayed correctly
# SQLite database file opened by get_db_connection().
DATABASE = 'hstool.db'
print('初始化完成')
def get_db_connection():
    """Open a new SQLite connection to ``DATABASE``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects and
        whose text values are returned as ``str``.
    """
    connection = sqlite3.connect(DATABASE)
    # Text columns come back as str.
    connection.text_factory = str
    # Rows are sqlite3.Row, so callers can turn them into dicts.
    connection.row_factory = sqlite3.Row
    return connection
# Check whether an SQL statement is acceptable for the /update endpoint.
def is_valid_update_sql(sql):
    """Return whether ``sql`` may be executed as an update statement.

    The statement is stripped of comments and normalised, but the whitelist
    and blacklist checks that would use the normalised text are currently
    disabled, so every string input is accepted.

    Args:
        sql: the SQL text to inspect.

    Returns:
        Always ``True`` for string input.
    """
    # Drop single-line comments first, then block comments.
    without_line_comments = re.sub(r'--.*?\n', '', sql)
    without_comments = re.sub(r'/\*.*?\*/', '', without_line_comments, flags=re.DOTALL)
    # Collapse whitespace and lower-case the statement.
    _normalized = ' '.join(without_comments.split()).lower()
    # NOTE(review): the checks below were disabled in the original code; while
    # they stay disabled this function performs no validation at all.
    # if not re.match(r'^\s*(insert\s+into|update\s+\w+\s+set|delete\s+from)\s', clean_sql):
    #     return False
    # if any(keyword in clean_sql for keyword in ['drop', 'truncate', 'create', 'replace']):
    #     return False
    return True
@app.route('/query', methods=['POST'])
def query_database():
    """Execute a read-only SQL statement against the SQLite database.

    Expects a JSON body containing an ``sql`` key.

    Returns:
        * 400 when the body is missing or has no ``sql`` key;
        * 403 when the statement contains one of the write/DDL keywords
          ``insert``, ``update``, ``delete``, ``drop`` or ``alter``;
        * 200 with ``data.tableRecords`` (list of row dicts), ``data.time``
          (elapsed milliseconds) and ``data.totalCount`` on success;
        * 500 with the error text when SQLite or anything else raises.
    """
    conn = None
    try:
        # Read the request payload.
        request_data = request.get_json()
        if not request_data or 'sql' not in request_data:
            return jsonify({
                "code": "400",
                "message": "请求参数错误",
                "data": None
            }), 400
        sql = request_data['sql']
        # Simple keyword filter. Keywords are matched as whole words so that
        # identifiers which merely contain one (``update_time``,
        # ``is_deleted`` ...) no longer cause a legitimate SELECT to be
        # rejected.
        # NOTE(review): this is a coarse filter, not a complete defence; other
        # statement types are not covered by the keyword list.
        if re.search(r'\b(?:insert|update|delete|drop|alter)\b', sql.lower()):
            return jsonify({
                "code": "403",
                "message": "禁止的操作类型",
                "data": None
            }), 403
        start_time = time.time()
        conn = get_db_connection()
        cursor = conn.cursor()
        # Run the query.
        cursor.execute(sql)
        results = cursor.fetchall()
        # Convert each row into a plain dict.
        table_records = [dict(row) for row in results]
        elapsed_time = int((time.time() - start_time) * 1000)  # milliseconds
        # Serialise with json.dumps so non-ASCII text is emitted verbatim.
        response_data = {
            "code": "200",
            "message": "成功",
            "data": {
                "tableRecords": table_records,
                "time": elapsed_time,
                "totalCount": len(table_records)
            }
        }
        return app.response_class(
            response=json.dumps(response_data, ensure_ascii=False),
            mimetype='application/json'
        )
    except sqlite3.Error as e:
        return jsonify({
            "code": "500",
            "message": f"数据库错误: {str(e)}",
            "data": None
        }), 500
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        # Close the connection only if it was actually opened.
        if conn is not None:
            conn.close()
@app.route('/update', methods=['POST'])
def update_database():
    """Execute an SQL statement and commit it.

    Expects a JSON body containing an ``sql`` key. The statement is passed
    through ``is_valid_update_sql`` before it is executed.

    Returns:
        * 400 when the body is missing or has no ``sql`` key;
        * 403 when ``is_valid_update_sql`` rejects the statement;
        * 200 with ``affectedRows``, ``time`` (milliseconds), ``tableRecords``
          and ``totalCount`` on success;
        * 500 with the error text after rolling back when anything raises.
    """

    def _error(status, message):
        # Build the shared error envelope together with its HTTP status.
        return jsonify({
            "code": str(status),
            "message": message,
            "data": None
        }), status

    conn = None
    try:
        payload = request.get_json()
        if not payload or 'sql' not in payload:
            return _error(400, "请求参数错误")
        statement = payload['sql']
        # Validate the statement before touching the database.
        if not is_valid_update_sql(statement):
            return _error(403, "非法的更新操作")
        started = time.time()
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute(statement)
        affected_rows = cur.rowcount
        # Collect any rows the statement produced before committing.
        table_records = [dict(row) for row in cur.fetchall()]
        conn.commit()
        elapsed_ms = int((time.time() - started) * 1000)
        body = {
            "code": "200",
            "message": "更新成功",
            "data": {
                "affectedRows": affected_rows,
                "time": elapsed_ms,
                "tableRecords": table_records,
                "totalCount": len(table_records)
            }
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except sqlite3.Error as e:
        # Undo the transaction if a connection was opened.
        if conn is not None:
            conn.rollback()
        return _error(500, f"数据库错误: {str(e)}")
    except Exception as e:
        # Undo the transaction if a connection was opened.
        if conn is not None:
            conn.rollback()
        return _error(500, f"服务器错误: {str(e)}")
    finally:
        if conn is not None:
            conn.close()
@app.route('/uft30mapclass', methods=['POST'])
def uft30MapClass():
    """Generate the UFT3.0 function call map and return it as text.

    Expects a JSON body with ``func_list``, ``dir_path``, ``search`` and
    ``search_direction``. After ``uft30build.gene_func_call_map`` runs, the
    content of ``UF30函数调用图.txt`` is returned under ``result``.

    Returns:
        400 when the body is missing or lacks ``func_list``; 200 with the file
        content on success; 500 with the error text when anything raises
        (including a missing required key).
    """
    try:
        payload = request.get_json()
        if not payload or 'func_list' not in payload:
            return jsonify({
                "code": "400",
                "message": "func_list notfund",
                "data": None
            }), 400
        print('data:')
        print(payload)
        func_list = payload['func_list']
        # 'dir_path' must be present in the request, but its value is not used.
        _unused_dir_path = payload['dir_path']
        search = payload['search']
        # 'pre' selects direction 1; every other value selects 0.
        search_dir = 1 if payload['search_direction'] == 'pre' else 0
        print('type search_dir:', type(search_dir))
        func_lists = public.safe_comma_string_to_list(func_list)
        print(f'func_lists = {func_lists}')
        uft30build.gene_func_call_map(
            func_lists,
            search,
            search_direction=search_dir,
            project='ucbp',
            gene_file='',
        )
        # Read the call-map file (presumably produced by the call above).
        with open("UF30函数调用图.txt", 'r', encoding='utf-8') as call_map_file:
            call_map_text = call_map_file.read()
        body = {
            "code": "200",
            "message": "成功",
            "result": call_map_text
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/uft20mapclass', methods=['POST'])
def uf20MapClass():
    """Generate the UF2.0 function call map and return it as text.

    Expects a JSON body with ``func_list``, ``dir_path``, ``search`` and
    ``search_direction``. After ``uf20Build.gene_func_call_map`` runs, the
    content of ``UF20函数调用图.txt`` is returned under ``result``.

    Returns:
        400 when the body is missing or lacks ``func_list``; 200 with the file
        content on success; 500 with the error text when anything raises
        (including a missing required key).
    """
    try:
        payload = request.get_json()
        if not payload or 'func_list' not in payload:
            return jsonify({
                "code": "400",
                "message": "func_list notfund",
                "data": None
            }), 400
        print('data:')
        print(payload)
        func_list = payload['func_list']
        # 'dir_path' must be present in the request, but its value is not used.
        _unused_dir_path = payload['dir_path']
        search = payload['search']
        # 'pre' selects direction 1; every other value selects 0.
        search_dir = 1 if payload['search_direction'] == 'pre' else 0
        print('type search_dir:', type(search_dir))
        func_lists = public.safe_comma_string_to_list(func_list)
        print(f'func_lists = {func_lists}')
        print(func_lists)
        uf20Build.gene_func_call_map(func_lists, search, search_direction=search_dir)
        # Read the call-map file (presumably produced by the call above).
        with open("UF20函数调用图.txt", 'r', encoding='utf-8') as call_map_file:
            call_map_text = call_map_file.read()
        body = {
            "code": "200",
            "message": "成功",
            "result": call_map_text
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/uft20assetmapclass', methods=['POST'])
def uf20assetMapClass():
    """Generate the UF2.0 asset/account function call map and return it as text.

    Expects a JSON body with ``func_list``, ``dir_path``, ``search`` and
    ``search_direction``. After ``uf20AssetBuild.gene_func_call_map`` runs, the
    content of ``UF20账户函数调用图.txt`` is returned under ``result``.

    Returns:
        400 when the body is missing or lacks ``func_list``; 200 with the file
        content on success; 500 with the error text when anything raises
        (including a missing required key).
    """
    try:
        payload = request.get_json()
        if not payload or 'func_list' not in payload:
            return jsonify({
                "code": "400",
                "message": "func_list notfund",
                "data": None
            }), 400
        print('data:')
        print(payload)
        func_list = payload['func_list']
        # 'dir_path' must be present in the request, but its value is not used.
        _unused_dir_path = payload['dir_path']
        search = payload['search']
        # 'pre' selects direction 1; every other value selects 0.
        search_dir = 1 if payload['search_direction'] == 'pre' else 0
        print('type search_dir:', type(search_dir))
        func_lists = public.safe_comma_string_to_list(func_list)
        print(f'func_lists = {func_lists}')
        print(func_lists)
        uf20AssetBuild.gene_func_call_map(func_lists, search, search_direction=search_dir)
        # Read the call-map file (presumably produced by the call above).
        with open("UF20账户函数调用图.txt", 'r', encoding='utf-8') as call_map_file:
            call_map_text = call_map_file.read()
        body = {
            "code": "200",
            "message": "成功",
            "result": call_map_text
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/importResources', methods=['POST'])
def importResources():
    """Run the import tasks named in the request and report their log.

    Expects a JSON body with a non-empty ``import_tasks`` list. The tasks
    ``uf20search`` and ``uft30search`` re-initialise the matching call-map
    builder, ``dts_task_info`` imports DTS data from ``config['dts_path']``,
    and any other name is treated as a table to import from XML.

    Returns:
        400 when the body is empty or ``import_tasks`` is missing/empty; 200
        with ``importresource.get_info()`` under ``result`` on success; 500
        with the error text when anything raises.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({
                "code": "400",
                "message": "func_list notfund",
                "data": None
            }), 400
        tasks = payload.get("import_tasks", [])
        if not tasks:
            return jsonify({"error": "import_tasks 不存在或为空"}), 400
        print('data:')
        print(payload)
        importresource.reset_inof()
        for task in tasks:
            if task == 'uf20search':
                uf20Build.init_func_call_map(True)
            elif task == 'uft30search':
                uft30build.init_func_call_map(True)
            elif task == 'dts_task_info':
                importresource.import_dts_data(config['dts_path'])
            else:
                # Enable the table, run the XML import, then disable it again.
                importresource.update_table_enabled('import_config.json', task, True)
                importresource.import_xml_data(config['uf20_path'], config['uft30_path'])
                importresource.update_table_enabled('import_config.json', task, False)
            # NOTE(review): the source lost its indentation; this success
            # message is assumed to be recorded once per task — confirm.
            importresource.append_info(task + '更新成功')
        body = {
            "code": "200",
            "message": "成功",
            "result": importresource.get_info()
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/updateSysConfig', methods=['POST'])
def updateSysConfig():
    """Replace the system settings with the values from the request.

    Reads the path and host/port settings from the JSON body, stores them in
    ``config``, saves ``config`` to ``config_file`` and passes the new project
    paths to the three call-map builders. ``uf20_asset_path`` is optional and
    falls back to a fixed default; every other setting is required.

    Returns:
        400 when the body is empty; 200 on success; 500 with the error text
        when anything raises (including a missing required key).
    """
    try:
        request_data = request.get_json()
        if not request_data:
            return jsonify({
                "code": "400",
                "message": "func_list notfund",
                "data": None
            }), 400
        print(request_data)
        # Pull every setting out of the request before touching ``config``.
        uft30_path = request_data['uft30_path']
        uf20_path = request_data['uf20_path']
        # Optional setting: fall back to the default when the key is absent.
        uf20_asset_path = request_data.get('uf20_asset_path', 'F:\\客户账户管理系统V22\\')
        dts_path = request_data['dts_path']
        cppHost = request_data['cppHost']
        cppPort = request_data['cppPort']
        t2Host = request_data['t2Host']
        t2Port = request_data['t2Port']
        pythonHost = request_data['pythonHost']
        pythonPort = request_data['pythonPort']
        new_settings = {
            'uft30_path': uft30_path,
            'uf20_path': uf20_path,
            'uf20_asset_path': uf20_asset_path,
            'dts_path': dts_path,
            'cppHost': cppHost,
            'cppPort': cppPort,
            't2Host': t2Host,
            't2Port': t2Port,
            'pythonHost': pythonHost,
            'pythonPort': pythonPort,
        }
        for setting_name, setting_value in new_settings.items():
            config[setting_name] = setting_value
        print(f'update config:\nuft30_path:{uft30_path}\nuf20_path:{uf20_path}\ndts_path:{dts_path}')
        print(f'\nuf20_asset_path:{uf20_asset_path}')
        print(f'\ncppHost:{cppHost}:{cppPort}\nt2Host:{t2Host}:{t2Port}\npythonHost{pythonHost}:{pythonPort}')
        # Save the settings, then hand the new paths to the builders.
        public.save_config(config_file, config)
        uft30build.set_project_path(uft30_path)
        uf20Build.set_project_path(uf20_path)
        uf20AssetBuild.set_project_path(uf20_asset_path)
        body = {
            "code": "200",
            "message": "成功",
            "result": '更新成功'
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/getSysConfig', methods=['POST'])
def getSysConfig():
    """Return the current system settings as a flat JSON object.

    The request body is parsed but its content is not used.

    Returns:
        200 with the path and host/port settings (camelCase keys) on success;
        500 with the error text when anything raises, e.g. a setting missing
        from ``config``.
    """
    try:
        # Parse the body even though nothing is read from it.
        request.get_json()
        # Settings are looked up in this fixed order.
        setting_names = (
            'uft30_path', 'uf20_path', 'uf20_asset_path', 'dts_path',
            'cppHost', 'cppPort', 't2Host', 't2Port',
            'pythonHost', 'pythonPort',
        )
        current = {name: config[name] for name in setting_names}
        body = {
            "uf20Path": current['uf20_path'],
            "uf20AssetPath": current['uf20_asset_path'],
            "uft30Path": current['uft30_path'],
            "dtsPath": current['dts_path'],
            "cppHost": current['cppHost'],
            "cppPort": current['cppPort'],
            "t2Host": current['t2Host'],
            "t2Port": current['t2Port'],
            "pythonHost": current['pythonHost'],
            "pythonPort": current['pythonPort']
        }
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('end')
@app.route('/ExecMdbSql', methods=['POST'])
def execMdbSql():
    """Run an SQL statement on one MDB node through the T2 SDK.

    Expects a JSON body of the form
    ``{"Request": {"mdbSql", "t2Host", "t2Port", "mdbNodeName"}}`` and passes
    those values to ``t2Syn.execMdbSql``.

    Returns:
        200 with ``tableRecords`` (list of row dicts), ``ErrorNo`` 0 and
        ``elapsed_time`` (milliseconds) on success; 500 with the error text
        when anything raises, including a missing body or key.
    """
    try:
        print('start')
        request_data = request.get_json()
        mdb_request = request_data['Request']
        sql = mdb_request['mdbSql']
        t2Host = mdb_request['t2Host']
        t2Port = mdb_request['t2Port']
        mdbNodeName = mdb_request['mdbNodeName']
        print(sql,t2Host, t2Port, mdbNodeName)
        # NOTE(review): the SQL keyword filter used by /query is not applied
        # here (it was commented out in the original).
        start_time = time.time()
        results = t2Syn.execMdbSql(t2Host, t2Port, mdbNodeName, sql)
        # Convert each row into a plain dict.
        table_records = [dict(row) for row in results]
        elapsed_time = int((time.time() - start_time) * 1000)  # milliseconds
        # The elapsed time used to be computed and then discarded; it is now
        # reported under the same key the /InitMdbSql endpoint uses.
        response_data = {
            "code": "200",
            "message": "成功",
            "tableRecords": table_records,
            "ErrorNo": 0,
            "elapsed_time": elapsed_time
        }
        # Serialise with json.dumps so non-ASCII text is emitted verbatim.
        return app.response_class(
            response=json.dumps(response_data, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('close')
@app.route('/InitMdbSql', methods=['POST'])
def initMdbSql():
    """Run one SQL statement on three fixed MDB nodes and merge the rows.

    Expects a JSON body of the form
    ``{"Request": {"mdbSql", "t2Host", "t2Port", "mdbNodeName"}}``. The
    ``mdbNodeName`` value is read and printed, but the statement is always
    sent to ``usesbid92``, ``usescbp92`` and ``uconvert99``, in that order.

    Returns:
        200 with the concatenated ``tableRecords``, ``ErrorNo`` 0 and
        ``elapsed_time`` (milliseconds) on success; 500 with the error text
        when anything raises, including a missing body or key.
    """
    try:
        print('start')
        payload = request.get_json()
        mdb_request = payload['Request']
        sql = mdb_request['mdbSql']
        t2Host = mdb_request['t2Host']
        t2Port = mdb_request['t2Port']
        mdbNodeName = mdb_request['mdbNodeName']
        print(sql,t2Host, t2Port, mdbNodeName)
        # NOTE(review): the SQL keyword filter used by /query is not applied
        # here (it was commented out in the original).
        started = time.time()
        # Query every node first; rows are converted only after all calls return.
        node_names = ('usesbid92', 'usescbp92', 'uconvert99')
        per_node_rows = [
            t2Syn.execMdbSql(t2Host, t2Port, node_name, sql)
            for node_name in node_names
        ]
        table_records = [dict(row) for rows in per_node_rows for row in rows]
        print('table_records', table_records)
        elapsed_time = int((time.time() - started) * 1000)  # milliseconds
        body = {
            "code": "200",
            "message": "成功",
            "tableRecords": table_records,
            "ErrorNo": 0,
            "elapsed_time": elapsed_time
        }
        # Serialise with json.dumps so non-ASCII text is emitted verbatim.
        return app.response_class(
            response=json.dumps(body, ensure_ascii=False),
            mimetype='application/json'
        )
    except Exception as e:
        return jsonify({
            "code": "500",
            "message": f"服务器错误: {str(e)}",
            "data": None
        }), 500
    finally:
        print('close')
if __name__ == '__main__':
    # Start the development server on every interface, port 5000.
    # NOTE(review): debug=True combined with host 0.0.0.0 makes the interactive
    # debugger reachable from the network — confirm this is only used in
    # trusted environments.
    server_options = {'host': '0.0.0.0', 'port': 5000, 'debug': True}
    app.run(**server_options)