Files
my_py_tools/import_files/import_dts_info.py
2025-10-18 21:32:31 +08:00

207 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import glob
from collections import defaultdict
import sqlite3
from sqlfluff.api import fix
from datetime import datetime
# Default SQLite database path used by create_database() and insert_into_sqlite().
# The path is relative, so it resolves against the current working directory.
DB_NAME = '../hstool.db'
# Pretty-print a SQL statement. sqlfluff is slow: the whole import run is
# estimated to take roughly seven to eight minutes.
def format_with_sqlfluff(sql_code, dialect='oracle'):
    """Return ``sql_code`` after passing it through sqlfluff's ``fix``.

    :param sql_code: SQL text to format
    :param dialect: sqlfluff dialect name handed to ``fix``
    :return: whatever ``fix`` returns for the given SQL
    """
    formatted_sql = fix(sql_code, dialect=dialect)
    return formatted_sql
def extract_sql_task_info(file_path):
    """Extract task metadata and the query SQL from one task file.

    Only lines between a ``[begin]`` and an ``[end]`` marker (both matched
    case-insensitively) are inspected. Before matching, everything from the
    first ``--`` or ``/*`` onwards is cut from the line and the remainder is
    stripped. Inside the block, ``taskName:``, ``srcTable:``,
    ``targetTable:`` and ``targetTableUniqueIndex:`` lines set the
    corresponding field, and the raw line that follows a ``[querySqlBegin]``
    marker is taken as the query and formatted with ``format_with_sqlfluff``.

    The file is decoded as ``utf-8-sig`` so that a leading byte-order mark
    does not stick to the first line and hide a ``[begin]`` marker there.

    :param file_path: path of the file to parse
    :return: dict with the keys ``taskName``, ``srcTable``, ``targetTable``,
        ``targetTableUniqueIndex`` and ``querySql``; a key stays ``None``
        when the file does not provide it
    """
    metadata_keys = (
        'taskName',
        'srcTable',
        'targetTable',
        'targetTableUniqueIndex',
    )
    result = dict.fromkeys(metadata_keys + ('querySql',))

    # utf-8-sig reads plain UTF-8 unchanged but drops a leading BOM.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        lines = file.readlines()

    # Tracks whether the current line lies inside a [begin]-[end] block.
    in_block = False
    for i, line in enumerate(lines):
        # Cut trailing "--" comments and anything after a "/*" opener.
        clean_line = line.split('--')[0].split('/*')[0].strip()
        if not clean_line:
            continue

        marker = clean_line.lower()
        if marker == '[begin]':
            in_block = True
            continue
        if marker == '[end]':
            in_block = False
            continue
        if not in_block:
            continue

        # "key: value" lines; only the text before the first colon is the key.
        key, sep, value = clean_line.partition(':')
        if sep and key in metadata_keys:
            result[key] = value.strip()
        elif clean_line.startswith('[querySqlBegin]'):
            # The SQL is expected on the very next line; it is read from the
            # raw line so that comment stripping does not touch it.
            if i + 1 < len(lines):
                query_sql = lines[i + 1].strip()
                result['querySql'] = format_with_sqlfluff(query_sql)
    return result
def process_directory(directory='.'):
    """Walk ``directory`` recursively and collect task info from ``.sql`` files.

    Every file whose name ends in ``.sql`` (case-insensitive) is counted and
    passed to ``extract_sql_task_info``. Files that yield at least one
    non-empty value are recorded with their path relative to ``directory``.
    Any exception raised while handling a file is printed and that file is
    skipped.

    :param directory: root directory to scan
    :return: dict with ``files`` (list of ``{'file_path', 'info'}`` entries),
        ``by_task_name`` (info dicts keyed by task name; a later file with the
        same name replaces an earlier one) and ``stats`` (counters
        ``total_files``, ``processed_files``, ``files_with_sql``)
    """
    stats = {
        'total_files': 0,
        'processed_files': 0,
        'files_with_sql': 0
    }
    collected = []
    tasks_by_name = {}

    for current_dir, _, names in os.walk(directory):
        for name in names:
            if not name.lower().endswith('.sql'):
                continue

            full_path = os.path.join(current_dir, name)
            relative_path = os.path.relpath(full_path, directory)
            stats['total_files'] += 1
            try:
                info = extract_sql_task_info(full_path)
                # Files that produced no value at all are not recorded.
                if not any(info.values()):
                    continue

                collected.append({
                    'file_path': relative_path,  # relative to the scan root
                    'info': info
                })
                stats['processed_files'] += 1
                if info['querySql']:
                    stats['files_with_sql'] += 1
                # Index by task name only when a name was found.
                if info['taskName']:
                    tasks_by_name[info['taskName']] = info
            except Exception as e:
                print(f"处理文件 {relative_path} 时出错: {e}")

    return {
        'files': collected,
        'by_task_name': tasks_by_name,
        'stats': stats
    }
def create_database(db_path=DB_NAME):
    """Create the SQLite database file and the ``dts_task_info`` table.

    The table is created only if it does not exist yet, so calling this
    repeatedly is harmless. The connection is closed even when creating the
    table fails.

    :param db_path: path of the SQLite database file
    :raises sqlite3.Error: if the statement or the commit fails
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dts_task_info (
                task_name TEXT PRIMARY KEY NOT NULL,
                src_table TEXT,
                target_table TEXT,
                target_table_unique_index TEXT,
                query_sql TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Release the connection on the error path as well.
        conn.close()
def insert_into_sqlite(task_info, db_path=DB_NAME):
    """
    Insert or replace one task record in the ``dts_task_info`` table.

    A ``sqlite3.Error`` is reported with a printed message and is not
    re-raised; the connection is closed in every case.

    :param task_info: dict as returned by extract_sql_task_info
    :param db_path: path of the SQLite database file
    """
    upsert_sql = '''
        INSERT OR REPLACE INTO dts_task_info (
            task_name,
            src_table,
            target_table,
            target_table_unique_index,
            query_sql
        ) VALUES (?, ?, ?, ?, ?)
    '''
    # task_info keys in the same order as the columns listed above.
    field_order = (
        'taskName',
        'srcTable',
        'targetTable',
        'targetTableUniqueIndex',
        'querySql',
    )

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        row = tuple(task_info[field] for field in field_order)
        cursor.execute(upsert_sql, row)
        conn.commit()
        print(f"✅ 成功导入任务信息: {task_info['taskName']}")
    except sqlite3.Error as e:
        print(f"❌ 导入失败: {task_info['taskName']}, 错误: {e}")
    finally:
        conn.close()
# Example usage: parse every task file under the source directory and load
# the results into SQLite.
if __name__ == "__main__":
    # Root directory that is scanned recursively for .sql task files
    directory_path = r'E:\dtsCode\dts\DevCodes\broker-dcp\dcp-resources\src\main\resources\Uf20ToUft'
    all_results = process_directory(directory_path)

    # Make sure the target table exists before inserting
    create_database()

    # Print and import the info of every recorded file
    for file_info in all_results['files']:
        task_info = file_info['info']
        task_name = task_info['taskName']

        print(f"\n=== 任务: {task_name} ===")
        print(f"源表: {task_info['srcTable']}")
        print(f"目标表: {task_info['targetTable']}")
        print(f"唯一索引: {task_info['targetTableUniqueIndex']}")
        print(f"查询SQL:\n{task_info['querySql']}")

        # task_name is the primary key, so unnamed tasks cannot be stored
        if task_name:
            insert_into_sqlite(task_info)
        else:
            # Entries carry 'file_path'; there is no 'file_name' key, so the
            # previous lookup raised KeyError on this branch.
            print(f"⚠️ 跳过未命名任务: {file_info['file_path']}")

    # Summary
    print("\n统计信息:")
    print(f"共找到 {all_results['stats']['total_files']} 个SQL文件")
    print(f"成功处理 {all_results['stats']['processed_files']} 个文件")
    print(f"其中 {all_results['stats']['files_with_sql']} 个文件包含SQL查询")
    # NOTE: this counts files that have a task name, i.e. insert attempts;
    # insert_into_sqlite only prints on failure, so failed inserts are included.
    named_task_count = sum(
        1 for f in all_results['files'] if f['info']['taskName']
    )
    print(f"成功导入 {named_task_count} 个任务信息")