add:初始化工程
This commit is contained in:
181
utilities/pushVueIp.py
Normal file
181
utilities/pushVueIp.py
Normal file
@@ -0,0 +1,181 @@
|
||||
|
||||
import subprocess
|
||||
import re
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
from retrying import retry
|
||||
import ctypes
|
||||
ctypes.windll.user32.ShowWindow(ctypes.windll.kernel32.GetConsoleWindow(), 0) # 强制隐藏
|
||||
|
||||
class IPMonitor:
|
||||
"""
|
||||
IP监控与推送服务
|
||||
功能:
|
||||
1. 自动检测hundsun.com局域网的IP变化
|
||||
2. IP变化时自动推送到指定服务器
|
||||
3. 支持失败重试和超时设置
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_url="http://10.20.163.101:5535/ldp?Pretty=1&PluginName=ldp_db_listen_plugin&FuncName=SetVueConfig",
|
||||
fixed_port="5173",
|
||||
ip_storage_file="last_ip.txt",
|
||||
request_timeout=5
|
||||
):
|
||||
"""
|
||||
初始化监控器
|
||||
:param server_url: 推送目标URL
|
||||
:param fixed_port: 固定端口号
|
||||
:param ip_storage_file: IP存储文件路径
|
||||
:param request_timeout: 请求超时时间(秒)
|
||||
"""
|
||||
self.server_url = server_url
|
||||
self.fixed_port = fixed_port
|
||||
self.ip_storage_file = ip_storage_file
|
||||
self.request_timeout = request_timeout
|
||||
|
||||
def get_hundsun_ip(self):
|
||||
"""获取hundsun.com局域网的当前IP"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['ipconfig', '/all'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
# pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(\d+\.\d+\.\d+\.\d+)'
|
||||
# 修改后的正确模式(匹配127.24开头的IP)
|
||||
pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(172\.24\.\d+\.\d+)'
|
||||
match = re.search(pattern, result.stdout, re.DOTALL)
|
||||
return match.group(1) if match else None
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 获取IP失败: {e}")
|
||||
return None
|
||||
|
||||
def get_handsome_ip(self):
|
||||
"""获取hs.handsome.com.cn局域网的当前IP"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['ipconfig', '/all'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
# 修改模式以匹配hs.handsome.com.cn后缀和IPv4地址
|
||||
pattern = r'hs\.handsome\.com\.cn.*?IPv4 地址[^\d]*(10\.\d+\.\d+\.\d+)'
|
||||
match = re.search(pattern, result.stdout, re.DOTALL)
|
||||
return match.group(1) if match else None
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 获取IP失败: {e}")
|
||||
return None
|
||||
|
||||
@retry(stop_max_attempt_number=3, wait_fixed=2000)
|
||||
def _send_ip(self, ip_address):
|
||||
"""内部方法:发送IP到服务器(自动重试)"""
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
data = {
|
||||
"Request": {
|
||||
"vueHost": ip_address,
|
||||
"vuePort": self.fixed_port
|
||||
}
|
||||
}
|
||||
try:
|
||||
response = requests.post(
|
||||
self.server_url,
|
||||
headers=headers,
|
||||
data=json.dumps(data),
|
||||
timeout=self.request_timeout
|
||||
)
|
||||
if not response.ok:
|
||||
raise Exception(f"服务器返回 {response.status_code}: {response.text}")
|
||||
return True
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"网络请求失败: {e}")
|
||||
|
||||
def push_current_ip(self):
|
||||
"""
|
||||
强制推送当前IP到服务器
|
||||
:return: (success, ip) 元组
|
||||
"""
|
||||
current_ip = self.get_hundsun_ip()
|
||||
if not current_ip:
|
||||
return (False, None)
|
||||
|
||||
try:
|
||||
if self._send_ip(current_ip):
|
||||
self._save_ip(current_ip)
|
||||
return (True, current_ip)
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 推送失败: {e}")
|
||||
pass
|
||||
return (False, current_ip)
|
||||
|
||||
def check_and_push(self):
|
||||
"""
|
||||
检查IP变化并自动推送
|
||||
:return: (changed, success, ip) 元组
|
||||
"""
|
||||
current_ip = self.get_hundsun_ip()
|
||||
if current_ip == None:
|
||||
current_ip = self.get_handsome_ip()
|
||||
last_ip = self._load_ip()
|
||||
|
||||
if not current_ip:
|
||||
return (False, False, None)
|
||||
|
||||
if current_ip != last_ip:
|
||||
try:
|
||||
if self._send_ip(current_ip):
|
||||
self._save_ip(current_ip)
|
||||
return (True, True, current_ip)
|
||||
return (True, False, current_ip)
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 推送失败: {e}")
|
||||
return (True, False, current_ip)
|
||||
return (False, True, current_ip)
|
||||
|
||||
def _save_ip(self, ip):
|
||||
"""内部方法:保存IP到文件"""
|
||||
try:
|
||||
with open(self.ip_storage_file, 'w') as f:
|
||||
f.write(ip)
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 保存IP失败: {e}")
|
||||
pass
|
||||
|
||||
def _load_ip(self):
|
||||
"""内部方法:从文件加载IP"""
|
||||
if not os.path.exists(self.ip_storage_file):
|
||||
return None
|
||||
try:
|
||||
with open(self.ip_storage_file, 'r') as f:
|
||||
return f.read().strip()
|
||||
except Exception as e:
|
||||
# print(f"[IPMonitor] 读取IP失败: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# 示例使用方式
|
||||
if __name__ == "__main__":
|
||||
# 初始化监控器(可自定义参数)
|
||||
monitor = IPMonitor()
|
||||
|
||||
# 示例1:单次检查并推送
|
||||
# changed, success, ip = monitor.check_and_push()
|
||||
# print(f"\n检查结果: 变化={changed}, 成功={success}, IP={ip}")
|
||||
|
||||
# 示例2:强制推送当前IP
|
||||
# success, ip = monitor.push_current_ip()
|
||||
# print(f"强制推送结果: 成功={success}, IP={ip}")
|
||||
|
||||
# 示例3:定时任务集成
|
||||
import schedule
|
||||
def job():
|
||||
monitor.check_and_push()
|
||||
schedule.every(5).minutes.do(job)
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
Reference in New Issue
Block a user