import subprocess import re import requests import json import time import os from retrying import retry import ctypes ctypes.windll.user32.ShowWindow(ctypes.windll.kernel32.GetConsoleWindow(), 0) # 强制隐藏 class IPMonitor: """ IP监控与推送服务 功能: 1. 自动检测hundsun.com局域网的IP变化 2. IP变化时自动推送到指定服务器 3. 支持失败重试和超时设置 """ def __init__( self, server_url="http://10.20.163.101:5535/ldp?Pretty=1&PluginName=ldp_db_listen_plugin&FuncName=SetVueConfig", fixed_port="5173", ip_storage_file="last_ip.txt", request_timeout=5 ): """ 初始化监控器 :param server_url: 推送目标URL :param fixed_port: 固定端口号 :param ip_storage_file: IP存储文件路径 :param request_timeout: 请求超时时间(秒) """ self.server_url = server_url self.fixed_port = fixed_port self.ip_storage_file = ip_storage_file self.request_timeout = request_timeout def get_hundsun_ip(self): """获取hundsun.com局域网的当前IP""" try: result = subprocess.run( ['ipconfig', '/all'], capture_output=True, text=True, check=True ) # pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(\d+\.\d+\.\d+\.\d+)' # 修改后的正确模式(匹配127.24开头的IP) pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(172\.24\.\d+\.\d+)' match = re.search(pattern, result.stdout, re.DOTALL) return match.group(1) if match else None except Exception as e: # print(f"[IPMonitor] 获取IP失败: {e}") return None def get_handsome_ip(self): """获取hs.handsome.com.cn局域网的当前IP""" try: result = subprocess.run( ['ipconfig', '/all'], capture_output=True, text=True, check=True ) # 修改模式以匹配hs.handsome.com.cn后缀和IPv4地址 pattern = r'hs\.handsome\.com\.cn.*?IPv4 地址[^\d]*(10\.\d+\.\d+\.\d+)' match = re.search(pattern, result.stdout, re.DOTALL) return match.group(1) if match else None except Exception as e: # print(f"[IPMonitor] 获取IP失败: {e}") return None @retry(stop_max_attempt_number=3, wait_fixed=2000) def _send_ip(self, ip_address): """内部方法:发送IP到服务器(自动重试)""" headers = {'Content-Type': 'application/json'} data = { "Request": { "vueHost": ip_address, "vuePort": self.fixed_port } } try: response = requests.post( self.server_url, headers=headers, data=json.dumps(data), timeout=self.request_timeout ) if not response.ok: raise Exception(f"服务器返回 {response.status_code}: {response.text}") return True except requests.exceptions.RequestException as e: raise Exception(f"网络请求失败: {e}") def push_current_ip(self): """ 强制推送当前IP到服务器 :return: (success, ip) 元组 """ current_ip = self.get_hundsun_ip() if not current_ip: return (False, None) try: if self._send_ip(current_ip): self._save_ip(current_ip) return (True, current_ip) except Exception as e: # print(f"[IPMonitor] 推送失败: {e}") pass return (False, current_ip) def check_and_push(self): """ 检查IP变化并自动推送 :return: (changed, success, ip) 元组 """ current_ip = self.get_hundsun_ip() if current_ip == None: current_ip = self.get_handsome_ip() last_ip = self._load_ip() if not current_ip: return (False, False, None) if current_ip != last_ip: try: if self._send_ip(current_ip): self._save_ip(current_ip) return (True, True, current_ip) return (True, False, current_ip) except Exception as e: # print(f"[IPMonitor] 推送失败: {e}") return (True, False, current_ip) return (False, True, current_ip) def _save_ip(self, ip): """内部方法:保存IP到文件""" try: with open(self.ip_storage_file, 'w') as f: f.write(ip) except Exception as e: # print(f"[IPMonitor] 保存IP失败: {e}") pass def _load_ip(self): """内部方法:从文件加载IP""" if not os.path.exists(self.ip_storage_file): return None try: with open(self.ip_storage_file, 'r') as f: return f.read().strip() except Exception as e: # print(f"[IPMonitor] 读取IP失败: {e}") return None # 示例使用方式 if __name__ == "__main__": # 初始化监控器(可自定义参数) monitor = IPMonitor() # 示例1:单次检查并推送 # changed, success, ip = monitor.check_and_push() # print(f"\n检查结果: 变化={changed}, 成功={success}, IP={ip}") # 示例2:强制推送当前IP # success, ip = monitor.push_current_ip() # print(f"强制推送结果: 成功={success}, IP={ip}") # 示例3:定时任务集成 import schedule def job(): monitor.check_and_push() schedule.every(5).minutes.do(job) while True: schedule.run_pending() time.sleep(1)