207 lines
7.2 KiB
Python
207 lines
7.2 KiB
Python
|
||
import os
|
||
import glob
|
||
from collections import defaultdict
|
||
import sqlite3
|
||
from sqlfluff.api import fix
|
||
from datetime import datetime
|
||
|
||
# Path of the SQLite database file. It is the default ``db_path`` of
# create_database() and insert_into_sqlite(); being a relative path, it is
# resolved against the process's current working directory.
DB_NAME = '../hstool.db'
|
||
|
||
# Pretty-print a SQL statement. sqlfluff is slow: the whole run is estimated
# to take roughly seven to eight minutes.
def format_with_sqlfluff(sql_code, dialect='oracle'):
    """Return ``sql_code`` after passing it through sqlfluff's ``fix``.

    :param sql_code: SQL text to format.
    :param dialect: dialect name forwarded to ``fix`` (``'oracle'`` by default).
    :return: whatever ``fix`` returns for the given SQL and dialect.
    """
    formatted = fix(sql_code, dialect=dialect)
    return formatted
|
||
def extract_sql_task_info(file_path):
    """Parse one task-definition file and return the task fields it declares.

    Only lines between a ``[begin]`` and an ``[end]`` marker (both matched
    case-insensitively) are examined. Before matching, everything from the
    first ``--`` or ``/*`` on a line is dropped and the line is stripped.
    Inside the block, lines starting with ``taskName:``, ``srcTable:``,
    ``targetTable:`` or ``targetTableUniqueIndex:`` set the corresponding
    field to the text after the first colon. A line starting with
    ``[querySqlBegin]`` makes the *next* raw line (stripped, comments kept)
    the query SQL, which is passed through ``format_with_sqlfluff``.

    The file is decoded as ``utf-8-sig`` so that a leading byte-order mark is
    discarded; otherwise the BOM would stay glued to the first line and a
    ``[begin]`` marker on that line would never be recognised.

    :param file_path: path of the file to read.
    :return: dict with the keys ``taskName``, ``srcTable``, ``targetTable``,
        ``targetTableUniqueIndex`` and ``querySql``; a value is ``None`` when
        the file does not provide it.
    """
    # Result skeleton: every field starts out as "not found".
    result = {
        'taskName': None,
        'srcTable': None,
        'targetTable': None,
        'targetTableUniqueIndex': None,
        'querySql': None
    }
    # Fields written in the file as "<key>: <value>".
    field_keys = ('taskName', 'srcTable', 'targetTable', 'targetTableUniqueIndex')

    # utf-8-sig reads BOM-less UTF-8 unchanged and strips a BOM when present.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        lines = file.readlines()

    # Tracks whether the current line lies inside a [begin]-[end] block.
    in_block = False

    for i, line in enumerate(lines):
        # Drop trailing "--" comments and anything from a "/*" onwards.
        clean_line = line.split('--')[0].split('/*')[0].strip()

        # Skip blank (or comment-only) lines.
        if not clean_line:
            continue

        marker = clean_line.lower()
        if marker == '[begin]':
            in_block = True
            continue
        if marker == '[end]':
            in_block = False
            continue

        # Content outside a [begin]-[end] block is ignored.
        if not in_block:
            continue

        field = next(
            (key for key in field_keys if clean_line.startswith(key + ':')),
            None,
        )
        if field is not None:
            result[field] = clean_line.split(':', 1)[1].strip()
        elif clean_line.startswith('[querySqlBegin]') and i + 1 < len(lines):
            # The SQL sits on the following line; take it as-is (only
            # stripped) and format it.
            result['querySql'] = format_with_sqlfluff(lines[i + 1].strip())

    return result
|
||
|
||
|
||
def process_directory(directory='.'):
    """Walk ``directory`` recursively and gather task info from every SQL file.

    A file counts as a SQL file when its name ends with ``.sql``
    (case-insensitive). Each one is handed to ``extract_sql_task_info``; a
    file whose extracted values are all empty is counted but not recorded.
    Any exception raised while handling a file is printed and the walk
    continues with the next file.

    :param directory: root directory to scan.
    :return: dict with
        ``files`` - list of ``{'file_path': <path relative to directory>,
        'info': <extracted dict>}`` entries,
        ``by_task_name`` - extracted dicts keyed by non-empty task name
        (a later file with the same name replaces an earlier one),
        ``stats`` - the counters ``total_files``, ``processed_files`` and
        ``files_with_sql``.
    """
    stats = {
        'total_files': 0,
        'processed_files': 0,
        'files_with_sql': 0
    }
    collected = []      # one entry per file that yielded any data
    tasks_by_name = {}  # extracted info indexed by task name

    for root, _, names in os.walk(directory):
        for name in names:
            if not name.lower().endswith('.sql'):
                continue

            full_path = os.path.join(root, name)
            relative_path = os.path.relpath(full_path, directory)
            stats['total_files'] += 1

            try:
                info = extract_sql_task_info(full_path)

                # Record the file only when at least one field was found.
                if not any(info.values()):
                    continue

                # Store the path relative to the scanned root.
                collected.append({
                    'file_path': relative_path,
                    'info': info
                })
                stats['processed_files'] += 1

                # Count files that carry a SQL query.
                if info['querySql']:
                    stats['files_with_sql'] += 1

                # Index by task name, but only when a name is present.
                if info['taskName']:
                    tasks_by_name[info['taskName']] = info
            except Exception as e:
                print(f"处理文件 {relative_path} 时出错: {e}")

    return {
        'files': collected,
        'by_task_name': tasks_by_name,
        'stats': stats
    }
|
||
|
||
|
||
def create_database(db_path=DB_NAME):
    """Create the SQLite database and the ``dts_task_info`` table if missing.

    The table has ``task_name`` as its primary key plus the columns
    ``src_table``, ``target_table``, ``target_table_unique_index``,
    ``query_sql`` and a ``timestamp`` that defaults to the current time.
    The statement uses ``IF NOT EXISTS``, so an existing table is left alone.

    :param db_path: path of the SQLite database file.
    :raises sqlite3.Error: propagated unchanged if the statement or the
        commit fails; the connection is closed in that case as well.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dts_task_info (
                task_name TEXT PRIMARY KEY NOT NULL,
                src_table TEXT,
                target_table TEXT,
                target_table_unique_index TEXT,
                query_sql TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when execute/commit raises, matching the
        # try/finally handling in insert_into_sqlite.
        conn.close()
|
||
|
||
|
||
def insert_into_sqlite(task_info, db_path=DB_NAME):
    """
    Insert one task record into ``dts_task_info``, replacing any existing row
    with the same task name.

    A ``sqlite3.Error`` raised by the statement or the commit is caught and
    reported with a printed message; success is reported the same way. The
    connection is always closed before returning.

    :param task_info: dict as returned by extract_sql_task_info; the keys
        taskName, srcTable, targetTable, targetTableUniqueIndex and querySql
        are read.
    :param db_path: path of the SQLite database file
    """
    upsert_sql = '''
            INSERT OR REPLACE INTO dts_task_info (
                task_name,
                src_table,
                target_table,
                target_table_unique_index,
                query_sql
            ) VALUES (?, ?, ?, ?, ?)
        '''
    # task_info keys in the same order as the columns listed in the statement.
    column_keys = (
        'taskName',
        'srcTable',
        'targetTable',
        'targetTableUniqueIndex',
        'querySql',
    )

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        row = tuple(task_info[key] for key in column_keys)
        cursor.execute(upsert_sql, row)
        conn.commit()
        print(f"✅ 成功导入任务信息: {task_info['taskName']}")
    except sqlite3.Error as e:
        print(f"❌ 导入失败: {task_info['taskName']}, 错误: {e}")
    finally:
        conn.close()
|
||
|
||
|
||
# Example usage: scan a directory of SQL task files, print what was found and
# load every named task into the SQLite database.
if __name__ == "__main__":
    # Process every SQL file under the hard-coded source directory.
    directory_path = r'E:\dtsCode\dts\DevCodes\broker-dcp\dcp-resources\src\main\resources\Uf20ToUft'
    all_results = process_directory(directory_path)

    # Make sure the database and its table exist.
    create_database()

    # Print each task and import it into the database.
    for file_info in all_results['files']:
        task_info = file_info['info']
        task_name = task_info['taskName']

        # Print the task details.
        print(f"\n=== 任务: {task_name} ===")
        print(f"源表: {task_info['srcTable']}")
        print(f"目标表: {task_info['targetTable']}")
        print(f"唯一索引: {task_info['targetTableUniqueIndex']}")
        print(f"查询SQL:\n{task_info['querySql']}")

        # Import into the database; the task name is the primary key, so
        # entries without one are skipped.
        if task_name:
            insert_into_sqlite(task_info)
        else:
            # Entries built by process_directory carry 'file_path' (there is
            # no 'file_name' key, which used to raise KeyError here).
            print(f"⚠️ 跳过未命名任务: {file_info['file_path']}")

    # Print summary statistics.
    stats = all_results['stats']
    print("\n统计信息:")
    print(f"共找到 {stats['total_files']} 个SQL文件")
    print(f"成功处理 {stats['processed_files']} 个文件")
    print(f"其中 {stats['files_with_sql']} 个文件包含SQL查询")
    # NOTE(review): this counts entries that have a task name, i.e. inserts
    # that were attempted; inserts that failed inside insert_into_sqlite are
    # still included.
    named_tasks = sum(1 for f in all_results['files'] if f['info']['taskName'])
    print(f"成功导入 {named_tasks} 个任务信息")
|