Files
my_py_tools/utilities/pushVueIp.py
2025-10-18 21:32:31 +08:00

182 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import ctypes
import json
import logging
import os
import re
import subprocess
import time

import requests
from retrying import retry
# Hide this process's console window so the script runs without a visible window.
# ``ctypes.windll`` only exists on Windows; checking for it keeps the module
# importable on other platforms instead of failing with AttributeError.
if hasattr(ctypes, "windll"):
    _console_hwnd = ctypes.windll.kernel32.GetConsoleWindow()
    # A zero/NULL handle means no console is attached, so there is nothing to hide.
    if _console_hwnd:
        ctypes.windll.user32.ShowWindow(_console_hwnd, 0)  # 0 == SW_HIDE
_LOGGER = logging.getLogger(__name__)


class IPMonitor:
    """Detect this machine's LAN IP address and push it to a server.

    Features:
    1. Detects the IPv4 address on the hundsun.com network (172.24.x.x),
       falling back to the hs.handsome.com.cn network (10.x.x.x).
    2. Pushes the address to the configured server when it differs from the
       last address that was pushed successfully.
    3. Retries failed pushes and applies a per-request timeout.

    Detection parses the output of ``ipconfig /all`` and matches the label
    ``IPv4 地址``, so it relies on the Windows ``ipconfig`` tool printing
    Chinese-language labels.

    All public methods are best-effort: failures are logged and reported
    through the return value instead of being raised.
    """

    # re.DOTALL lets ".*?" span the lines between the DNS suffix and the
    # address line in the ipconfig output.
    # Matches an address starting with 172.24 after a "hundsun.com" mention.
    _HUNDSUN_PATTERN = re.compile(
        r'hundsun\.com.*?IPv4 地址[^\d]*(172\.24\.\d+\.\d+)', re.DOTALL
    )
    # Matches an address starting with 10. after an "hs.handsome.com.cn" mention.
    _HANDSOME_PATTERN = re.compile(
        r'hs\.handsome\.com\.cn.*?IPv4 地址[^\d]*(10\.\d+\.\d+\.\d+)', re.DOTALL
    )

    def __init__(
        self,
        server_url="http://10.20.163.101:5535/ldp?Pretty=1&PluginName=ldp_db_listen_plugin&FuncName=SetVueConfig",
        fixed_port="5173",
        ip_storage_file="last_ip.txt",
        request_timeout=5
    ):
        """Initialise the monitor.

        :param server_url: URL the IP address is POSTed to
        :param fixed_port: port value sent alongside the IP as ``vuePort``
        :param ip_storage_file: path of the file holding the last pushed IP
        :param request_timeout: HTTP request timeout in seconds
        """
        self.server_url = server_url
        self.fixed_port = fixed_port
        self.ip_storage_file = ip_storage_file
        self.request_timeout = request_timeout

    def _find_ip(self, pattern):
        """Run ``ipconfig /all`` and return the first address captured by *pattern*.

        :param pattern: compiled regex whose group 1 captures the IP address
        :return: the captured IP string, or None when the command fails or
                 nothing matches
        """
        try:
            result = subprocess.run(
                ['ipconfig', '/all'],
                capture_output=True,
                text=True,
                check=True,
            )
        except Exception:
            # Deliberately broad: detection is best-effort and must never
            # take down the polling loop that calls it.
            _LOGGER.warning("[IPMonitor] failed to run ipconfig", exc_info=True)
            return None
        match = pattern.search(result.stdout)
        return match.group(1) if match else None

    def get_hundsun_ip(self):
        """Return the current 172.24.x.x IP on the hundsun.com network, or None."""
        return self._find_ip(self._HUNDSUN_PATTERN)

    def get_handsome_ip(self):
        """Return the current 10.x.x.x IP on the hs.handsome.com.cn network, or None."""
        return self._find_ip(self._HANDSOME_PATTERN)

    def _detect_ip(self):
        """Return the hundsun.com IP, falling back to the hs.handsome.com.cn IP."""
        return self.get_hundsun_ip() or self.get_handsome_ip()

    @retry(stop_max_attempt_number=3, wait_fixed=2000)
    def _send_ip(self, ip_address):
        """POST *ip_address* and the fixed port to the server as JSON.

        The ``retry`` decorator re-invokes this method when it raises, using
        the ``stop_max_attempt_number=3`` / ``wait_fixed=2000`` settings.

        :param ip_address: IP address sent as ``vueHost``
        :return: True when the server answers with a successful status
        :raises RuntimeError: on a network error or a non-OK HTTP status
        """
        headers = {'Content-Type': 'application/json'}
        payload = {
            "Request": {
                "vueHost": ip_address,
                "vuePort": self.fixed_port,
            }
        }
        try:
            response = requests.post(
                self.server_url,
                headers=headers,
                data=json.dumps(payload),
                timeout=self.request_timeout,
            )
        except requests.exceptions.RequestException as e:
            # Chain the cause so the underlying network error stays visible.
            raise RuntimeError(f"网络请求失败: {e}") from e
        if not response.ok:
            raise RuntimeError(f"服务器返回 {response.status_code}: {response.text}")
        return True

    def _push(self, ip):
        """Send *ip* to the server and remember it on success.

        :return: True when the push succeeded, False otherwise
        """
        try:
            sent = self._send_ip(ip)
        except Exception:
            # Best-effort boundary: report the failure via the return value.
            _LOGGER.warning("[IPMonitor] failed to push IP %s", ip, exc_info=True)
            return False
        if not sent:
            return False
        self._save_ip(ip)
        return True

    def push_current_ip(self):
        """Push the current IP to the server regardless of the stored value.

        Uses the same detection as ``check_and_push`` (hundsun.com first, then
        hs.handsome.com.cn).

        :return: ``(success, ip)`` tuple; ``ip`` is None when no IP was detected
        """
        current_ip = self._detect_ip()
        if not current_ip:
            return (False, None)
        return (self._push(current_ip), current_ip)

    def check_and_push(self):
        """Push the current IP only if it differs from the last pushed one.

        :return: ``(changed, success, ip)`` tuple. ``changed`` tells whether
                 the detected IP differs from the stored one; ``success`` is
                 True when the push succeeded or no push was needed; ``ip`` is
                 None when no IP was detected.
        """
        current_ip = self._detect_ip()
        if not current_ip:
            return (False, False, None)
        if current_ip == self._load_ip():
            return (False, True, current_ip)
        return (True, self._push(current_ip), current_ip)

    def _save_ip(self, ip):
        """Write *ip* to the storage file; failures are logged and ignored."""
        try:
            with open(self.ip_storage_file, 'w', encoding='utf-8') as f:
                f.write(ip)
        except Exception:
            # If saving fails, the next check sees a "changed" IP and pushes again.
            _LOGGER.warning(
                "[IPMonitor] failed to save IP to %s",
                self.ip_storage_file,
                exc_info=True,
            )

    def _load_ip(self):
        """Return the stripped contents of the storage file.

        :return: the stored text, or None when the file is missing or unreadable
        """
        try:
            with open(self.ip_storage_file, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except FileNotFoundError:
            # Expected before the first successful push; not worth logging.
            return None
        except Exception:
            _LOGGER.warning(
                "[IPMonitor] failed to read IP from %s",
                self.ip_storage_file,
                exc_info=True,
            )
            return None
# Example usage: running this file as a script starts a polling loop.
if __name__ == "__main__":
    import schedule

    # Build a monitor with the default settings (every argument can be overridden).
    monitor = IPMonitor()

    # Example 1 (disabled): a single check-and-push.
    #     changed, success, ip = monitor.check_and_push()
    #
    # Example 2 (disabled): push the current IP unconditionally.
    #     success, ip = monitor.push_current_ip()
    #
    # Example 3 (active): run the check as a scheduled task every 5 minutes.

    def run_check():
        """Scheduled task: check for an IP change and push it if needed."""
        monitor.check_and_push()

    schedule.every(5).minutes.do(run_check)

    # Wake up once per second to let the scheduler run any due task.
    while True:
        schedule.run_pending()
        time.sleep(1)