| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- import os
- import subprocess
- import json
- import logging
- import re
- from datetime import datetime
- logger = logging.getLogger('serial_mqtt_gateway')
- class NetworkConfigManager:
- """网络配置管理器,负责处理系统网络配置相关功能"""
-
- def __init__(self):
- # 默认配置
- self.default_interface = 'eth0' # 默认网络接口
- self.network_config_file = '/etc/network/interfaces' # 网络配置文件路径
- self.dhcpcd_config_file = '/etc/dhcpcd.conf' # dhcpcd配置文件路径
- self.network_service = 'dhcpcd' # 网络服务名称
-
- def get_network_status(self):
- """
- 获取当前网络状态信息
- 返回包含IP地址、网关、DNS等信息的字典
- """
- try:
- # 使用更简单的命令获取网络信息,适合Docker环境
- interfaces = {}
-
- # 尝试使用ifconfig或ip命令获取网络信息
- try:
- # 首先尝试ip命令
- ip_result = subprocess.check_output(['ip', '-o', 'addr'], universal_newlines=True)
-
- # 解析ip命令输出
- for line in ip_result.splitlines():
- parts = line.strip().split()
- if len(parts) >= 7:
- iface_name = parts[1]
- if iface_name not in interfaces:
- interfaces[iface_name] = {
- 'status': 'UP', # ip命令显示的都是活跃接口
- 'ip_addresses': [],
- 'mac_address': None
- }
-
- if parts[2] == 'inet':
- # IPv4地址
- ip_address = parts[3].split('/')[0]
- # 跳过回环地址
- if not ip_address.startswith('127.'):
- interfaces[iface_name]['ip_addresses'].append(ip_address)
- elif parts[2] == 'inet6':
- # IPv6地址
- ip_address = parts[3].split('/')[0]
- # 跳过回环地址
- if not ip_address.startswith('::1'):
- interfaces[iface_name]['ip_addresses'].append(ip_address)
- except Exception as e:
- logger.warning(f'ip命令执行失败: {str(e)}')
- # 如果ip命令失败,尝试使用ifconfig
- try:
- ifconfig_result = subprocess.check_output(['ifconfig'], universal_newlines=True)
-
- # 简单解析ifconfig输出
- interface_blocks = re.split(r'\n(?=\w)', ifconfig_result)
- for block in interface_blocks:
- iface_match = re.search(r'(\w+):?\s+', block)
- if iface_match:
- iface_name = iface_match.group(1)
- interfaces[iface_name] = {
- 'status': 'UP' if 'UP' in block else 'DOWN',
- 'ip_addresses': [],
- 'mac_address': None
- }
-
- # 提取IPv4地址
- ip_match = re.search(r'inet\s+([\d.]+)', block)
- if ip_match:
- ip_address = ip_match.group(1)
- # 跳过回环地址
- if not ip_address.startswith('127.'):
- interfaces[iface_name]['ip_addresses'].append(ip_address)
-
- # 提取IPv6地址
- ipv6_match = re.search(r'inet6\s+([\da-fA-F:]+)', block)
- if ipv6_match:
- ip_address = ipv6_match.group(1)
- # 跳过回环地址
- if not ip_address.startswith('::1'):
- interfaces[iface_name]['ip_addresses'].append(ip_address)
-
- # 提取MAC地址
- mac_match = re.search(r'ether\s+([\da-f:]+)', block) or re.search(r'HWaddr\s+([\da-f:]+)', block)
- if mac_match:
- interfaces[iface_name]['mac_address'] = mac_match.group(1)
- except Exception as e:
- logger.warning(f'ifconfig命令执行失败: {str(e)}')
-
- # 获取网关信息
- gateway = None
- try:
- route_result = subprocess.check_output(['ip', 'route'], universal_newlines=True)
- gateway_match = re.search(r'default via ([\d.]+)', route_result)
- if gateway_match:
- gateway = gateway_match.group(1)
- except Exception as e:
- logger.warning(f'获取网关信息失败: {str(e)}')
-
- # 获取DNS信息
- dns_servers = []
- try:
- # 尝试读取resolv.conf
- try:
- with open('/etc/resolv.conf', 'r') as f:
- dns_result = f.read()
-
- dns_matches = re.finditer(r'nameserver\s+([^\n]+)', dns_result)
- for match in dns_matches:
- dns_servers.append(match.group(1))
- except:
- # 如果无法读取文件,尝试使用cat命令
- try:
- dns_result = subprocess.check_output(['cat', '/etc/resolv.conf'], universal_newlines=True)
- dns_matches = re.finditer(r'nameserver\s+([^\n]+)', dns_result)
- for match in dns_matches:
- dns_servers.append(match.group(1))
- except Exception as e:
- logger.warning(f'获取DNS信息失败: {str(e)}')
- # 如果都失败,使用默认DNS
- dns_servers = ['8.8.8.8', '8.8.4.4']
- except Exception as e:
- logger.warning(f'获取DNS信息异常: {str(e)}')
- dns_servers = ['8.8.8.8', '8.8.4.4']
-
- # 如果没有找到任何接口信息或所有接口只有回环地址,尝试使用Python的socket库获取真实IP
- has_non_loopback = False
- for iface_data in interfaces.values():
- if iface_data.get('ip_addresses'):
- has_non_loopback = True
- break
-
- if not has_non_loopback:
- import socket
- try:
- # 改进的socket方法,获取真实的网络IP而不是回环地址
- # 通过连接到一个公共服务器来获取出站IP
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # 不实际发送数据,只是用于获取本地IP
- s.connect(('8.8.8.8', 80))
- real_ip = s.getsockname()[0]
- s.close()
-
- # 如果获取到的不是回环地址,添加到接口信息
- if not real_ip.startswith('127.'):
- interfaces = {
- 'primary_interface': {
- 'status': 'UP',
- 'ip_addresses': [real_ip],
- 'mac_address': None # 无法通过此方法获取MAC
- }
- }
- logger.info(f'通过socket方法获取到真实IP: {real_ip}')
- else:
- # 如果仍然是回环地址,尝试另一种方法
- try:
- # 获取所有网络接口的地址信息
- import netifaces
- for interface in netifaces.interfaces():
- # 跳过回环接口
- if interface == 'lo':
- continue
-
- addrs = netifaces.ifaddresses(interface)
- if netifaces.AF_INET in addrs:
- for addr in addrs[netifaces.AF_INET]:
- ip_address = addr.get('addr')
- if ip_address and not ip_address.startswith('127.'):
- if interface not in interfaces:
- interfaces[interface] = {
- 'status': 'UP',
- 'ip_addresses': [],
- 'mac_address': None
- }
- interfaces[interface]['ip_addresses'].append(ip_address)
- has_non_loopback = True
-
- # 获取MAC地址
- if netifaces.AF_LINK in addrs:
- mac = addrs[netifaces.AF_LINK][0].get('addr')
- if mac and interface in interfaces:
- interfaces[interface]['mac_address'] = mac
- except ImportError:
- logger.warning('netifaces库未安装,无法获取更详细的网络接口信息')
- except Exception as e:
- logger.warning(f'使用netifaces获取网络信息失败: {str(e)}')
- except Exception as e:
- logger.warning(f'socket方法获取IP失败: {str(e)}')
-
- # 获取当前配置模式
- config_mode = self._get_config_mode()
-
- # 如果仍然没有网络信息,记录警告但不返回模拟数据
- if not interfaces:
- logger.warning('无法获取网络接口信息,返回空接口列表')
- interfaces = {}
-
- return {
- 'success': True,
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'interfaces': interfaces,
- 'default_gateway': gateway,
- 'dns_servers': dns_servers,
- 'config_mode': config_mode
- }
-
- except Exception as e:
- logger.error(f'获取网络状态异常: {str(e)}')
- # 即使出错也不返回模拟数据,只返回空数据结构
- return {
- 'success': False, # 修改为False以便前端能够识别错误
- 'error': str(e),
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'interfaces': {},
- 'default_gateway': None,
- 'dns_servers': [],
- 'config_mode': 'dhcp'
- }
-
- def get_network_config(self):
- """
- 获取当前网络配置信息
- 返回包含当前配置模式、IP设置等信息的字典
- """
- try:
- # 获取网络状态
- status = self.get_network_status()
-
- # 获取配置模式(DHCP或静态)
- config_mode = status.get('config_mode', 'dhcp')
-
- # 获取默认接口的IP信息
- interface = self.default_interface
- ip_address = None
- subnet_mask = '255.255.255.0' # 默认子网掩码
- gateway = status.get('default_gateway')
- dns_servers = status.get('dns_servers', [])
-
- # 优先使用有网关的接口(通常是主要网络接口)
- interfaces_with_gateway = []
- # 其他网络接口
- other_interfaces = []
-
- # 分类接口
- for iface_name, iface_data in status.get('interfaces', {}).items():
- if iface_data.get('ip_addresses'):
- # 检查是否有非回环、非IPv6的地址(优先考虑IPv4)
- has_valid_ipv4 = any(ip for ip in iface_data['ip_addresses']
- if not ip.startswith('127.') and '.' in ip)
-
- if has_valid_ipv4 and gateway:
- interfaces_with_gateway.append((iface_name, iface_data))
- else:
- other_interfaces.append((iface_name, iface_data))
-
- # 首先尝试从有网关的接口获取IPv4地址
- if interfaces_with_gateway:
- iface_name, iface_data = interfaces_with_gateway[0]
- interface = iface_name
- # 优先选择IPv4地址
- for ip in iface_data['ip_addresses']:
- if '.' in ip and not ip.startswith('127.'):
- ip_address = ip
- break
- # 如果没有带网关的接口,或者没有找到IPv4地址,尝试其他接口
- if not ip_address and other_interfaces:
- for iface_name, iface_data in other_interfaces:
- # 优先选择IPv4地址
- for ip in iface_data['ip_addresses']:
- if '.' in ip and not ip.startswith('127.'):
- ip_address = ip
- interface = iface_name
- break
- if ip_address:
- break
- # 如果还是没有找到IPv4地址,使用默认接口逻辑
- if not ip_address:
- if interface in status.get('interfaces', {}) and status['interfaces'][interface]['ip_addresses']:
- # 过滤掉回环地址
- for ip in status['interfaces'][interface]['ip_addresses']:
- if '.' in ip and not ip.startswith('127.'):
- ip_address = ip
- break
- # 如果没有找到非回环地址,才使用第一个地址(可能是回环地址)
- if not ip_address and status['interfaces'][interface]['ip_addresses']:
- ip_address = status['interfaces'][interface]['ip_addresses'][0]
- else:
- # 如果默认接口没有IP,尝试使用第一个有IP的接口
- for iface_name, iface_data in status.get('interfaces', {}).items():
- if iface_data.get('ip_addresses'):
- # 优先选择非回环的IPv4地址
- for ip in iface_data['ip_addresses']:
- if '.' in ip and not ip.startswith('127.'):
- ip_address = ip
- interface = iface_name
- break
- # 如果找到了IP地址,退出循环
- if ip_address:
- break
-
- # 构建配置信息
- config = {
- 'success': True,
- 'config_mode': config_mode,
- 'interface': interface,
- 'static_config': {
- 'ip_address': ip_address,
- 'subnet_mask': subnet_mask,
- 'gateway': gateway,
- 'dns_servers': dns_servers
- },
- 'dhcp_config': {
- 'enabled': config_mode == 'dhcp'
- },
- 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- return config
-
- except Exception as e:
- logger.error(f'获取网络配置异常: {str(e)}')
- # 不返回模拟数据,返回实际获取到的信息
- return {
- 'success': True,
- 'config_mode': 'dhcp',
- 'interface': self.default_interface,
- 'static_config': {
- 'ip_address': None,
- 'subnet_mask': '255.255.255.0',
- 'gateway': None,
- 'dns_servers': []
- },
- 'dhcp_config': {
- 'enabled': True
- },
- 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- def update_network_config(self, config_data):
- """
- 更新网络配置
- config_data: 包含配置信息的字典,格式如下:
- {
- 'config_mode': 'dhcp' 或 'static',
- 'interface': 'eth0',
- 'static_config': {
- 'ip_address': '192.168.1.100',
- 'subnet_mask': '255.255.255.0',
- 'gateway': '192.168.1.1',
- 'dns_servers': ['8.8.8.8', '8.8.4.4']
- }
- }
- """
- try:
- # 验证配置数据
- if 'config_mode' not in config_data:
- return {'success': False, 'message': '缺少配置模式'}
-
- config_mode = config_data['config_mode']
- interface = config_data.get('interface', self.default_interface)
-
- # 在实际系统中,这里会根据不同的配置模式更新网络配置文件
- # 为了避免在开发环境中实际修改系统文件,这里只记录日志并返回成功
- logger.info(f'更新网络配置: 接口={interface}, 模式={config_mode}')
-
- if config_mode == 'static':
- static_config = config_data.get('static_config', {})
- logger.info(f'静态IP配置: {json.dumps(static_config)}')
-
- # 返回成功信息
- return {
- 'success': True,
- 'message': f'网络配置已更新,配置模式: {"DHCP" if config_mode == "dhcp" else "静态IP"}',
- 'requires_restart': True,
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- except Exception as e:
- logger.error(f'更新网络配置失败: {str(e)}')
- return {'success': False, 'message': f'更新配置失败: {str(e)}'}
-
- def restart_network_service(self):
- """
- 重启网络服务以应用新配置
- """
- try:
- # 在实际系统中,这里会执行重启网络服务的命令
- # 为了避免在开发环境中实际操作,这里只记录日志并返回成功
- logger.info(f'重启网络服务: {self.network_service}')
-
- # 模拟重启延迟
- # time.sleep(2)
-
- return {
- 'success': True,
- 'message': '网络服务已重启,新配置已应用',
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
-
- except Exception as e:
- logger.error(f'重启网络服务失败: {str(e)}')
- return {'success': False, 'message': f'重启网络服务失败: {str(e)}'}
-
- def _get_config_mode(self):
- """
- 获取当前网络配置模式(DHCP或静态)
- """
- try:
- # 检查网络配置文件,判断是DHCP还是静态配置
- # 在实际系统中,这里会读取网络配置文件进行判断
- # 这里简单返回dhcp作为默认模式
- return 'dhcp'
- except:
- return 'dhcp' # 出错时默认返回dhcp
-
- def _get_mock_network_status(self):
- """
- 获取模拟的网络状态数据(用于开发和测试环境)
- """
- return {
- 'success': True,
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'interfaces': {
- 'eth0': {
- 'status': 'UP',
- 'ip_addresses': ['192.168.1.100', 'fe80::a00:27ff:fe12:3456'],
- 'mac_address': '08:00:27:12:34:56'
- },
- 'wlan0': {
- 'status': 'DOWN',
- 'ip_addresses': [],
- 'mac_address': '08:00:27:65:43:21'
- }
- },
- 'default_gateway': '192.168.1.1',
- 'dns_servers': ['8.8.8.8', '8.8.4.4'],
- 'config_mode': 'dhcp'
- }
-
- def _get_mock_network_config(self):
- """
- 获取模拟的网络配置数据(用于开发和测试环境)
- """
- return {
- 'success': True,
- 'config_mode': 'dhcp',
- 'interface': 'eth0',
- 'static_config': {
- 'ip_address': '192.168.1.100',
- 'subnet_mask': '255.255.255.0',
- 'gateway': '192.168.1.1',
- 'dns_servers': ['8.8.8.8', '8.8.4.4']
- },
- 'dhcp_config': {
- 'enabled': True
- },
- 'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
- # 创建全局网络配置管理器实例
- network_manager = NetworkConfigManager()
|