from flask import Flask, request, jsonify import sqlite3 import time import json import uf30_buildMap_class import uf20_buildMap_class import uf20_asset_buildMap_class import re from flask_cors import CORS import importResources import public from t2sdk.t2sdk_demo_syn import execMdbSql import t2sdk.t2sdk_demo_syn as t2Syn from importlib import reload # Python 3 推荐方式 import sys reload(sys) # 添加.pyd文件所在的目录路径(可以是绝对路径或相对路径) sys.path.append(r"F:\sescode_review\toolset\hstools\py-hstools\t2sdk") # 20250827 add 配置文件获取 config_file = "config.json" # JSON文件路径 # 读取配置 config = public.load_config(config_file) if config == None: print("config.json配置文件读取失败!") app = Flask(__name__) # 1. 提前初始化 CORS 扩展,放在其他模块导入之前 cors = CORS(app, resources={ r"/*": { "origins": "*", # 允许所有域名 "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], # 明确的 HTTP 方法 "allow_headers": ["Content-Type", "Authorization"] # 允许的请求头 } }) # 2. 添加自动的 OPTIONS 请求处理 @app.after_request def after_request(response): response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization') response.headers.add('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS') return response # 3. 解决跨域问题的 OPTIONS 路由 @app.route('/', defaults={'path': ''}, methods=['OPTIONS']) @app.route('/', methods=['OPTIONS']) def options_handler(path): response = jsonify() response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization') response.headers.add('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS') return response # @todo:后续支持从配置文件中设置工程目录等 uft30build = uf30_buildMap_class.BuildMap(projectPath=config['uft30_path']) uft30build.init_func_call_map() uf20Build = uf20_buildMap_class.UF20BuildMap(projectPath=config['uf20_path']) uf20Build.init_func_call_map() uf20AssetBuild = uf20_asset_buildMap_class.UF20AssetBuildMap(projectPath=config['uf20_asset_path']) uf20AssetBuild.init_func_call_map() importresource = importResources.ImportSources() # app = Flask(__name__) # CORS(app) # 默认允许所有域名跨域访问 app.config['JSON_AS_ASCII'] = False # 禁用ASCII编码,确保中文正常显示 DATABASE = 'hstool.db' print('初始化完成') def get_db_connection(): conn = sqlite3.connect(DATABASE) conn.row_factory = sqlite3.Row conn.text_factory = str # 确保文本数据以字符串形式返回 return conn # 检查SQL语句是否合法(只允许特定的更新操作) def is_valid_update_sql(sql): # 去除注释和多余空格 clean_sql = re.sub(r'--.*?\n', '', sql) # 移除单行注释 clean_sql = re.sub(r'/\*.*?\*/', '', clean_sql, flags=re.DOTALL) # 移除多行注释 clean_sql = ' '.join(clean_sql.split()).lower() # 标准化空格 # 检查是否是允许的更新操作 # if not re.match(r'^\s*(insert\s+into|update\s+\w+\s+set|delete\s+from)\s', clean_sql): # return False # # 禁止危险操作 # if any(keyword in clean_sql for keyword in ['drop', 'truncate', 'create', 'replace']): # return False return True @app.route('/query', methods=['POST']) def query_database(): try: # 获取请求数据 request_data = request.get_json() if not request_data or 'sql' not in request_data: return jsonify({ "code": "400", "message": "请求参数错误", "data": None }), 400 sql = request_data['sql'] # 简单的SQL注入检查 if any(keyword in sql.lower() for keyword in ['insert', 'update', 'delete', 'drop', 'alter']): return jsonify({ "code": "403", "message": "禁止的操作类型", "data": None }), 403 start_time = time.time() conn = get_db_connection() cursor = conn.cursor() # 执行查询 cursor.execute(sql) results = cursor.fetchall() # 格式化结果 table_records = [dict(row) for row in results] elapsed_time = int((time.time() - start_time) * 1000) # 毫秒 # 使用json.dumps确保中文正常序列化 response_data = { "code": "200", "message": "成功", "data": { "tableRecords": table_records, "time": elapsed_time, "totalCount": len(table_records) } } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except sqlite3.Error as e: return jsonify({ "code": "500", "message": f"数据库错误: {str(e)}", "data": None }), 500 except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: if 'conn' in locals(): conn.close() @app.route('/update', methods=['POST']) def update_database(): try: # 获取请求数据 request_data = request.get_json() if not request_data or 'sql' not in request_data: return jsonify({ "code": "400", "message": "请求参数错误", "data": None }), 400 sql = request_data['sql'] # 验证SQL语句是否合法 if not is_valid_update_sql(sql): return jsonify({ "code": "403", "message": "非法的更新操作", "data": None }), 403 start_time = time.time() conn = get_db_connection() cursor = conn.cursor() # 执行更新操作 cursor.execute(sql) affected_rows = cursor.rowcount results = cursor.fetchall() # 格式化结果 table_records = [dict(row) for row in results] conn.commit() # 提交事务 elapsed_time = int((time.time() - start_time) * 1000) # 毫秒 response_data = { "code": "200", "message": "更新成功", "data": { "affectedRows": affected_rows, "time": elapsed_time, "tableRecords": table_records, "totalCount": len(table_records) } } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except sqlite3.Error as e: # 回滚事务 if 'conn' in locals(): conn.rollback() return jsonify({ "code": "500", "message": f"数据库错误: {str(e)}", "data": None }), 500 except Exception as e: # 回滚事务 if 'conn' in locals(): conn.rollback() return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: if 'conn' in locals(): conn.close() @app.route('/uft30mapclass', methods=['POST']) def uft30MapClass(): try: # 获取请求数据 request_data = request.get_json() if not request_data or 'func_list' not in request_data: return jsonify({ "code": "400", "message": "func_list notfund", "data": None }), 400 print('data:') print(request_data) func_list = request_data['func_list'] dir_path = request_data['dir_path'] search = request_data['search'] search_direction = request_data['search_direction'] search_dir = 0 if search_direction == 'pre': search_dir = 1 print('type search_dir:', type(search_dir)) # func_lists = [] # func_lists.append(str(func_list).strip()) func_lists = public.safe_comma_string_to_list(func_list) print(f'func_lists = {func_lists}') dir_path = 'F:\\sesCode' # buildMap.traverse_all(dir_path+'\\', buildMap.subProject, func_lists, search, "函数调用图.txt", # model_flag=0, project="ucbp", search_direction=search_dir) uft30build.gene_func_call_map(func_lists, search, search_direction=search_dir, project='ucbp', gene_file='') with open("UF30函数调用图.txt", 'r', encoding='utf-8') as file: content = file.read() # print(content) response_data = { "code": "200", "message": "成功", "result": content } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) pass except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/uft20mapclass', methods=['POST']) def uf20MapClass(): try: # 获取请求数据 request_data = request.get_json() if not request_data or 'func_list' not in request_data: return jsonify({ "code": "400", "message": "func_list notfund", "data": None }), 400 print('data:') print(request_data) func_list = request_data['func_list'] dir_path = request_data['dir_path'] search = request_data['search'] search_direction = request_data['search_direction'] search_dir = 0 if search_direction == 'pre': search_dir = 1 print('type search_dir:', type(search_dir)) # func_lists = [] # func_lists.append(str(func_list).strip()) func_lists = public.safe_comma_string_to_list(func_list) print(f'func_lists = {func_lists}') print(func_lists) dir_path = 'F:\\sesCode' # buildMap.traverse_all(dir_path+'\\', buildMap.subProject, func_lists, search, "函数调用图.txt", # model_flag=0, project="ucbp", search_direction=search_dir) uf20Build.gene_func_call_map(func_lists, search, search_direction=search_dir) with open("UF20函数调用图.txt", 'r', encoding='utf-8') as file: content = file.read() # print(content) response_data = { "code": "200", "message": "成功", "result": content } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) pass except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/uft20assetmapclass', methods=['POST']) def uf20assetMapClass(): try: # 获取请求数据 request_data = request.get_json() if not request_data or 'func_list' not in request_data: return jsonify({ "code": "400", "message": "func_list notfund", "data": None }), 400 print('data:') print(request_data) func_list = request_data['func_list'] dir_path = request_data['dir_path'] search = request_data['search'] search_direction = request_data['search_direction'] search_dir = 0 if search_direction == 'pre': search_dir = 1 print('type search_dir:', type(search_dir)) # func_lists = [] # func_lists.append(str(func_list).strip()) func_lists = public.safe_comma_string_to_list(func_list) print(f'func_lists = {func_lists}') print(func_lists) dir_path = 'F:\\sesCode' # buildMap.traverse_all(dir_path+'\\', buildMap.subProject, func_lists, search, "函数调用图.txt", # model_flag=0, project="ucbp", search_direction=search_dir) uf20AssetBuild.gene_func_call_map(func_lists, search, search_direction=search_dir) with open("UF20账户函数调用图.txt", 'r', encoding='utf-8') as file: content = file.read() # print(content) response_data = { "code": "200", "message": "成功", "result": content } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) pass except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/importResources', methods=['POST']) def importResources(): try: # 获取请求数据 request_data = request.get_json() if not request_data: return jsonify({ "code": "400", "message": "func_list notfund", "data": None }), 400 # 获取 import_tasks 列表 import_tasks = request_data.get("import_tasks", []) if not import_tasks: return jsonify({"error": "import_tasks 不存在或为空"}), 400 print('data:') print(request_data) importresource.reset_inof() for import_task in import_tasks: if import_task == 'uf20search': uf20Build.init_func_call_map(True) elif import_task == 'uft30search': uft30build.init_func_call_map(True) elif import_task == 'dts_task_info': dts_path = config['dts_path'] importresource.import_dts_data(dts_path) else: # 更新状态 importresource.update_table_enabled('import_config.json', import_task, True) # 更新数据库 importresource.import_xml_data(config['uf20_path'], config['uft30_path']) importresource.update_table_enabled('import_config.json', import_task, False) importresource.append_info(import_task + '更新成功') response_data = { "code": "200", "message": "成功", "result": importresource.get_info() } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/updateSysConfig', methods=['POST']) def updateSysConfig(): try: # 获取请求数据 request_data = request.get_json() if not request_data: return jsonify({ "code": "400", "message": "func_list notfund", "data": None }), 400 print(request_data) # 获取 路径配置,并更新 uft30_path = request_data['uft30_path'] uf20_path = request_data['uf20_path'] try: uf20_asset_path = request_data['uf20_asset_path'] except Exception as e: uf20_asset_path = 'F:\\客户账户管理系统V22\\' dts_path = request_data['dts_path'] cppHost = request_data['cppHost'] cppPort = request_data['cppPort'] t2Host = request_data['t2Host'] t2Port = request_data['t2Port'] pythonHost = request_data['pythonHost'] pythonPort = request_data['pythonPort'] config['uft30_path'] = uft30_path config['uf20_path'] = uf20_path config['uf20_asset_path'] = uf20_asset_path config['dts_path'] = dts_path config['cppHost'] = cppHost config['cppPort'] = cppPort config['t2Host'] = t2Host config['t2Port'] = t2Port config['pythonHost'] = pythonHost config['pythonPort'] = pythonPort print(f'update config:\nuft30_path:{uft30_path}\nuf20_path:{uf20_path}\ndts_path:{dts_path}') print(f'\nuf20_asset_path:{uf20_asset_path}') print(f'\ncppHost:{cppHost}:{cppPort}\nt2Host:{t2Host}:{t2Port}\npythonHost{pythonHost}:{pythonPort}') public.save_config(config_file, config) uft30build.set_project_path(uft30_path) uf20Build.set_project_path(uf20_path) uf20AssetBuild.set_project_path(uf20_asset_path) response_data = { "code": "200", "message": "成功", "result": '更新成功' } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/getSysConfig', methods=['POST']) def getSysConfig(): try: # 获取请求数据 request_data = request.get_json() # 获取 路径配置,并更新 uft30_path = config['uft30_path'] uf20_path = config['uf20_path'] uf20_asset_path = config['uf20_asset_path'] dts_path = config['dts_path'] cppHost = config['cppHost'] cppPort = config['cppPort'] t2Host = config['t2Host'] t2Port = config['t2Port'] pythonHost = config['pythonHost'] pythonPort = config['pythonPort'] response_data = { "uf20Path": uf20_path, "uf20AssetPath": uf20_asset_path, "uft30Path": uft30_path, "dtsPath": dts_path, "cppHost": cppHost, "cppPort": cppPort, "t2Host": t2Host, "t2Port": t2Port, "pythonHost": pythonHost, "pythonPort": pythonPort } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('end') @app.route('/ExecMdbSql', methods=['POST']) def execMdbSql(): try: # 获取请求数据 print('start') request_data = request.get_json() # if not request_data or 'mdbSql' not in request_data: # return jsonify({ # "code": "400", # "message": "请求参数错误", # "data": None # }), 400 Request = request_data['Request'] sql = Request['mdbSql'] t2Host = Request['t2Host'] t2Port = Request['t2Port'] mdbNodeName = Request['mdbNodeName'] print(sql,t2Host, t2Port, mdbNodeName) # 简单的SQL注入检查 # if any(keyword in sql.lower() for keyword in ['insert', 'update', 'delete', 'drop', 'alter']): # return jsonify({ # "code": "403", # "message": "禁止的操作类型", # "data": None # }), 403 start_time = time.time() results = t2Syn.execMdbSql(t2Host, t2Port, mdbNodeName, sql) # print('result', results) # 格式化结果 table_records = [dict(row) for row in results] elapsed_time = int((time.time() - start_time) * 1000) # 毫秒 # 使用json.dumps确保中文正常序列化 response_data = { "code": "200", "message": "成功", "tableRecords": table_records, "ErrorNo": 0 } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('close') @app.route('/InitMdbSql', methods=['POST']) def initMdbSql(): try: # 获取请求数据 print('start') request_data = request.get_json() # if not request_data or 'mdbSql' not in request_data: # return jsonify({ # "code": "400", # "message": "请求参数错误", # "data": None # }), 400 Request = request_data['Request'] sql = Request['mdbSql'] t2Host = Request['t2Host'] t2Port = Request['t2Port'] mdbNodeName = Request['mdbNodeName'] print(sql,t2Host, t2Port, mdbNodeName) # 简单的SQL注入检查 # if any(keyword in sql.lower() for keyword in ['insert', 'update', 'delete', 'drop', 'alter']): # return jsonify({ # "code": "403", # "message": "禁止的操作类型", # "data": None # }), 403 start_time = time.time() result1 = t2Syn.execMdbSql(t2Host, t2Port, 'usesbid92', sql) result2 = t2Syn.execMdbSql(t2Host, t2Port, 'usescbp92', sql) result3 = t2Syn.execMdbSql(t2Host, t2Port, 'uconvert99', sql) # 格式化结果 table_records = [dict(row) for row in result1] table_records += [dict(row) for row in result2] table_records += [dict(row) for row in result3] print('table_records', table_records) elapsed_time = int((time.time() - start_time) * 1000) # 毫秒 # 使用json.dumps确保中文正常序列化 response_data = { "code": "200", "message": "成功", "tableRecords": table_records, "ErrorNo": 0, "elapsed_time": elapsed_time } return app.response_class( response=json.dumps(response_data, ensure_ascii=False), mimetype='application/json' ) except Exception as e: return jsonify({ "code": "500", "message": f"服务器错误: {str(e)}", "data": None }), 500 finally: print('close') if __name__ == '__main__': app.run(host='0.0.0.0', port=5000,debug=True)