182 lines
6.0 KiB
Python
182 lines
6.0 KiB
Python
|
||
import subprocess
|
||
import re
|
||
import requests
|
||
import json
|
||
import time
|
||
import os
|
||
from retrying import retry
|
||
import ctypes
|
||
ctypes.windll.user32.ShowWindow(ctypes.windll.kernel32.GetConsoleWindow(), 0) # 强制隐藏
|
||
|
||
class IPMonitor:
|
||
"""
|
||
IP监控与推送服务
|
||
功能:
|
||
1. 自动检测hundsun.com局域网的IP变化
|
||
2. IP变化时自动推送到指定服务器
|
||
3. 支持失败重试和超时设置
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
server_url="http://10.20.163.101:5535/ldp?Pretty=1&PluginName=ldp_db_listen_plugin&FuncName=SetVueConfig",
|
||
fixed_port="5173",
|
||
ip_storage_file="last_ip.txt",
|
||
request_timeout=5
|
||
):
|
||
"""
|
||
初始化监控器
|
||
:param server_url: 推送目标URL
|
||
:param fixed_port: 固定端口号
|
||
:param ip_storage_file: IP存储文件路径
|
||
:param request_timeout: 请求超时时间(秒)
|
||
"""
|
||
self.server_url = server_url
|
||
self.fixed_port = fixed_port
|
||
self.ip_storage_file = ip_storage_file
|
||
self.request_timeout = request_timeout
|
||
|
||
def get_hundsun_ip(self):
|
||
"""获取hundsun.com局域网的当前IP"""
|
||
try:
|
||
result = subprocess.run(
|
||
['ipconfig', '/all'],
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
# pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(\d+\.\d+\.\d+\.\d+)'
|
||
# 修改后的正确模式(匹配127.24开头的IP)
|
||
pattern = r'hundsun\.com.*?IPv4 地址[^\d]*(172\.24\.\d+\.\d+)'
|
||
match = re.search(pattern, result.stdout, re.DOTALL)
|
||
return match.group(1) if match else None
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 获取IP失败: {e}")
|
||
return None
|
||
|
||
def get_handsome_ip(self):
|
||
"""获取hs.handsome.com.cn局域网的当前IP"""
|
||
try:
|
||
result = subprocess.run(
|
||
['ipconfig', '/all'],
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
# 修改模式以匹配hs.handsome.com.cn后缀和IPv4地址
|
||
pattern = r'hs\.handsome\.com\.cn.*?IPv4 地址[^\d]*(10\.\d+\.\d+\.\d+)'
|
||
match = re.search(pattern, result.stdout, re.DOTALL)
|
||
return match.group(1) if match else None
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 获取IP失败: {e}")
|
||
return None
|
||
|
||
@retry(stop_max_attempt_number=3, wait_fixed=2000)
|
||
def _send_ip(self, ip_address):
|
||
"""内部方法:发送IP到服务器(自动重试)"""
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {
|
||
"Request": {
|
||
"vueHost": ip_address,
|
||
"vuePort": self.fixed_port
|
||
}
|
||
}
|
||
try:
|
||
response = requests.post(
|
||
self.server_url,
|
||
headers=headers,
|
||
data=json.dumps(data),
|
||
timeout=self.request_timeout
|
||
)
|
||
if not response.ok:
|
||
raise Exception(f"服务器返回 {response.status_code}: {response.text}")
|
||
return True
|
||
except requests.exceptions.RequestException as e:
|
||
raise Exception(f"网络请求失败: {e}")
|
||
|
||
def push_current_ip(self):
|
||
"""
|
||
强制推送当前IP到服务器
|
||
:return: (success, ip) 元组
|
||
"""
|
||
current_ip = self.get_hundsun_ip()
|
||
if not current_ip:
|
||
return (False, None)
|
||
|
||
try:
|
||
if self._send_ip(current_ip):
|
||
self._save_ip(current_ip)
|
||
return (True, current_ip)
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 推送失败: {e}")
|
||
pass
|
||
return (False, current_ip)
|
||
|
||
def check_and_push(self):
|
||
"""
|
||
检查IP变化并自动推送
|
||
:return: (changed, success, ip) 元组
|
||
"""
|
||
current_ip = self.get_hundsun_ip()
|
||
if current_ip == None:
|
||
current_ip = self.get_handsome_ip()
|
||
last_ip = self._load_ip()
|
||
|
||
if not current_ip:
|
||
return (False, False, None)
|
||
|
||
if current_ip != last_ip:
|
||
try:
|
||
if self._send_ip(current_ip):
|
||
self._save_ip(current_ip)
|
||
return (True, True, current_ip)
|
||
return (True, False, current_ip)
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 推送失败: {e}")
|
||
return (True, False, current_ip)
|
||
return (False, True, current_ip)
|
||
|
||
def _save_ip(self, ip):
|
||
"""内部方法:保存IP到文件"""
|
||
try:
|
||
with open(self.ip_storage_file, 'w') as f:
|
||
f.write(ip)
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 保存IP失败: {e}")
|
||
pass
|
||
|
||
def _load_ip(self):
|
||
"""内部方法:从文件加载IP"""
|
||
if not os.path.exists(self.ip_storage_file):
|
||
return None
|
||
try:
|
||
with open(self.ip_storage_file, 'r') as f:
|
||
return f.read().strip()
|
||
except Exception as e:
|
||
# print(f"[IPMonitor] 读取IP失败: {e}")
|
||
return None
|
||
|
||
|
||
# 示例使用方式
|
||
if __name__ == "__main__":
|
||
# 初始化监控器(可自定义参数)
|
||
monitor = IPMonitor()
|
||
|
||
# 示例1:单次检查并推送
|
||
# changed, success, ip = monitor.check_and_push()
|
||
# print(f"\n检查结果: 变化={changed}, 成功={success}, IP={ip}")
|
||
|
||
# 示例2:强制推送当前IP
|
||
# success, ip = monitor.push_current_ip()
|
||
# print(f"强制推送结果: 成功={success}, IP={ip}")
|
||
|
||
# 示例3:定时任务集成
|
||
import schedule
|
||
def job():
|
||
monitor.check_and_push()
|
||
schedule.every(5).minutes.do(job)
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(1)
|