417 lines
19 KiB
Python
417 lines
19 KiB
Python
import os
|
||
import public
|
||
import re
|
||
import json
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
|
||
# 定义一个类用用来生成函数调用关系
|
||
class BuildMap:
|
||
def __init__(self, projectPath='F:\\sesCode\\',
|
||
# subProject=["upub", "uconvert", "umgr", "uqms", "ucbp", "uses", "ucrt"],
|
||
subProject=["umgr","uses", "ucrt", "ucbp", "uconvert", "uqms", "upub"],
|
||
uftatom="\\dev_codes\\uftatom",
|
||
uftbusiness="\\dev_codes\\uftbusiness",
|
||
extend_name=['uftservice', 'uftfunction', 'uftatomfunction', 'uftatomservice', 'extinterface'],
|
||
model_flag=1):
|
||
self.projectPath = projectPath # 工程目录地址
|
||
self.subProject = subProject
|
||
self.uftatom = uftatom
|
||
self.uftbusiness = uftbusiness
|
||
self.search_direction = 0 # 搜索方向,默认为0, 0: 正向搜索 1:反向搜索
|
||
self.model_flag = model_flag # 查询模式 0:默认 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
self.extend = extend_name # 遍历的文件后缀名称
|
||
# find_function, find_txt, gene_file, model_flag, project='uses'
|
||
self.find_function = []
|
||
self.find_txt = ''
|
||
self.gene_file = 'UF30函数调用图.txt'
|
||
self.project = 'uses'
|
||
# 定义全局变量,用来保存函数信息
|
||
# 核心数据结构
|
||
# key:name value:self.FunctionInfo
|
||
self.function_map = {}
|
||
# 定义变量,保存id和Chinese_name
|
||
# key:id value:name
|
||
self.id_name = {}
|
||
self.debug = 0
|
||
|
||
def set_project(self, project):
|
||
self.project = project
|
||
|
||
def set_project_path(self, projectPath):
|
||
self.projectPath = projectPath
|
||
|
||
def set_search_direction(self, search_direction):
|
||
# 获取当前时间并格式化
|
||
# current_time = datetime.now().strftime("%Y%m%d_%H%M%S") # 格式化为年月日_时分秒
|
||
self.search_direction = search_direction
|
||
# if search_direction == 0:
|
||
# self.set_gene_file(f'{self.find_function[0]}-函数调用图_{current_time}.txt')
|
||
# else:
|
||
# self.set_gene_file(f'{self.find_function[0]}-函数被调用图_{current_time}.txt')
|
||
|
||
def set_find_function(self, find_function):
|
||
self.find_function = find_function
|
||
|
||
def set_find_txt(self, find_txt):
|
||
self.find_txt = find_txt
|
||
|
||
def set_gene_file(self, gene_file):
|
||
self.gene_file = gene_file
|
||
|
||
# 用来存储函数信息
|
||
class FunctionInfo:
|
||
def __init__(self, name, id, path, pre_ids=None, next_ids=None):
|
||
self.name = name
|
||
self.id = id
|
||
self.path = path
|
||
self.pre_ids = pre_ids if pre_ids is not None else [] # 确保默认为空列表
|
||
self.next_ids = next_ids if next_ids is not None else [] # 确保默认为空列表
|
||
|
||
# 1.0 保存单个函数的信息,生成全图函数信息时使用
|
||
# 遍历目录,将函数对应中文名称,功能号,函数路径,提取出来;
|
||
# 先以"key":"value"形式保存到functions_map 和 id_name 里面
|
||
# 同时将其 记录到 sub_project + ".txt" 文件中 比如umgr.txt upbu.txt
|
||
def traverse_directory(self, dir_path, sub_project, functions_map, id_name):
|
||
# 获取当前目录下的所有文件和目录
|
||
# print('fir_path:', dir_path)
|
||
if os.path.exists(dir_path):
|
||
entries = os.scandir(dir_path)
|
||
for entry in entries:
|
||
# 拼接完整的文件或目录路径
|
||
full_path = os.path.join(dir_path, entry.name)
|
||
|
||
if entry.is_file():
|
||
file_ext = public.extract_file_extension(full_path)
|
||
if file_ext in self.extend:
|
||
# print(full_path)
|
||
# 读取第二行内容
|
||
temp_text = public.read_second_line(full_path)
|
||
# print(temp_text)
|
||
# 提取名称功能号
|
||
matches = public.extract_name_functionNo(temp_text)
|
||
if matches is None:
|
||
continue
|
||
chinese_name = matches[0]
|
||
object_id = matches[1]
|
||
if chinese_name is None:
|
||
continue
|
||
# print(f"chineseName: {chinese_name}")
|
||
# print(f"objectId: {object_id}")
|
||
if object_id not in id_name:
|
||
id_name[object_id] = chinese_name
|
||
# 确保name的唯一性
|
||
if chinese_name in functions_map:
|
||
# print(full_path)
|
||
# print(temp_text)
|
||
# print(f"Function name '{chinese_name}' is not unique.")
|
||
pass
|
||
# raise ValueError(f"Function name '{chinese_name}' is not unique.")
|
||
else:
|
||
new_func = self.FunctionInfo(chinese_name, object_id, full_path)
|
||
functions_map[chinese_name] = new_func
|
||
if self.debug == 1:
|
||
file_name = sub_project + ".txt"
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
file.write(f"\"function_id\":\"{object_id}\",")
|
||
file.write(f"\"chinese_name\":\"{chinese_name}\",")
|
||
file.write(f"\"full_path\":\"{full_path}\"\n")
|
||
# print(new_func)
|
||
|
||
elif entry.is_dir():
|
||
# 如果是目录,则递归调用本函数
|
||
self.traverse_directory(full_path, sub_project, functions_map, id_name)
|
||
|
||
# 1.1 遍历所有函数字典然后合并函数字典信息
|
||
def traverse_all(self):
|
||
projectPath = self.projectPath
|
||
subProject = self.subProject
|
||
find_function = self.find_function
|
||
find_txt = self.find_txt
|
||
gene_file = self.gene_file
|
||
model_flag = self.model_flag
|
||
project = self.project
|
||
if find_txt == '' or find_txt == None:
|
||
model_flag = 1
|
||
|
||
# 20250823 fix 优化生成函数关系图写法,使用for循环扫描每个代码库
|
||
for model in subProject:
|
||
id_name = {}
|
||
func_map = {}
|
||
self.traverse_directory(projectPath + model, model, func_map, id_name)
|
||
self.function_map.update(func_map)
|
||
self.id_name.update(id_name)
|
||
|
||
# 再使用inner覆盖一下
|
||
inner_id_name = {}
|
||
inner_func_map = {}
|
||
if 'uses' in self.subProject:
|
||
# 再使用inner覆盖一下
|
||
self.traverse_directory(projectPath + 'uses' + '\\dev_codes\\uftbusiness\\inner', 'uses',
|
||
inner_func_map, inner_id_name)
|
||
|
||
if 'ucrt' in self.subProject:
|
||
innercrt_id_name = {}
|
||
innercrt_func_map = {}
|
||
self.traverse_directory(projectPath + 'ucrt' + '\\dev_codes\\uftbusiness\\inner', 'ucrt',
|
||
inner_func_map,
|
||
inner_id_name)
|
||
|
||
self.function_map.update(inner_func_map)
|
||
self.id_name.update(inner_id_name)
|
||
|
||
# 工具函数
|
||
# 将函数信息 字典内容 保存到txt中 file_name = "函数拓扑关系图.txt"
|
||
def save_map_totxt(self, func_map, file_name='函数拓扑关系图.txt'):
|
||
for name in func_map:
|
||
func = func_map[name]
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
file.write(f"\"function_id\":\"{func.id}\",")
|
||
file.write(f"\"chinese_name\":\"{func.name}\",")
|
||
file.write(f"\"full_path\":\"{func.path}\",")
|
||
file.write(f"\"pre_ids\":\"{func.pre_ids}\",")
|
||
file.write(f"\"next_ids\":\"{func.next_ids}\"\n")
|
||
|
||
# 2.扫描函数字典 根据函数信息 生成函数关系图,拓扑图
|
||
def scanl_func(self):
|
||
gene_file = self.gene_file
|
||
func_map = self.function_map
|
||
# 开始遍历功能号,逐行扫描文件内容 添加父子函数id 先搞uses的,暂不支持跨模块查找
|
||
for c_name in func_map:
|
||
# print(c_name)
|
||
c_func = func_map[c_name]
|
||
c_id = c_func.id
|
||
full_path = c_func.path
|
||
|
||
if not os.path.exists(full_path):
|
||
print('full_path notexists:', full_path)
|
||
continue
|
||
# 使用with语句确保文件正确关闭
|
||
with open(full_path, 'r', encoding='utf-8') as file:
|
||
# 逐行读取文件
|
||
for line in file:
|
||
# 提取函数名称
|
||
sub_func_name = public.match_func_name(line)
|
||
# 过滤掉不存在的函数
|
||
if sub_func_name == "None" or sub_func_name == "":
|
||
continue
|
||
if sub_func_name not in func_map:
|
||
continue
|
||
# 提取函数对应的功能号
|
||
sub_func = func_map[sub_func_name]
|
||
fub_func_id = sub_func.id
|
||
if fub_func_id not in c_func.next_ids:
|
||
c_func.next_ids.append(fub_func_id)
|
||
# 添加完子节点后,再反过来添加父节点
|
||
if c_id not in sub_func.pre_ids:
|
||
sub_func.pre_ids.append(c_id)
|
||
with open('UF30函数关系图.txt', 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# 建图完毕,将其保存到test.txt文件中
|
||
self.save_map_totxt(func_map, "UF30函数关系图.txt")
|
||
# 使用 'w' 模式打开文件,这会清空文件内容
|
||
with open(gene_file, 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# 生成函数调用图
|
||
# self.geneAllFucn2(find_function, func_map, id_name, "", gene_file, find_txt, model_flag, 0)
|
||
# if self.debug:
|
||
# print('生成完成')
|
||
|
||
# 3.生成函数调用或者被调用图
|
||
# 直接for循环遍历 扫描生成函数调用图 或者被调用图
|
||
def geneAllFucn2(self,find_function, tab='', call_path="\t\t【Start"):
|
||
func_map = self.function_map
|
||
id_name = self.id_name
|
||
projectPath = self.projectPath
|
||
subProject = self.subProject
|
||
# find_function = self.find_function
|
||
find_txt = self.find_txt
|
||
gene_file = self.gene_file
|
||
model_flag = self.model_flag
|
||
# print('model_flag', model_flag)
|
||
project = self.project
|
||
file_name = self.gene_file
|
||
count = 0
|
||
# call_path = "\t\t【Start"
|
||
|
||
find_flag = 0
|
||
for func_id in find_function:
|
||
if func_id not in id_name:
|
||
print(f"Not this function_id:{func_id}")
|
||
else:
|
||
index = find_function.index(func_id)
|
||
# 函数名称提取出来
|
||
func_name = id_name[func_id]
|
||
if '融资融券' in func_name:
|
||
continue
|
||
func_path = func_map[func_name].path
|
||
if find_txt != "":
|
||
with open(file_name, 'r', encoding='utf-8') as file1:
|
||
if public.check_string_in_file(func_path, find_txt):
|
||
find_flag = 1
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
if find_flag == 1:
|
||
file.write(tab + func_id + ":" + func_name + "\t\t" + find_txt + call_path + "->" + str(
|
||
func_id) + "】\n")
|
||
elif model_flag == 1:
|
||
if (self.search_direction == 1 and 'LS' in func_name and '内部远程调用' not in func_name) or call_path == "\t\t【Start":
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
elif self.search_direction == 0:
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
find_flag = 0
|
||
# 提取 next_ids 此处可以设置函数调用图或者函数被调用图
|
||
next_ids = func_map[func_name].next_ids
|
||
pre_ids = func_map[func_name].pre_ids
|
||
if self.search_direction == 1:
|
||
# 如果标志为1 代表搜索反向调用图
|
||
next_ids = pre_ids
|
||
if len(next_ids) > 0:
|
||
self.geneAllFucn2(next_ids, '\t' + tab, call_path + "->" + str(func_id))
|
||
|
||
def loadInfoFromTxt(self):
|
||
# 读取并处理文件
|
||
with open('UF30函数关系图.txt', 'r', encoding='utf-8') as file:
|
||
for line in file:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 使用正则表达式解析键值对
|
||
data = {}
|
||
pattern = r'"([^"]+)":"(.*?)"(?=,|$)'
|
||
matches = re.findall(pattern, line)
|
||
|
||
for key, value in matches:
|
||
# 处理列表类型的字段(pre_ids/next_ids)
|
||
if key in ['pre_ids', 'next_ids']:
|
||
# 将字符串列表转换为实际列表
|
||
value = value.replace("'", '"') # 替换单引号为双引号
|
||
try:
|
||
data[key] = json.loads(value)
|
||
except json.JSONDecodeError:
|
||
data[key] = []
|
||
else:
|
||
data[key] = value
|
||
|
||
# 确保所有必要字段都存在
|
||
required_fields = ['function_id', 'chinese_name', 'full_path']
|
||
if all(field in data for field in required_fields):
|
||
# 创建FunctionInfo对象
|
||
func_info = self.FunctionInfo(
|
||
name=data['chinese_name'],
|
||
id=data['function_id'],
|
||
path=data['full_path'],
|
||
pre_ids=data.get('pre_ids', []),
|
||
next_ids=data.get('next_ids', [])
|
||
)
|
||
|
||
# 更新function_map
|
||
self.function_map[func_info.name] = func_info
|
||
|
||
# 更新id_name
|
||
self.id_name[func_info.id] = func_info.name
|
||
|
||
def update_func_map(self):
|
||
# 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
self.traverse_all()
|
||
|
||
# 2.扫描函数字典 生成函数关系拓扑图
|
||
self.scanl_func()
|
||
|
||
# 初始化函数关系图到字典中
|
||
def init_func_call_map(self, forceUpdate=False):
|
||
# self.set_project(project)
|
||
# self.set_find_function(find_function)
|
||
# self.set_find_txt(find_txt)
|
||
# self.set_search_direction(search_direction)
|
||
# if find_txt:
|
||
# # 查询模式 0: 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
# self.model_flag = 0
|
||
# if gene_file:
|
||
# self.set_gene_file(gene_file)
|
||
# build.set_gene_file('函数调用图2-1.txt')
|
||
|
||
if os.path.exists('UF30函数关系图.txt') and forceUpdate == False:
|
||
# 从文件中加载配置
|
||
self.loadInfoFromTxt()
|
||
else:
|
||
# print('111')
|
||
# 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
self.traverse_all()
|
||
|
||
# print('222')
|
||
# 2.扫描函数字典 生成函数关系拓扑图
|
||
self.scanl_func()
|
||
|
||
# 20250712 add 合并远程调用的和外部接口的 函数字典信息 pre_id和next_id,解决部分函数反向调用无法穿透问题
|
||
self.merge_func_info()
|
||
|
||
|
||
def gene_func_call_map(self, find_function, find_txt, search_direction=0, project='uses', gene_file=''):
|
||
self.set_project(project)
|
||
self.set_find_function(find_function)
|
||
self.set_find_txt(find_txt)
|
||
self.set_search_direction(search_direction)
|
||
print('find_txt', find_txt)
|
||
if find_txt == ' ' or find_txt == None or find_txt == '':
|
||
# 查询模式 0: 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
self.set_find_txt("")
|
||
self.model_flag = 1
|
||
else:
|
||
self.model_flag = 0
|
||
if gene_file:
|
||
self.set_gene_file(gene_file)
|
||
|
||
# 每次生成调用图之前,先清空文件
|
||
with open(self.gene_file, 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# build.set_gene_file('函数调用图2-1.txt')
|
||
|
||
# # 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
# self.traverse_all()
|
||
|
||
# # 2.扫描函数字典 生成函数关系拓扑图
|
||
# self.scanl_func()
|
||
|
||
# 3.根据函数关系拓扑图 生成函数调用图/被调用图
|
||
self.geneAllFucn2(find_function)
|
||
# print('生成完成')
|
||
if self.debug:
|
||
print('生成完成')
|
||
|
||
# 20250712 add 添加函数,用来合并远程调用函数的pre_id和next_id,解决部分函数反向调用无法穿透问题
|
||
def merge_func_info(self):
|
||
# 更新function_map
|
||
# self.function_map[func_info.name] = func_info
|
||
|
||
# 1. 按 id 收集所有的 pre_ids 和 next_ids
|
||
id_to_combined = defaultdict(lambda: {"pre_ids": set(), "next_ids": set()})
|
||
|
||
for func in self.function_map.values():
|
||
id_to_combined[func.id]["pre_ids"].update(func.pre_ids)
|
||
id_to_combined[func.id]["next_ids"].update(func.next_ids)
|
||
|
||
# 2. 更新每一个 FunctionInfo 对象的 pre_ids 和 next_ids
|
||
for key in self.function_map:
|
||
if '远程调用' in key or '外部接口' in key:
|
||
func = self.function_map[key]
|
||
combined = id_to_combined.get(func.id, {"pre_ids": [], "next_ids": []})
|
||
|
||
# 转回 list 并排序(可选)
|
||
func.pre_ids = list(combined["pre_ids"])
|
||
func.next_ids = list(combined["next_ids"])
|
||
|
||
# 遍历所有目录
|
||
def main():
|
||
build = BuildMap()
|
||
build.init_func_call_map()
|
||
build.gene_func_call_map(['332648'], 'trade_handling_instr', search_direction=0, project='ucbp', gene_file='')
|
||
|
||
if __name__ == '__main__':
|
||
# 1.记录每个文件自己的信息,功能号,中英文名称
|
||
# 2.生成函数关系拓扑图
|
||
# 3.根据生成的函数关系图,来进行搜索之类的操作
|
||
main()
|
||
|
||
|