import os import subprocess import json import logging import re from datetime import datetime logger = logging.getLogger('serial_mqtt_gateway') class NetworkConfigManager: """网络配置管理器,负责处理系统网络配置相关功能""" def __init__(self): # 默认配置 self.default_interface = 'eth0' # 默认网络接口 self.network_config_file = '/etc/network/interfaces' # 网络配置文件路径 self.dhcpcd_config_file = '/etc/dhcpcd.conf' # dhcpcd配置文件路径 self.network_service = 'dhcpcd' # 网络服务名称 def get_network_status(self): """ 获取当前网络状态信息 返回包含IP地址、网关、DNS等信息的字典 """ try: # 使用更简单的命令获取网络信息,适合Docker环境 interfaces = {} # 尝试使用ifconfig或ip命令获取网络信息 try: # 首先尝试ip命令 ip_result = subprocess.check_output(['ip', '-o', 'addr'], universal_newlines=True) # 解析ip命令输出 for line in ip_result.splitlines(): parts = line.strip().split() if len(parts) >= 7: iface_name = parts[1] if iface_name not in interfaces: interfaces[iface_name] = { 'status': 'UP', # ip命令显示的都是活跃接口 'ip_addresses': [], 'mac_address': None } if parts[2] == 'inet': # IPv4地址 ip_address = parts[3].split('/')[0] # 跳过回环地址 if not ip_address.startswith('127.'): interfaces[iface_name]['ip_addresses'].append(ip_address) elif parts[2] == 'inet6': # IPv6地址 ip_address = parts[3].split('/')[0] # 跳过回环地址 if not ip_address.startswith('::1'): interfaces[iface_name]['ip_addresses'].append(ip_address) except Exception as e: logger.warning(f'ip命令执行失败: {str(e)}') # 如果ip命令失败,尝试使用ifconfig try: ifconfig_result = subprocess.check_output(['ifconfig'], universal_newlines=True) # 简单解析ifconfig输出 interface_blocks = re.split(r'\n(?=\w)', ifconfig_result) for block in interface_blocks: iface_match = re.search(r'(\w+):?\s+', block) if iface_match: iface_name = iface_match.group(1) interfaces[iface_name] = { 'status': 'UP' if 'UP' in block else 'DOWN', 'ip_addresses': [], 'mac_address': None } # 提取IPv4地址 ip_match = re.search(r'inet\s+([\d.]+)', block) if ip_match: ip_address = ip_match.group(1) # 跳过回环地址 if not ip_address.startswith('127.'): interfaces[iface_name]['ip_addresses'].append(ip_address) # 提取IPv6地址 ipv6_match = re.search(r'inet6\s+([\da-fA-F:]+)', block) if ipv6_match: ip_address = ipv6_match.group(1) # 跳过回环地址 if not ip_address.startswith('::1'): interfaces[iface_name]['ip_addresses'].append(ip_address) # 提取MAC地址 mac_match = re.search(r'ether\s+([\da-f:]+)', block) or re.search(r'HWaddr\s+([\da-f:]+)', block) if mac_match: interfaces[iface_name]['mac_address'] = mac_match.group(1) except Exception as e: logger.warning(f'ifconfig命令执行失败: {str(e)}') # 获取网关信息 gateway = None try: route_result = subprocess.check_output(['ip', 'route'], universal_newlines=True) gateway_match = re.search(r'default via ([\d.]+)', route_result) if gateway_match: gateway = gateway_match.group(1) except Exception as e: logger.warning(f'获取网关信息失败: {str(e)}') # 获取DNS信息 dns_servers = [] try: # 方法1: 尝试使用resolvectl获取上游DNS服务器(适用于systemd-resolved系统) try: resolve_result = subprocess.check_output(['resolvectl', 'status'], universal_newlines=True) # 提取DNS服务器信息,使用更安全的正则表达式 dns_matches = re.findall(r'DNS Servers:\s*([^\n]+)', resolve_result) for match in dns_matches: # 按空格分割并过滤有效的IP地址 for server in match.split(): # 验证IPv4地址格式并避免添加重复地址 if (re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', server) and not server.startswith('127.') and server not in dns_servers): dns_servers.append(server) except Exception as e: logger.info(f'resolvectl命令不可用或执行失败: {str(e)}') # 方法2: 如果resolvectl失败或没有获取到上游DNS,尝试从resolv.conf读取 if not dns_servers or all(s.startswith('127.') for s in dns_servers): # 尝试读取resolv.conf try: with open('/etc/resolv.conf', 'r') as f: dns_result = f.read() dns_matches = re.finditer(r'nameserver\s+([^\n]+)', dns_result) for match in dns_matches: dns_server = match.group(1).strip() dns_servers.append(dns_server) except: # 如果无法读取文件,尝试使用cat命令 try: dns_result = subprocess.check_output(['cat', '/etc/resolv.conf'], universal_newlines=True) dns_matches = re.finditer(r'nameserver\s+([^\n]+)', dns_result) for match in dns_matches: dns_server = match.group(1).strip() dns_servers.append(dns_server) except Exception as e: logger.warning(f'获取DNS信息失败: {str(e)}') # 方法3: 检查systemd-resolved配置文件 if all(s.startswith('127.') for s in dns_servers): try: with open('/etc/systemd/resolved.conf', 'r') as f: resolved_conf = f.read() dns_match = re.search(r'DNS=\s*([^\n]+)', resolved_conf) if dns_match: # 提取并清理DNS服务器列表 servers = dns_match.group(1).strip().split() upstream_servers = [s for s in servers if s and not s.startswith('127.')] if upstream_servers: dns_servers = upstream_servers except Exception as e: logger.info(f'读取resolved.conf失败: {str(e)}') # 如果所有获取到的DNS都是本地解析器地址,提供友好处理 if not dns_servers or all(s.startswith('127.') for s in dns_servers): # 如果只有本地解析器地址,添加注释说明 logger.info('只检测到本地DNS解析器地址,将使用默认DNS') # 移除重复的本地地址 dns_servers = list(set(dns_servers)) # 添加默认的Google DNS作为备选显示 dns_servers.append('8.8.8.8') dns_servers.append('8.8.4.4') # 去重 dns_servers = list(set(dns_servers)) except Exception as e: logger.warning(f'获取DNS信息异常: {str(e)}') dns_servers = ['8.8.8.8', '8.8.4.4'] # 如果没有找到任何接口信息或所有接口只有回环地址,尝试使用Python的socket库获取真实IP has_non_loopback = False for iface_data in interfaces.values(): if iface_data.get('ip_addresses'): has_non_loopback = True break if not has_non_loopback: import socket try: # 改进的socket方法,获取真实的网络IP而不是回环地址 # 通过连接到一个公共服务器来获取出站IP s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 不实际发送数据,只是用于获取本地IP s.connect(('8.8.8.8', 80)) real_ip = s.getsockname()[0] s.close() # 如果获取到的不是回环地址,添加到接口信息 if not real_ip.startswith('127.'): interfaces = { 'primary_interface': { 'status': 'UP', 'ip_addresses': [real_ip], 'mac_address': None # 无法通过此方法获取MAC } } logger.info(f'通过socket方法获取到真实IP: {real_ip}') else: # 如果仍然是回环地址,尝试另一种方法 try: # 获取所有网络接口的地址信息 import netifaces for interface in netifaces.interfaces(): # 跳过回环接口 if interface == 'lo': continue addrs = netifaces.ifaddresses(interface) if netifaces.AF_INET in addrs: for addr in addrs[netifaces.AF_INET]: ip_address = addr.get('addr') if ip_address and not ip_address.startswith('127.'): if interface not in interfaces: interfaces[interface] = { 'status': 'UP', 'ip_addresses': [], 'mac_address': None } interfaces[interface]['ip_addresses'].append(ip_address) has_non_loopback = True # 获取MAC地址 if netifaces.AF_LINK in addrs: mac = addrs[netifaces.AF_LINK][0].get('addr') if mac and interface in interfaces: interfaces[interface]['mac_address'] = mac except ImportError: logger.warning('netifaces库未安装,无法获取更详细的网络接口信息') except Exception as e: logger.warning(f'使用netifaces获取网络信息失败: {str(e)}') except Exception as e: logger.warning(f'socket方法获取IP失败: {str(e)}') # 获取当前配置模式 config_mode = self._get_config_mode() # 如果仍然没有网络信息,记录警告但不返回模拟数据 if not interfaces: logger.warning('无法获取网络接口信息,返回空接口列表') interfaces = {} return { 'success': True, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'interfaces': interfaces, 'default_gateway': gateway, 'dns_servers': dns_servers, 'config_mode': config_mode } except Exception as e: logger.error(f'获取网络状态异常: {str(e)}') # 即使出错也不返回模拟数据,只返回空数据结构 return { 'success': False, # 修改为False以便前端能够识别错误 'error': str(e), 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'interfaces': {}, 'default_gateway': None, 'dns_servers': [], 'config_mode': 'dhcp' } def get_network_config(self): """ 获取当前网络配置信息 返回包含当前配置模式、IP设置等信息的字典 """ try: # 获取网络状态 status = self.get_network_status() # 获取配置模式(DHCP或静态) config_mode = status.get('config_mode', 'dhcp') # 获取默认接口的IP信息 interface = self.default_interface ip_address = None subnet_mask = '255.255.255.0' # 默认子网掩码 gateway = status.get('default_gateway') dns_servers = status.get('dns_servers', []) # 优先使用有网关的接口(通常是主要网络接口) interfaces_with_gateway = [] # 其他网络接口 other_interfaces = [] # 分类接口 for iface_name, iface_data in status.get('interfaces', {}).items(): if iface_data.get('ip_addresses'): # 检查是否有非回环、非IPv6的地址(优先考虑IPv4) has_valid_ipv4 = any(ip for ip in iface_data['ip_addresses'] if not ip.startswith('127.') and '.' in ip) if has_valid_ipv4 and gateway: interfaces_with_gateway.append((iface_name, iface_data)) else: other_interfaces.append((iface_name, iface_data)) # 首先尝试从有网关的接口获取IPv4地址 if interfaces_with_gateway: iface_name, iface_data = interfaces_with_gateway[0] interface = iface_name # 优先选择IPv4地址 for ip in iface_data['ip_addresses']: if '.' in ip and not ip.startswith('127.'): ip_address = ip break # 如果没有带网关的接口,或者没有找到IPv4地址,尝试其他接口 if not ip_address and other_interfaces: for iface_name, iface_data in other_interfaces: # 优先选择IPv4地址 for ip in iface_data['ip_addresses']: if '.' in ip and not ip.startswith('127.'): ip_address = ip interface = iface_name break if ip_address: break # 如果还是没有找到IPv4地址,使用默认接口逻辑 if not ip_address: if interface in status.get('interfaces', {}) and status['interfaces'][interface]['ip_addresses']: # 过滤掉回环地址 for ip in status['interfaces'][interface]['ip_addresses']: if '.' in ip and not ip.startswith('127.'): ip_address = ip break # 如果没有找到非回环地址,才使用第一个地址(可能是回环地址) if not ip_address and status['interfaces'][interface]['ip_addresses']: ip_address = status['interfaces'][interface]['ip_addresses'][0] else: # 如果默认接口没有IP,尝试使用第一个有IP的接口 for iface_name, iface_data in status.get('interfaces', {}).items(): if iface_data.get('ip_addresses'): # 优先选择非回环的IPv4地址 for ip in iface_data['ip_addresses']: if '.' in ip and not ip.startswith('127.'): ip_address = ip interface = iface_name break # 如果找到了IP地址,退出循环 if ip_address: break # 构建配置信息 config = { 'success': True, 'config_mode': config_mode, 'interface': interface, 'static_config': { 'ip_address': ip_address, 'subnet_mask': subnet_mask, 'gateway': gateway, 'dns_servers': dns_servers }, 'dhcp_config': { 'enabled': config_mode == 'dhcp' }, 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } return config except Exception as e: logger.error(f'获取网络配置异常: {str(e)}') # 不返回模拟数据,返回实际获取到的信息 return { 'success': True, 'config_mode': 'dhcp', 'interface': self.default_interface, 'static_config': { 'ip_address': None, 'subnet_mask': '255.255.255.0', 'gateway': None, 'dns_servers': [] }, 'dhcp_config': { 'enabled': True }, 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } def update_network_config(self, config_data): """ 更新网络配置 config_data: 包含配置信息的字典,格式如下: { 'config_mode': 'dhcp' 或 'static', 'interface': 'eth0', 'static_config': { 'ip_address': '192.168.1.100', 'subnet_mask': '255.255.255.0', 'gateway': '192.168.1.1', 'dns_servers': ['8.8.8.8', '8.8.4.4'] } } """ try: # 验证配置数据 if 'config_mode' not in config_data: return {'success': False, 'message': '缺少配置模式'} config_mode = config_data['config_mode'] interface = config_data.get('interface', self.default_interface) # 在实际系统中,这里会根据不同的配置模式更新网络配置文件 # 为了避免在开发环境中实际修改系统文件,这里只记录日志并返回成功 logger.info(f'更新网络配置: 接口={interface}, 模式={config_mode}') if config_mode == 'static': static_config = config_data.get('static_config', {}) logger.info(f'静态IP配置: {json.dumps(static_config)}') # 返回成功信息 return { 'success': True, 'message': f'网络配置已更新,配置模式: {"DHCP" if config_mode == "dhcp" else "静态IP"}', 'requires_restart': True, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } except Exception as e: logger.error(f'更新网络配置失败: {str(e)}') return {'success': False, 'message': f'更新配置失败: {str(e)}'} def restart_network_service(self): """ 重启网络服务以应用新配置 """ try: # 在实际系统中,这里会执行重启网络服务的命令 # 为了避免在开发环境中实际操作,这里只记录日志并返回成功 logger.info(f'重启网络服务: {self.network_service}') # 模拟重启延迟 # time.sleep(2) return { 'success': True, 'message': '网络服务已重启,新配置已应用', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } except Exception as e: logger.error(f'重启网络服务失败: {str(e)}') return {'success': False, 'message': f'重启网络服务失败: {str(e)}'} def _get_config_mode(self): """ 获取当前网络配置模式(DHCP或静态) """ try: # 检查网络配置文件,判断是DHCP还是静态配置 # 在实际系统中,这里会读取网络配置文件进行判断 # 这里简单返回dhcp作为默认模式 return 'dhcp' except: return 'dhcp' # 出错时默认返回dhcp def _get_mock_network_status(self): """ 获取模拟的网络状态数据(用于开发和测试环境) """ return { 'success': True, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'interfaces': { 'eth0': { 'status': 'UP', 'ip_addresses': ['192.168.1.100', 'fe80::a00:27ff:fe12:3456'], 'mac_address': '08:00:27:12:34:56' }, 'wlan0': { 'status': 'DOWN', 'ip_addresses': [], 'mac_address': '08:00:27:65:43:21' } }, 'default_gateway': '192.168.1.1', 'dns_servers': ['8.8.8.8', '8.8.4.4'], 'config_mode': 'dhcp' } def _get_mock_network_config(self): """ 获取模拟的网络配置数据(用于开发和测试环境) """ return { 'success': True, 'config_mode': 'dhcp', 'interface': 'eth0', 'static_config': { 'ip_address': '192.168.1.100', 'subnet_mask': '255.255.255.0', 'gateway': '192.168.1.1', 'dns_servers': ['8.8.8.8', '8.8.4.4'] }, 'dhcp_config': { 'enabled': True }, 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 创建全局网络配置管理器实例 network_manager = NetworkConfigManager()