520 lines
24 KiB
Python
520 lines
24 KiB
Python
import os
|
||
import public
|
||
import re
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# 定义一个类用用来生成函数调用关系
|
||
class UF20BuildMap:
|
||
def __init__(self, projectPath='D:\\Sources\\经纪业务运营平台V21\\',
|
||
subProject=["业务逻辑", "原子"],
|
||
uftatom="\\dev_codes\\uftatom",
|
||
uftbusiness="\\dev_codes\\uftbusiness",
|
||
extend_name=['service_design', 'function_design', 'aservice_design', 'afunction_design', 'procedure_design'],
|
||
model_flag=1):
|
||
self.projectPath = projectPath # 工程目录地址
|
||
self.subProject = subProject
|
||
self.uftatom = uftatom
|
||
self.uftbusiness = uftbusiness
|
||
self.search_direction = 0 # 搜索方向,默认为0, 0: 正向搜索 1:反向搜索
|
||
self.model_flag = model_flag # 查询模式 0:默认 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
self.extend = extend_name # 遍历的文件后缀名称
|
||
# find_function, find_txt, gene_file, model_flag, project='uses'
|
||
self.find_function = []
|
||
self.find_txt = ''
|
||
self.gene_file = 'UF20函数调用图.txt'
|
||
self.project = 'uses'
|
||
# 定义全局变量,用来保存函数信息
|
||
# 核心数据结构
|
||
# key:name value:self.FunctionInfo
|
||
self.function_map = {}
|
||
# 定义变量,保存id和Chinese_name
|
||
# key:id value:name
|
||
self.id_name = {}
|
||
self.debug = 0
|
||
self.soname = set()
|
||
|
||
|
||
def set_project(self, project):
|
||
self.project = project
|
||
|
||
def set_project_path(self, projectPath):
|
||
self.projectPath = projectPath
|
||
|
||
def set_search_direction(self, search_direction):
|
||
# 获取当前时间并格式化
|
||
# current_time = datetime.now().strftime("%Y%m%d_%H%M%S") # 格式化为年月日_时分秒
|
||
self.search_direction = search_direction
|
||
# if search_direction == 0:
|
||
# self.set_gene_file(f'{self.find_function[0]}-函数调用图_{current_time}.txt')
|
||
# else:
|
||
# self.set_gene_file(f'{self.find_function[0]}-函数被调用图_{current_time}.txt')
|
||
|
||
def set_find_function(self, find_function):
|
||
self.find_function = find_function
|
||
|
||
def set_find_txt(self, find_txt):
|
||
self.find_txt = find_txt
|
||
|
||
def set_gene_file(self, gene_file):
|
||
self.gene_file = gene_file
|
||
|
||
# 用来存储函数信息
|
||
class FunctionInfo:
|
||
def __init__(self, name, id, path, pre_ids=None, next_ids=None):
|
||
self.name = name
|
||
self.id = id
|
||
self.path = path
|
||
self.pre_ids = pre_ids if pre_ids is not None else [] # 确保默认为空列表
|
||
self.next_ids = next_ids if next_ids is not None else [] # 确保默认为空列表
|
||
|
||
# 1.0 保存单个函数的信息,生成全图函数信息时使用
|
||
# 遍历目录,将函数对应中文名称,功能号,函数路径,提取出来;
|
||
# 先以"key":"value"形式保存到functions_map 和 id_name 里面
|
||
# 同时将其 记录到 sub_project + ".txt" 文件中 比如umgr.txt upbu.txt
|
||
def traverse_directory(self, dir_path, sub_project, functions_map, id_name):
|
||
# 获取当前目录下的所有文件和目录
|
||
if os.path.exists(dir_path):
|
||
entries = os.scandir(dir_path)
|
||
for entry in entries:
|
||
# 拼接完整的文件或目录路径
|
||
full_path = os.path.join(dir_path, entry.name)
|
||
|
||
if entry.is_file():
|
||
file_ext = public.extract_file_extension(full_path)
|
||
file_name_withoutExt = public.extract_filename(full_path)
|
||
if file_ext in self.extend:
|
||
# print(full_path)
|
||
# 读取第二行内容
|
||
temp_text = public.read_five_line(full_path)
|
||
# print(temp_text)
|
||
# 提取名称功能号
|
||
matches = public.uf20_extract_name_functionNo(temp_text)
|
||
if matches is None:
|
||
continue
|
||
chinese_name = file_name_withoutExt
|
||
object_id = matches[0]
|
||
english_name = matches[1]
|
||
if chinese_name is None:
|
||
continue
|
||
# print(f"chineseName: {chinese_name}")
|
||
# print(f"objectId: {object_id}")
|
||
if object_id not in id_name:
|
||
id_name[object_id] = chinese_name
|
||
# 确保name的唯一性
|
||
if chinese_name in functions_map:
|
||
# print(full_path)
|
||
# print(temp_text)
|
||
# print(f"Function name '{chinese_name}' is not unique.")
|
||
pass
|
||
# raise ValueError(f"Function name '{chinese_name}' is not unique.")
|
||
else:
|
||
new_func = self.FunctionInfo(chinese_name, object_id, full_path)
|
||
functions_map[chinese_name] = new_func
|
||
if self.debug == 1:
|
||
file_name = sub_project + ".txt"
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
file.write(f"\"function_id\":\"{object_id}\",")
|
||
file.write(f"\"chinese_name\":\"{chinese_name}\",")
|
||
file.write(f"\"full_path\":\"{full_path}\"\n")
|
||
# print(new_func)
|
||
|
||
elif entry.is_dir():
|
||
# 如果是目录,则递归调用本函数
|
||
self.traverse_directory(full_path, sub_project, functions_map, id_name)
|
||
|
||
# 1.1 遍历所有函数字典然后合并函数字典信息
|
||
def traverse_all(self):
|
||
projectPath = self.projectPath
|
||
subProject = self.subProject
|
||
|
||
LS_id_name = {}
|
||
LS_func_map = {}
|
||
self.traverse_directory(projectPath + subProject[0], subProject[0], LS_func_map, LS_id_name)
|
||
|
||
AS_id_name = {}
|
||
AS_func_map = {}
|
||
self.traverse_directory(projectPath + subProject[1], subProject[1], AS_func_map, AS_id_name)
|
||
|
||
LS_func_map.update(AS_func_map)
|
||
LS_id_name.update(AS_id_name)
|
||
|
||
self.function_map = LS_func_map
|
||
self.id_name = LS_id_name
|
||
|
||
# 工具函数
|
||
# 将函数信息 字典内容 保存到txt中 file_name = "函数拓扑关系图.txt"
|
||
def save_map_totxt(self, func_map, file_name='函数拓扑关系图.txt'):
|
||
for name in func_map:
|
||
func = func_map[name]
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
file.write(f"\"function_id\":\"{func.id}\",")
|
||
file.write(f"\"chinese_name\":\"{func.name}\",")
|
||
file.write(f"\"full_path\":\"{func.path}\",")
|
||
file.write(f"\"pre_ids\":\"{func.pre_ids}\",")
|
||
file.write(f"\"next_ids\":\"{func.next_ids}\"\n")
|
||
|
||
# 2.扫描函数字典 根据函数信息 生成函数关系图,拓扑图
|
||
def scanl_func(self):
|
||
gene_file = self.gene_file
|
||
func_map = self.function_map
|
||
# 开始遍历功能号,逐行扫描文件内容 添加父子函数id 先搞uses的,暂不支持跨模块查找
|
||
for c_name in func_map:
|
||
# print(c_name)
|
||
c_func = func_map[c_name]
|
||
c_id = c_func.id
|
||
full_path = c_func.path
|
||
|
||
# 使用with语句确保文件正确关闭
|
||
with open(full_path, 'r', encoding='utf-8') as file:
|
||
# 逐行读取文件
|
||
for line in file:
|
||
# 提取函数名称
|
||
sub_func_name = public.match_func_name(line)
|
||
# 过滤掉不存在的函数
|
||
if sub_func_name == "None" or sub_func_name == "":
|
||
continue
|
||
if sub_func_name not in func_map:
|
||
continue
|
||
# 提取函数对应的功能号
|
||
sub_func = func_map[sub_func_name]
|
||
fub_func_id = sub_func.id
|
||
if fub_func_id not in c_func.next_ids:
|
||
c_func.next_ids.append(fub_func_id)
|
||
# 添加完子节点后,再反过来添加父节点
|
||
if c_id not in sub_func.pre_ids:
|
||
sub_func.pre_ids.append(c_id)
|
||
with open('UF20函数关系图.txt', 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# 建图完毕,将其保存到test.txt文件中
|
||
self.save_map_totxt(func_map, "UF20函数关系图.txt")
|
||
# 使用 'w' 模式打开文件,这会清空文件内容
|
||
with open(gene_file, 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# 生成函数调用图
|
||
# self.geneAllFucn2(find_function, func_map, id_name, "", gene_file, find_txt, model_flag, 0)
|
||
# if self.debug:
|
||
# print('生成完成')
|
||
|
||
# 3.生成函数调用或者被调用图
|
||
# 直接for循环遍历 扫描生成函数调用图 或者被调用图
|
||
def geneAllFucn2(self,find_function, tab='', call_path="\t\t【Start", callDeep=0):
|
||
# 增加层数限制,避免无限递归
|
||
if callDeep >= 15:
|
||
return
|
||
func_map = self.function_map
|
||
id_name = self.id_name
|
||
projectPath = self.projectPath
|
||
subProject = self.subProject
|
||
# find_function = self.find_function
|
||
find_txt = self.find_txt
|
||
gene_file = self.gene_file
|
||
model_flag = self.model_flag
|
||
project = self.project
|
||
file_name = self.gene_file
|
||
count = 0
|
||
# call_path = "\t\t【Start"
|
||
|
||
find_flag = 0
|
||
for func_id in find_function:
|
||
if func_id not in id_name:
|
||
print(f"Not this function_id:{func_id}")
|
||
else:
|
||
index = find_function.index(func_id)
|
||
# 函数名称提取出来
|
||
func_name = id_name[func_id]
|
||
if '融资融券' in func_name:
|
||
continue
|
||
func_path = func_map[func_name].path
|
||
if find_txt != "":
|
||
with open(file_name, 'r', encoding='utf-8') as file1:
|
||
if public.check_string_in_file(func_path, find_txt):
|
||
find_flag = 1
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
if find_flag == 1:
|
||
file.write(tab + func_id + ":" + func_name + "\t\t" + find_txt + call_path + "->" + str(
|
||
func_id) + "】\n")
|
||
elif model_flag == 1:
|
||
if (self.search_direction == 1 and 'LS' in func_name) or call_path == "\t\t【Start":
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
elif self.search_direction == 0:
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
find_flag = 0
|
||
# 提取 next_ids 此处可以设置函数调用图或者函数被调用图
|
||
next_ids = func_map[func_name].next_ids
|
||
pre_ids = func_map[func_name].pre_ids
|
||
if self.search_direction == 1:
|
||
# 如果标志为1 代表搜索反向调用图
|
||
next_ids = pre_ids
|
||
if len(next_ids) > 0:
|
||
# fix 20250708 dg 防止无限递归调用
|
||
if next_ids == find_function:
|
||
return
|
||
self.geneAllFucn2(next_ids, '\t' + tab, call_path + "->" + str(func_id), callDeep + 1)
|
||
|
||
def update_func_map(self):
|
||
# 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
self.traverse_all()
|
||
|
||
# 2.扫描函数字典 生成函数关系拓扑图
|
||
self.scanl_func()
|
||
|
||
def loadInfoFromTxt(self):
|
||
# 读取并处理文件
|
||
with open('UF20函数关系图.txt', 'r', encoding='utf-8') as file:
|
||
for line in file:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 使用正则表达式解析键值对
|
||
data = {}
|
||
pattern = r'"([^"]+)":"(.*?)"(?=,|$)'
|
||
matches = re.findall(pattern, line)
|
||
|
||
for key, value in matches:
|
||
# 处理列表类型的字段(pre_ids/next_ids)
|
||
if key in ['pre_ids', 'next_ids']:
|
||
# 将字符串列表转换为实际列表
|
||
value = value.replace("'", '"') # 替换单引号为双引号
|
||
try:
|
||
data[key] = json.loads(value)
|
||
except json.JSONDecodeError:
|
||
data[key] = []
|
||
else:
|
||
data[key] = value
|
||
|
||
# 确保所有必要字段都存在
|
||
required_fields = ['function_id', 'chinese_name', 'full_path']
|
||
if all(field in data for field in required_fields):
|
||
# 创建FunctionInfo对象
|
||
func_info = self.FunctionInfo(
|
||
name=data['chinese_name'],
|
||
id=data['function_id'],
|
||
path=data['full_path'],
|
||
pre_ids=data.get('pre_ids', []),
|
||
next_ids=data.get('next_ids', [])
|
||
)
|
||
|
||
# 更新function_map
|
||
self.function_map[func_info.name] = func_info
|
||
|
||
# 更新id_name
|
||
self.id_name[func_info.id] = func_info.name
|
||
|
||
def init_func_call_map(self, forceUpdate=False):
|
||
# self.set_project(project)
|
||
# self.set_find_function(find_function)
|
||
# self.set_find_txt(find_txt)
|
||
# self.set_search_direction(search_direction)
|
||
# if find_txt:
|
||
# # 查询模式 0: 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
# self.model_flag = 0
|
||
# if gene_file:
|
||
# self.set_gene_file(gene_file)
|
||
# build.set_gene_file('函数调用图2-1.txt')
|
||
|
||
if os.path.exists('UF20函数关系图.txt') and forceUpdate == False:
|
||
# 从文件中加载配置
|
||
self.loadInfoFromTxt()
|
||
else:
|
||
# 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
self.traverse_all()
|
||
|
||
# 2.扫描函数字典 生成函数关系拓扑图
|
||
self.scanl_func()
|
||
|
||
def gene_func_call_map(self, find_function, find_txt, search_direction=0, project='uses', gene_file=''):
|
||
self.set_project(project)
|
||
self.set_find_function(find_function)
|
||
self.set_find_txt(find_txt)
|
||
self.set_search_direction(search_direction)
|
||
print('find_txt:[', find_txt, ']')
|
||
if find_txt == ' ' or find_txt == None or find_txt == '':
|
||
# 查询模式 0: 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
self.set_find_txt("")
|
||
self.model_flag = 1
|
||
else:
|
||
self.model_flag = 0
|
||
print('model_flag', self.model_flag)
|
||
if gene_file:
|
||
self.set_gene_file(gene_file)
|
||
|
||
# 每次生成调用图之前,先清空文件
|
||
with open(self.gene_file, 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# build.set_gene_file('函数调用图2-1.txt')
|
||
|
||
# # 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
# self.traverse_all()
|
||
|
||
# # 2.扫描函数字典 生成函数关系拓扑图
|
||
# self.scanl_func()
|
||
|
||
# 3.根据函数关系拓扑图 生成函数调用图/被调用图
|
||
self.geneAllFucn2(find_function)
|
||
print('生成完成')
|
||
if self.debug:
|
||
print('生成完成')
|
||
|
||
# 20250825 add 新增个性化功能:生成被调用关系中AS或者LS对应的SO
|
||
def gene_func_call_map_so(self, find_function, find_txt, search_direction=1, project='uses', gene_file=''):
|
||
self.set_project(project)
|
||
self.set_find_function(find_function)
|
||
self.set_find_txt(find_txt)
|
||
self.set_search_direction(search_direction)
|
||
print('find_txt:[', find_txt, ']')
|
||
if find_txt == ' ' or find_txt == None or find_txt == '':
|
||
# 查询模式 0: 只显示搜到关键字的函数调用信息 1:显示全部调用路径
|
||
self.set_find_txt("")
|
||
self.model_flag = 1
|
||
else:
|
||
self.model_flag = 0
|
||
print('model_flag', self.model_flag)
|
||
if gene_file:
|
||
self.set_gene_file(gene_file)
|
||
|
||
# 每次生成调用图之前,先清空文件
|
||
with open(self.gene_file, 'w', encoding='utf-8') as file:
|
||
pass # 文件内容被清空,pass语句在这里不执行任何操作
|
||
# build.set_gene_file('函数调用图2-1.txt')
|
||
|
||
# # 1.遍历所有文件 生成每个库的函数字典 然后合并数据字典
|
||
# self.traverse_all()
|
||
|
||
# # 2.扫描函数字典 生成函数关系拓扑图
|
||
# self.scanl_func()
|
||
|
||
# 3.根据函数关系拓扑图 生成函数调用图/被调用图
|
||
self.geneAllFucn_so(find_function)
|
||
print('生成完成')
|
||
if self.debug:
|
||
print('生成完成')
|
||
|
||
# 20250825 add 新增个性化功能,根据搜索AS或者LS对应的so
|
||
# 直接for循环遍历 扫描生成函数调用图 或者被调用图
|
||
def geneAllFucn_so(self, find_function, tab='', call_path="\t\t【Start", callDeep=0):
|
||
# 增加层数限制,避免无限递归
|
||
if callDeep >= 15:
|
||
return
|
||
func_map = self.function_map
|
||
id_name = self.id_name
|
||
projectPath = self.projectPath
|
||
subProject = self.subProject
|
||
# find_function = self.find_function
|
||
find_txt = self.find_txt
|
||
gene_file = self.gene_file
|
||
model_flag = self.model_flag
|
||
project = self.project
|
||
file_name = self.gene_file
|
||
count = 0
|
||
# call_path = "\t\t【Start"
|
||
|
||
find_flag = 0
|
||
for func_id in find_function:
|
||
if func_id not in id_name:
|
||
print(f"Not this function_id:{func_id}")
|
||
else:
|
||
index = find_function.index(func_id)
|
||
# 函数名称提取出来
|
||
func_name = id_name[func_id]
|
||
if '融资融券' in func_name:
|
||
continue
|
||
func_path = func_map[func_name].path
|
||
if find_txt != "":
|
||
with open(file_name, 'r', encoding='utf-8') as file1:
|
||
if public.check_string_in_file(func_path, find_txt):
|
||
find_flag = 1
|
||
# print('222', func_name)
|
||
with open(file_name, 'a', encoding='utf-8') as file:
|
||
# 2.判断是否存在AS
|
||
print('222', func_name)
|
||
if 'AS_' in func_name:
|
||
# 获取对应的moudel.xml路径
|
||
# 2.1先获取函数对应信息
|
||
func_info = self.function_map[func_name]
|
||
func_path = func_info.path
|
||
service_pos = func_path.rfind("服务")
|
||
|
||
if service_pos != -1:
|
||
# 2.2 提取对应路径信息
|
||
extracted_path = func_path[:service_pos]
|
||
print(extracted_path)
|
||
# 2.3 读取moudel.xml文件
|
||
ename, database = public.read_module_xml(extracted_path + 'module.xml')
|
||
print(ename, database)
|
||
if 'DB' in database:
|
||
# AS模块中有DB,需要保存so,然后continue
|
||
# 2.4 拼接so名称
|
||
asname = 'libs_as_' + ename + 'flow.10.so'
|
||
print('asname', asname)
|
||
# 2.5 使用set保存so名称
|
||
self.soname.add(asname)
|
||
return
|
||
else:
|
||
print("路径中未找到 '服务' 目录")
|
||
elif 'LS_' in func_name:
|
||
# 获取对应的moudel.xml路径
|
||
# 2.1先获取函数对应信息
|
||
func_info = self.function_map[func_name]
|
||
func_path = func_info.path
|
||
service_pos = func_path.rfind("服务")
|
||
|
||
if service_pos != -1:
|
||
# 2.2 提取对应路径信息
|
||
extracted_path = func_path[:service_pos]
|
||
print(extracted_path)
|
||
# 2.3 读取moudel.xml文件
|
||
lsename = public.read_ls_module_xml(extracted_path + 'module.xml')
|
||
print('lsinfo:', lsename)
|
||
# 2.4 拼接so名称
|
||
lsname = 'libs_ls_' + lsename + 'flow.10.so'
|
||
print('lsname', lsname)
|
||
# 2.5 使用set保存so名称
|
||
self.soname.add(lsname)
|
||
else:
|
||
print("路径中未找到 '服务' 目录")
|
||
if find_flag == 1:
|
||
file.write(tab + func_id + ":" + func_name + "\t\t" + find_txt + call_path + "->" + str(
|
||
func_id) + "】\n")
|
||
elif model_flag == 1:
|
||
if (self.search_direction == 1 and 'LS' in func_name) or call_path == "\t\t【Start":
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
elif self.search_direction == 0:
|
||
file.write(tab + func_id + ":" + func_name + "\n")
|
||
find_flag = 0
|
||
# 提取 next_ids 此处可以设置函数调用图或者函数被调用图
|
||
next_ids = func_map[func_name].next_ids
|
||
pre_ids = func_map[func_name].pre_ids
|
||
if self.search_direction == 1:
|
||
# 如果标志为1 代表搜索反向调用图
|
||
next_ids = pre_ids
|
||
if len(next_ids) > 0:
|
||
# fix 20250708 dg 防止无限递归调用
|
||
if next_ids == find_function:
|
||
return
|
||
self.geneAllFucn_so(next_ids, '\t' + tab, call_path + "->" + str(func_id), callDeep + 1)
|
||
|
||
|
||
# 遍历所有目录
|
||
def main():
|
||
build = UF20BuildMap()
|
||
print('start')
|
||
build.init_func_call_map()
|
||
print('初始化完成')
|
||
build.gene_func_call_map_so(['2106807'], ' ')
|
||
print(build.soname)
|
||
with open('soinfo', 'a', encoding='utf-8') as file:
|
||
file.write('\n' + build.find_function[0] + '\t')
|
||
for sn in build.soname:
|
||
file.write(sn + ' ')
|
||
print('end')
|
||
|
||
if __name__ == '__main__':
|
||
# 1.记录每个文件自己的信息,功能号,中英文名称
|
||
# 2.生成函数关系拓扑图
|
||
# 3.根据生成的函数关系图,来进行搜索之类的操作
|
||
main()
|
||
|
||
|