"""Extract DTS task definitions from SQL files and load them into SQLite.

Each SQL file may contain a ``[begin]`` ... ``[end]`` section holding
``key: value`` task metadata and a ``[querySqlBegin]`` marker followed by the
query on the next line. The extracted tasks are stored in the
``dts_task_info`` table.
"""
import os
import glob
from collections import defaultdict
from contextlib import closing
import sqlite3
from datetime import datetime

try:
    from sqlfluff.api import fix
except ImportError:
    # sqlfluff is only needed when a file actually carries a query; files
    # without one can still be processed when it is not installed.
    fix = None

DB_NAME = '../hstool.db'

# Default directory scanned when the module is run as a script.
DEFAULT_SOURCE_DIR = r'E:\dtsCode\dts\DevCodes\broker-dcp\dcp-resources\src\main\resources\Uf20ToUft'

# Metadata keys recognised inside a [begin]-[end] section.
_TASK_FIELDS = ('taskName', 'srcTable', 'targetTable', 'targetTableUniqueIndex')


# Pretty-print a SQL statement. sqlfluff is slow: per the original author's
# note, a full run takes roughly seven to eight minutes.
def format_with_sqlfluff(sql_code, dialect='oracle'):
    """Return *sql_code* reformatted by sqlfluff's ``fix``.

    :param sql_code: SQL text to format.
    :param dialect: sqlfluff dialect name passed through to ``fix``.
    :raises RuntimeError: if sqlfluff could not be imported.
    """
    if fix is None:
        raise RuntimeError('sqlfluff is not installed; cannot format SQL')
    return fix(sql_code, dialect=dialect)


def extract_sql_task_info(file_path):
    """Parse one SQL file and return its task metadata.

    Only lines between ``[begin]`` and ``[end]`` (case-insensitive) are
    considered. Anything after ``--`` or ``/*`` on a line is discarded before
    the line is interpreted.

    :param file_path: path of a UTF-8 encoded SQL file.
    :return: dict with the keys ``taskName``, ``srcTable``, ``targetTable``,
        ``targetTableUniqueIndex`` and ``querySql``; a value is ``None`` when
        the file does not provide it.
    """
    result = dict.fromkeys(_TASK_FIELDS)
    result['querySql'] = None

    with open(file_path, 'r', encoding='utf-8') as file:
        lines = list(file)

    in_block = False
    for i, line in enumerate(lines):
        # Drop a trailing line comment, then the start of a block comment.
        clean_line = line.split('--')[0].strip()
        clean_line = clean_line.split('/*')[0].strip()
        if not clean_line:
            continue

        marker = clean_line.lower()
        if marker == '[begin]':
            in_block = True
            continue
        if marker == '[end]':
            in_block = False
            continue
        if not in_block:
            continue

        key, sep, value = clean_line.partition(':')
        if sep and key in _TASK_FIELDS:
            result[key] = value.strip()
        elif clean_line.startswith('[querySqlBegin]') and i + 1 < len(lines):
            # The query is the raw line following the marker.
            raw_sql = lines[i + 1].strip()
            if raw_sql:
                # Skip the (slow) formatter when there is nothing to format.
                result['querySql'] = format_with_sqlfluff(raw_sql)

    return result


def process_directory(directory='.'):
    """Recursively process every ``.sql`` file below *directory*.

    :param directory: root directory to walk.
    :return: dict with ``files`` (list of ``{'file_path', 'info'}`` entries,
        paths relative to *directory*), ``by_task_name`` (task name -> info;
        a later file with the same task name replaces an earlier one) and
        ``stats`` (``total_files``, ``processed_files``, ``files_with_sql``).
    """
    stats = {
        'total_files': 0,
        'processed_files': 0,
        'files_with_sql': 0,
    }
    results = {
        'files': [],
        'by_task_name': {},
        'stats': stats,
    }

    for root, _, files in os.walk(directory):
        for file in files:
            if not file.lower().endswith('.sql'):
                continue

            file_path = os.path.join(root, file)
            relative_path = os.path.relpath(file_path, directory)
            stats['total_files'] += 1

            # One unreadable or malformed file must not abort the whole scan.
            try:
                file_info = extract_sql_task_info(file_path)
            except Exception as e:
                print(f"处理文件 {relative_path} 时出错: {e}")
                continue

            # Files that yielded no data at all are counted but not recorded.
            if not any(file_info.values()):
                continue

            results['files'].append({
                'file_path': relative_path,
                'info': file_info,
            })
            stats['processed_files'] += 1
            if file_info['querySql']:
                stats['files_with_sql'] += 1
            if file_info['taskName']:
                results['by_task_name'][file_info['taskName']] = file_info

    return results


def create_database(db_path=DB_NAME):
    """Create the ``dts_task_info`` table in *db_path* if it is missing.

    :param db_path: path of the SQLite database file.
    """
    # closing() releases the connection even if the DDL statement fails.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute('''
        CREATE TABLE IF NOT EXISTS dts_task_info (
            task_name TEXT PRIMARY KEY NOT NULL,
            src_table TEXT,
            target_table TEXT,
            target_table_unique_index TEXT,
            query_sql TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        ''')
        conn.commit()


def insert_into_sqlite(task_info, db_path=DB_NAME):
    """Insert or replace one task row in ``dts_task_info``.

    :param task_info: dict as returned by ``extract_sql_task_info``.
    :param db_path: path of the SQLite database file.
    :return: ``True`` if the row was written, ``False`` if SQLite reported an
        error (the error is printed, not raised).
    """
    row = (
        task_info['taskName'],
        task_info['srcTable'],
        task_info['targetTable'],
        task_info['targetTableUniqueIndex'],
        task_info['querySql'],
    )
    with closing(sqlite3.connect(db_path)) as conn:
        try:
            conn.execute('''
            INSERT OR REPLACE INTO dts_task_info (
                task_name,
                src_table,
                target_table,
                target_table_unique_index,
                query_sql
            ) VALUES (?, ?, ?, ?, ?)
            ''', row)
            conn.commit()
        except sqlite3.Error as e:
            print(f"❌ 导入失败: {task_info['taskName']}, 错误: {e}")
            return False

    print(f"✅ 成功导入任务信息: {task_info['taskName']}")
    return True


def main(directory_path=DEFAULT_SOURCE_DIR, db_path=DB_NAME):
    """Scan *directory_path*, print each task and import it into *db_path*.

    :param directory_path: root directory containing the SQL files.
    :param db_path: path of the SQLite database file.
    :return: number of tasks actually written to the database.
    """
    all_results = process_directory(directory_path)
    create_database(db_path)

    imported = 0
    for file_info in all_results['files']:
        task_info = file_info['info']
        task_name = task_info['taskName']

        print(f"\n=== 任务: {task_name} ===")
        print(f"源表: {task_info['srcTable']}")
        print(f"目标表: {task_info['targetTable']}")
        print(f"唯一索引: {task_info['targetTableUniqueIndex']}")
        print(f"查询SQL:\n{task_info['querySql']}")

        if task_name:
            # Count only rows that were really written.
            if insert_into_sqlite(task_info, db_path):
                imported += 1
        else:
            # Entries carry 'file_path'; there is no 'file_name' key.
            print(f"⚠️ 跳过未命名任务: {file_info['file_path']}")

    stats = all_results['stats']
    print("\n统计信息:")
    print(f"共找到 {stats['total_files']} 个SQL文件")
    print(f"成功处理 {stats['processed_files']} 个文件")
    print(f"其中 {stats['files_with_sql']} 个文件包含SQL查询")
    print(f"成功导入 {imported} 个任务信息")
    return imported


if __name__ == "__main__":
    main()