""" DHT11 温湿度传感器本地读取模块 通过 libgpiod 操作 GPIO,定时读取 DHT11 数据并上报到应用层。 支持真实传感器读取和模拟模式(用于无硬件环境测试)。 """ import os import time import json import logging import threading import subprocess logger = logging.getLogger('dht11_sensor') # 尝试导入 gpiod;如果缺失,真实读取模式会不可用,但模拟模式仍可运行 try: import gpiod _GPIOD_AVAILABLE = True # 简单判断 gpiod 1.x/2.x:1.x 有 Chip/get_line,2.x 没有 get_line _GPIOD_V1 = hasattr(gpiod, 'Chip') and hasattr(gpiod.Chip, 'get_line') _GPIOD_V2 = hasattr(gpiod, 'Chip') and not _GPIOD_V1 if _GPIOD_V2: logger.warning("检测到 gpiod 2.x,当前 DHT11 驱动使用 gpiod 1.x API;" "请在目标设备使用系统包管理器安装 python3-libgpiod (1.x)") _GPIOD_AVAILABLE = False except Exception as _e: # pragma: no cover gpiod = None _GPIOD_AVAILABLE = False logger.warning(f"gpiod 库未安装或加载失败,DHT11 真实读取不可用: {_e}") class DHT11Sensor: """ DHT11 传感器读取器。 参数: gpio_pin: GPIO line 编号(在 gpiochip0 上)。 例如 Rockchip 的 GPIO2_A0 通常对应 sysfs 编号 64, 但具体编号取决于内核 gpio 描述。请按实际接线配置。 gpio_chip: GPIO chip 名称,默认 'gpiochip0'。 poll_interval: 读取间隔(秒),默认 5。 simulate: 是否使用模拟数据(无真实硬件时测试用)。 on_data: 数据回调函数,签名 on_data(temperature, humidity) -> None。 """ def __init__(self, gpio_pin=None, gpio_chip='gpiochip0', poll_interval=5, simulate=False, on_data=None): self.gpio_pin = gpio_pin self.gpio_chip = gpio_chip self.poll_interval = poll_interval self.simulate = simulate self.on_data = on_data self._running = False self._thread = None self._last_error = None self._last_success_time = None def start(self): """启动后台读取线程。""" if self._running: return self._running = True self._thread = threading.Thread(target=self._read_loop, daemon=True) self._thread.start() mode = '模拟' if self.simulate else '真实' logger.info(f"DHT11 读取线程已启动 (模式={mode}, pin={self.gpio_pin}, chip={self.gpio_chip}, 间隔={self.poll_interval}s)") def stop(self): """停止后台读取线程。""" self._running = False if self._thread and self._thread.is_alive(): self._thread.join(timeout=2) logger.info("DHT11 读取线程已停止") def get_status(self): """返回当前传感器状态。""" return { 'running': self._running, 'simulate': self.simulate, 'gpio_pin': self.gpio_pin, 'gpio_chip': self.gpio_chip, 'poll_interval': self.poll_interval, 'last_success_time': self._last_success_time, 'last_error': self._last_error } def _read_loop(self): """后台循环读取。""" # 首次启动稍等片刻,让系统其他初始化完成 time.sleep(1) while self._running: try: if self.simulate: temperature, humidity = self._read_simulated() else: temperature, humidity = self._read_real() self._last_success_time = time.strftime('%Y-%m-%d %H:%M:%S') self._last_error = None if self.on_data: try: self.on_data(temperature, humidity) except Exception as cb_err: logger.error(f"DHT11 数据回调异常: {cb_err}") else: logger.info(f"DHT11 读取成功: 温度={temperature}°C, 湿度={humidity}%") except Exception as e: self._last_error = str(e) logger.warning(f"DHT11 读取失败: {e}") # 按间隔休眠,拆分成小段以便快速退出 for _ in range(int(self.poll_interval * 2)): if not self._running: break time.sleep(0.5) def _read_simulated(self): """生成缓慢变化的模拟数据。""" t = time.time() # 温度在 20~30 度之间缓慢波动,湿度在 40~70% 之间缓慢波动 temperature = round(25 + 4 * ((t % 60) / 60 - 0.5), 1) humidity = round(55 + 14 * ((t % 120) / 120 - 0.5), 1) return temperature, humidity def _reader_binary_path(self): """返回 C 语言 DHT11 读取器可执行文件路径。""" # 优先使用与模块同目录下的 scripts/dht11_reader backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) candidates = [ os.path.join(backend_dir, 'scripts', 'dht11_reader'), '/root/dzxj_dtu/backend/scripts/dht11_reader', '/usr/local/bin/dht11_reader', ] for p in candidates: if os.path.isfile(p) and os.access(p, os.X_OK): return p return None def _read_with_binary(self): """ 调用 C 语言读取器获取温湿度。 该二进制使用 open-drain + 高速采样,可避开 Python 调度抖动。 """ binary = self._reader_binary_path() if not binary: raise RuntimeError("DHT11 C 读取器未找到,请确保 dht11_reader 已编译") pin = int(self.gpio_pin) chip = str(self.gpio_chip) cmd = [binary, chip, str(pin)] proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10 ) stderr_text = proc.stderr.strip() if stderr_text: for line in stderr_text.splitlines(): logger.debug(f"dht11_reader: {line}") if proc.returncode != 0: raise RuntimeError(f"dht11_reader 退出码 {proc.returncode}") try: result = json.loads(proc.stdout.strip().splitlines()[-1]) except Exception as e: raise RuntimeError(f"解析 dht11_reader 输出失败: {e}, stdout={proc.stdout!r}") if not result.get('valid'): raise RuntimeError(f"DHT11 校验和错误: raw={result.get('raw')}") return float(result['temperature']), float(result['humidity']) def _read_real(self): """ 读取一次 DHT11。 优先使用 C 语言读取器(时序更稳定),不可用时回退到纯 Python gpiod。 """ if self.gpio_pin is None: raise ValueError("未配置 DHT11 GPIO pin 编号") # 优先使用 C 二进制读取器 if self._reader_binary_path(): last_err = None for attempt in range(3): try: return self._read_with_binary() except Exception as e: last_err = e logger.debug(f"C 读取器尝试 {attempt + 1} 失败: {e}") time.sleep(2.5) # DHT11 两次读取间隔需 >2s logger.warning(f"C 读取器连续失败,回退 Python gpiod: {last_err}") if not _GPIOD_AVAILABLE: raise RuntimeError("gpiod 库不可用,无法读取 DHT11") # 纯 Python gpiod 回退 last_err = None for attempt in range(3): try: return self._read_once() except Exception as e: last_err = e time.sleep(0.2 + attempt * 0.1) raise last_err def _read_once(self): pin = int(self.gpio_pin) chip_name = str(self.gpio_chip) # 尝试提升实时优先级,减少读取期间的调度抖动 old_scheduler = None old_priority = None try: import os old_scheduler = os.sched_getscheduler(0) old_priority = os.sched_getparam(0) # SCHED_FIFO 优先级 50(范围 1~99),失败后静默回退 os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(50)) except Exception: pass chip = gpiod.Chip(chip_name) line = None try: line = chip.get_line(pin) # 安全:如果该 line 已被内核/其他驱动占用,跳过避免系统异常 if hasattr(line, 'is_used') and line.is_used(): raise RuntimeError(f"GPIO {pin} 已被系统占用,无法使用") # 阶段1: 主机起始信号 # DHT11 上电后需要 >1s 稳定时间,每次读取前保持高电平 2s line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_OUT, default_vals=[1]) line.set_value(1) time.sleep(2.0) line.set_value(0) time.sleep(0.030) # 拉低 30ms,兼容更多模块 line.set_value(1) time.sleep(0.00005) # 释放 50us line.release() # 阶段2: 切换为输入等待传感器响应 # gpiod 1.4.x 不一定支持 BIAS_PULL_UP,不支持则回退 try: line.request( consumer='dht11', type=gpiod.LINE_REQ_DIR_IN, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP ) except Exception: line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_IN) def wait_level(expected, timeout_us): """忙等待电平变化,返回是否成功。""" deadline = time.perf_counter() + timeout_us / 1_000_000.0 while line.get_value() != expected: if time.perf_counter() > deadline: return False return True # 等待 DHT 拉低(响应开始),超时放宽到 1ms if not wait_level(0, 1000): raise TimeoutError("等待 DHT 响应低电平超时") # 等待 DHT 拉高(响应结束) if not wait_level(1, 1000): raise TimeoutError("等待 DHT 响应高电平超时") # 等待 DHT 再次拉低(数据开始) if not wait_level(0, 1000): raise TimeoutError("等待 DHT 数据开始超时") # 阶段3: 读取 40bit 数据 bits = [] for _ in range(40): # 等待低电平结束(bit 开始) if not wait_level(1, 100): raise TimeoutError("等待 bit 高电平超时") start = time.perf_counter() if not wait_level(0, 100): raise TimeoutError("等待 bit 低电平超时") duration = time.perf_counter() - start # 高电平 > 40us 视为 1,否则为 0 bits.append(1 if duration > 0.00004 else 0) line.release() finally: try: chip.close() except Exception: pass # 恢复原始调度策略 try: if old_scheduler is not None and old_priority is not None: os.sched_setscheduler(0, old_scheduler, old_priority) except Exception: pass # 阶段4: 解析 40bit 数据 data_bits = ''.join(str(b) for b in bits) data_bytes = [int(data_bits[i:i + 8], 2) for i in range(0, 40, 8)] humidity_int, humidity_dec, temp_int, temp_dec, checksum = data_bytes calc_sum = (humidity_int + humidity_dec + temp_int + temp_dec) & 0xFF if calc_sum != checksum: raise ValueError(f"DHT 校验和错误: 计算={calc_sum}, 收到={checksum}") humidity = humidity_int + humidity_dec / 10.0 temperature = temp_int + temp_dec / 10.0 if temp_int & 0x80: temperature = -(temperature & 0x7F) return round(temperature, 1), round(humidity, 1) def create_sensor_from_config(on_data=None): """ 根据环境变量 / 默认值创建 DHT11Sensor 实例。 配置项(环境变量): DHT11_ENABLED: 是否启用,默认 'true'。 DHT11_GPIO_PIN: GPIO line 编号,默认 None(未配置时只能使用模拟模式)。 DHT11_GPIO_CHIP: GPIO chip 名称,默认 'gpiochip0'。 DHT11_POLL_INTERVAL: 读取间隔(秒),默认 5。 DHT11_SIMULATE: 是否模拟,默认 'false'。 """ enabled = os.getenv('DHT11_ENABLED', 'true').lower() in ('1', 'true', 'yes') if not enabled: return None pin_env = os.getenv('DHT11_GPIO_PIN') pin = int(pin_env) if pin_env and pin_env.strip() else None chip = os.getenv('DHT11_GPIO_CHIP', 'gpiochip0') interval = float(os.getenv('DHT11_POLL_INTERVAL', '5')) simulate = os.getenv('DHT11_SIMULATE', 'false').lower() in ('1', 'true', 'yes') return DHT11Sensor( gpio_pin=pin, gpio_chip=chip, poll_interval=interval, simulate=simulate, on_data=on_data )