Files
my_py_tools/t2sdk/t2sdk_demo_syn.py
2025-10-18 21:32:31 +08:00

200 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding=utf-8
from importlib import reload # Python 3 推荐方式
from datetime import datetime
from array import array
import time
import threading
import traceback
import sys
reload(sys)
# 添加.pyd文件所在的目录路径可以是绝对路径或相对路径
sys.path.append(r"F:\sescode_review\toolset\hstools\py-hstools\t2sdk")
import py_t2sdk
# Build the request pack
def GetT2Pack(mdbsql):
    """Build the packed request body that carries one MDB SQL statement.

    Args:
        mdbsql: Either the SQL text itself, or a file name ending in
            ``.sql``. A file name is looked up in the ``sql`` sub-directory
            of the current working directory, read as UTF-8, and passed
            through ``str.format(init_date=<today as YYYYMMDD>)``. Because
            ``str.format`` is used, any literal ``{`` or ``}`` in the file
            other than the ``{init_date}`` placeholder must be doubled.

    Returns:
        The ``py_t2sdk.pyIF2Packer`` after ``EndPack()`` has been called.
        The caller is responsible for releasing it.

    Raises:
        OSError: If the ``.sql`` file cannot be opened.
        KeyError, IndexError, ValueError: If the file content is not a
            valid ``str.format`` template for the single ``init_date`` key.
    """
    # Resolve the SQL text first so that a failed file read or a bad
    # template does not leave a packer that was begun but never finished.
    if mdbsql.endswith('.sql'):
        # Same path as before ('.' + backslash + 'sql' + backslash + name),
        # written without the invalid '\s' escape sequence.
        with open('.\\sql\\' + mdbsql, "r", encoding="utf-8") as file:
            content = file.read()
        # Today's date as YYYYMMDD, e.g. 20250903.
        today = datetime.now().strftime("%Y%m%d")
        sql_final = content.format(init_date=today)
    else:
        sql_final = mdbsql
    print('len:', len(sql_final))

    packer = py_t2sdk.pyIF2Packer()
    packer.BeginPack()
    # Field width is 1.5x the character count, passed as an integer (the
    # original passed a float). Presumably the extra room is headroom for
    # multi-byte characters -- TODO confirm.
    packer.AddField('sql', 'S', int(len(sql_final) * 1.5), 0)
    if 'show tables' not in sql_final:
        packer.AddField('general', 'S')
    packer.AddStr(sql_final)
    # NOTE(review): a statement containing 'show' but not 'show tables'
    # declares two fields yet adds only one value -- confirm this is what
    # the server expects.
    if 'show' not in sql_final:
        packer.AddStr(" ")
    packer.EndPack()
    return packer
def GetT2Msg(lpPack, t2FunctionNo, t2SystemNo):
    """Create a business message whose content is the given pack.

    Args:
        lpPack: A finished packer; its buffer and length are handed to the
            message via ``SetContent``.
        t2FunctionNo: Value passed to ``SetFunction``.
        t2SystemNo: Value passed to ``SetSystemNo``.

    Returns:
        The new ``py_t2sdk.pyIBizMessage`` with packet type 0.
    """
    message = py_t2sdk.pyIBizMessage()
    message.SetFunction(t2FunctionNo)
    message.SetSystemNo(t2SystemNo)
    message.SetPacketType(0)
    pack_buf = lpPack.GetPackBuf()
    pack_len = lpPack.GetPackLen()
    message.SetContent(pack_buf, pack_len)
    return message
def PrintUnpack(lpUnpack):
    """Dump every dataset, row and column of an unpacker to stdout.

    For each dataset the row count is printed; for each row the column
    count; for each column its type code followed by ``name:value`` when
    the type is 'S' (string), 'I' (int), 'C' (char) or 'D'/'F' (double).
    Columns of any other type only get their type line.

    Args:
        lpUnpack: An opened unpacker exposing the dataset/row/column
            accessors used below. The row cursor is advanced with
            ``Next()`` after every row.

    Returns:
        None.
    """
    dataset_count = lpUnpack.GetDatasetCount()
    print('count ', dataset_count)
    print('开始解包')
    for dataset_index in range(dataset_count):
        lpUnpack.SetCurrentDatasetByIndex(dataset_index)
        row_count = lpUnpack.GetRowCount()
        print('iRowCount:', row_count)
        for _ in range(row_count):
            col_count = lpUnpack.GetColCount()
            print('iColCount', col_count)
            for col in range(col_count):
                col_type = lpUnpack.GetColType(col)
                print('coltype', col_type)
                if col_type == 'S':
                    print(lpUnpack.GetColName(col) + ':' + lpUnpack.GetStrByIndex(col))
                elif col_type == 'I':
                    print(lpUnpack.GetColName(col) + ':' + str(lpUnpack.GetIntByIndex(col)))
                elif col_type == 'C':
                    print(lpUnpack.GetColName(col) + ':' + lpUnpack.GetCharByIndex(col))
                elif col_type in ('D', 'F'):
                    print(lpUnpack.GetColName(col) + ':' + str(lpUnpack.GetDoubleByIndex(col)))
            # Move the cursor to the next row of the current dataset.
            lpUnpack.Next()
    print('结束解包')
# Convert the returned result sets into a list of dicts
def UnpackToMapArr(lpUnpack):
    """Flatten every dataset of an unpacker into a list of row dicts.

    Rows of all datasets are appended to one list in dataset order. Each
    row becomes a dict keyed by column name. 'S' (string) and 'C' (char)
    values are stored as returned by the unpacker; 'I' (int) and 'D'/'F'
    (double) values are converted with ``str()``. Columns of any other
    type are left out of the row dict.

    Args:
        lpUnpack: An opened unpacker exposing the dataset/row/column
            accessors used below. The row cursor is advanced with
            ``Next()`` after every row.

    Returns:
        list[dict]: One dict per row; empty when there are no rows.
    """
    dataset_count = lpUnpack.GetDatasetCount()
    print('count ', dataset_count)
    print('开始解包')
    result = []
    for dataset_index in range(dataset_count):
        lpUnpack.SetCurrentDatasetByIndex(dataset_index)
        row_count = lpUnpack.GetRowCount()
        print('iRowCount:', row_count)
        for _ in range(row_count):
            row = {}
            for col in range(lpUnpack.GetColCount()):
                col_type = lpUnpack.GetColType(col)
                if col_type == 'S':
                    value = lpUnpack.GetStrByIndex(col)
                elif col_type == 'I':
                    value = str(lpUnpack.GetIntByIndex(col))
                elif col_type == 'C':
                    value = lpUnpack.GetCharByIndex(col)
                elif col_type in ('D', 'F'):
                    value = str(lpUnpack.GetDoubleByIndex(col))
                else:
                    # Unhandled column types are skipped.
                    continue
                row[lpUnpack.GetColName(col)] = value
            result.append(row)
            # Move the cursor to the next row of the current dataset.
            lpUnpack.Next()
    return result
def execMdbSql(t2Host,t2Port,mdbNodeName,mdbSql):
    """Send one MDB SQL request over a T2 connection and return the rows.

    Args:
        t2Host: Host part of the server address.
        t2Port: Port part of the server address (string or int); it is
            joined to ``t2Host`` as ``host:port`` and written to the
            ``t2sdk``/``servers`` config entry, overriding ``t2sdk.ini``.
        mdbNodeName: Node name. Function number 508901 is used when it
            contains ``'cbp'``, otherwise 508902. Its last two characters
            are parsed as the integer system number.
        mdbSql: SQL text or ``.sql`` file name, forwarded to ``GetT2Pack``.

    Returns:
        list[dict]: The rows produced by ``UnpackToMapArr`` on success. If
        receiving fails or the answer carries a non-zero error number, the
        list holds a single dict with ``error_no`` and ``error_info``. If
        an exception occurs while sending or receiving, the traceback is
        printed and whatever was collected so far (normally ``[]``) is
        returned.

    Raises:
        ValueError: If the last two characters of ``mdbNodeName`` are not
            an integer.
        SystemExit: If creating the connection or connecting fails.
    """
    # Pure computation first, so a malformed node name fails before any
    # SDK state is set up.
    t2FunctionNo = 508901 if 'cbp' in mdbNodeName else 508902
    t2SystemNo = int(mdbNodeName[-2:])

    py_t2sdk.PyT2sdkInitialize()
    # Load the base configuration, then override the server address.
    config = py_t2sdk.pyCConfigInterface()
    config.Load('t2sdk.ini')
    config.SetString('t2sdk', 'servers', '{}:{}'.format(t2Host, t2Port))
    connect = py_t2sdk.pyConnectionInterface(config)
    pCallBack = py_t2sdk.pyCallbackInterface('pyCallBack', 'pyCallBack')
    pCallBack.InitInstance()

    ret = connect.Create2BizMsg(pCallBack)
    if ret != 0:
        print('creat faild!!')
        # sys.exit() instead of the site-provided exit(), which is absent
        # when the site module is not loaded. Exit status is unchanged.
        sys.exit()
    ret = connect.Connect(3000)
    if ret != 0:
        print('connect faild:')
        print(connect.GetErrorMsg(ret))
        sys.exit()
    print('YES')

    result = []
    try:
        # Build the pack body, wrap it in a message, then drop the pack.
        pLoginPack = GetT2Pack(mdbSql)
        try:
            pMsg = GetT2Msg(pLoginPack, t2FunctionNo, t2SystemNo)
        finally:
            pLoginPack.FreeMem()
            pLoginPack.Release()
        try:
            # NOTE(review): the return value of SendBizMsg is passed to
            # RecvBizMsg unchecked -- confirm how send failures are
            # reported and whether they should be handled here.
            ret = connect.SendBizMsg(pMsg)
            time.sleep(1)
        finally:
            pMsg.Release()

        iRetAns, pyMsgAns = connect.RecvBizMsg(ret, 6000)
        try:
            if iRetAns != 0:
                # Receiving itself failed.
                result.append({
                    'error_no': iRetAns,
                    'error_info': mdbNodeName + ':' + connect.GetErrorMsg(iRetAns),
                })
            elif pyMsgAns.GetErrorNo() != 0:
                # The answer arrived but reports a business error.
                result.append({
                    'error_no': pyMsgAns.GetErrorNo(),
                    'error_info': mdbNodeName + ':' + pyMsgAns.GetErrorInfo(),
                })
            else:
                buf, buf_len = pyMsgAns.GetContent()
                LoginUnPack = py_t2sdk.pyIF2UnPacker()
                try:
                    LoginUnPack.Open(buf, buf_len)
                    result = UnpackToMapArr(LoginUnPack)
                finally:
                    LoginUnPack.Release()
        finally:
            # Guarded in case no message object is returned on failure.
            if pyMsgAns is not None:
                pyMsgAns.Release()
    except Exception:
        # Best-effort: report the failure and return what was collected.
        traceback.print_exc()
    finally:
        print('finally')
    # NOTE(review): the original had py_t2sdk.PyT2sdkFinalize() placed
    # after the return statement, where it could never execute. It is left
    # out here so runtime behaviour is unchanged; confirm whether and where
    # the SDK should be finalized.
    return result
if __name__ == '__main__':
    # Demo run: execute the init_date.sql file against one MDB node and
    # print the returned rows (or error entries).
    result = execMdbSql(
        t2Host='10.20.163.101',
        t2Port='14007',
        mdbNodeName='usesbid92',
        mdbSql="init_date.sql",
    )
    print(result)