| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- """
- DHT11 温湿度传感器本地读取模块
- 通过 libgpiod 操作 GPIO,定时读取 DHT11 数据并上报到应用层。
- 支持真实传感器读取和模拟模式(用于无硬件环境测试)。
- """
- import os
- import time
- import json
- import logging
- import threading
- import subprocess
- logger = logging.getLogger('dht11_sensor')
- # 尝试导入 gpiod;如果缺失,真实读取模式会不可用,但模拟模式仍可运行
- try:
- import gpiod
- _GPIOD_AVAILABLE = True
- # 简单判断 gpiod 1.x/2.x:1.x 有 Chip/get_line,2.x 没有 get_line
- _GPIOD_V1 = hasattr(gpiod, 'Chip') and hasattr(gpiod.Chip, 'get_line')
- _GPIOD_V2 = hasattr(gpiod, 'Chip') and not _GPIOD_V1
- if _GPIOD_V2:
- logger.warning("检测到 gpiod 2.x,当前 DHT11 驱动使用 gpiod 1.x API;"
- "请在目标设备使用系统包管理器安装 python3-libgpiod (1.x)")
- _GPIOD_AVAILABLE = False
- except Exception as _e: # pragma: no cover
- gpiod = None
- _GPIOD_AVAILABLE = False
- logger.warning(f"gpiod 库未安装或加载失败,DHT11 真实读取不可用: {_e}")
- class DHT11Sensor:
- """
- DHT11 传感器读取器。
- 参数:
- gpio_pin: GPIO line 编号(在 gpiochip0 上)。
- 例如 Rockchip 的 GPIO2_A0 通常对应 sysfs 编号 64,
- 但具体编号取决于内核 gpio 描述。请按实际接线配置。
- gpio_chip: GPIO chip 名称,默认 'gpiochip0'。
- poll_interval: 读取间隔(秒),默认 5。
- simulate: 是否使用模拟数据(无真实硬件时测试用)。
- on_data: 数据回调函数,签名 on_data(temperature, humidity) -> None。
- """
- def __init__(self, gpio_pin=None, gpio_chip='gpiochip0', poll_interval=5,
- simulate=False, on_data=None):
- self.gpio_pin = gpio_pin
- self.gpio_chip = gpio_chip
- self.poll_interval = poll_interval
- self.simulate = simulate
- self.on_data = on_data
- self._running = False
- self._thread = None
- self._last_error = None
- self._last_success_time = None
- def start(self):
- """启动后台读取线程。"""
- if self._running:
- return
- self._running = True
- self._thread = threading.Thread(target=self._read_loop, daemon=True)
- self._thread.start()
- mode = '模拟' if self.simulate else '真实'
- logger.info(f"DHT11 读取线程已启动 (模式={mode}, pin={self.gpio_pin}, chip={self.gpio_chip}, 间隔={self.poll_interval}s)")
- def stop(self):
- """停止后台读取线程。"""
- self._running = False
- if self._thread and self._thread.is_alive():
- self._thread.join(timeout=2)
- logger.info("DHT11 读取线程已停止")
- def get_status(self):
- """返回当前传感器状态。"""
- return {
- 'running': self._running,
- 'simulate': self.simulate,
- 'gpio_pin': self.gpio_pin,
- 'gpio_chip': self.gpio_chip,
- 'poll_interval': self.poll_interval,
- 'last_success_time': self._last_success_time,
- 'last_error': self._last_error
- }
- def _read_loop(self):
- """后台循环读取。"""
- # 首次启动稍等片刻,让系统其他初始化完成
- time.sleep(1)
- while self._running:
- try:
- if self.simulate:
- temperature, humidity = self._read_simulated()
- else:
- temperature, humidity = self._read_real()
- self._last_success_time = time.strftime('%Y-%m-%d %H:%M:%S')
- self._last_error = None
- if self.on_data:
- try:
- self.on_data(temperature, humidity)
- except Exception as cb_err:
- logger.error(f"DHT11 数据回调异常: {cb_err}")
- else:
- logger.info(f"DHT11 读取成功: 温度={temperature}°C, 湿度={humidity}%")
- except Exception as e:
- self._last_error = str(e)
- logger.warning(f"DHT11 读取失败: {e}")
- # 按间隔休眠,拆分成小段以便快速退出
- for _ in range(int(self.poll_interval * 2)):
- if not self._running:
- break
- time.sleep(0.5)
- def _read_simulated(self):
- """生成缓慢变化的模拟数据。"""
- t = time.time()
- # 温度在 20~30 度之间缓慢波动,湿度在 40~70% 之间缓慢波动
- temperature = round(25 + 4 * ((t % 60) / 60 - 0.5), 1)
- humidity = round(55 + 14 * ((t % 120) / 120 - 0.5), 1)
- return temperature, humidity
- def _reader_binary_path(self):
- """返回 C 语言 DHT11 读取器可执行文件路径。"""
- # 优先使用与模块同目录下的 scripts/dht11_reader
- backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- candidates = [
- os.path.join(backend_dir, 'scripts', 'dht11_reader'),
- '/root/dzxj_dtu/backend/scripts/dht11_reader',
- '/usr/local/bin/dht11_reader',
- ]
- for p in candidates:
- if os.path.isfile(p) and os.access(p, os.X_OK):
- return p
- return None
- def _read_with_binary(self):
- """
- 调用 C 语言读取器获取温湿度。
- 该二进制使用 open-drain + 高速采样,可避开 Python 调度抖动。
- """
- binary = self._reader_binary_path()
- if not binary:
- raise RuntimeError("DHT11 C 读取器未找到,请确保 dht11_reader 已编译")
- pin = int(self.gpio_pin)
- chip = str(self.gpio_chip)
- cmd = [binary, chip, str(pin)]
- proc = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=True,
- timeout=10
- )
- stderr_text = proc.stderr.strip()
- if stderr_text:
- for line in stderr_text.splitlines():
- logger.debug(f"dht11_reader: {line}")
- if proc.returncode != 0:
- raise RuntimeError(f"dht11_reader 退出码 {proc.returncode}")
- try:
- result = json.loads(proc.stdout.strip().splitlines()[-1])
- except Exception as e:
- raise RuntimeError(f"解析 dht11_reader 输出失败: {e}, stdout={proc.stdout!r}")
- if not result.get('valid'):
- raise RuntimeError(f"DHT11 校验和错误: raw={result.get('raw')}")
- return float(result['temperature']), float(result['humidity'])
- def _read_real(self):
- """
- 读取一次 DHT11。
- 优先使用 C 语言读取器(时序更稳定),不可用时回退到纯 Python gpiod。
- """
- if self.gpio_pin is None:
- raise ValueError("未配置 DHT11 GPIO pin 编号")
- # 优先使用 C 二进制读取器
- if self._reader_binary_path():
- last_err = None
- for attempt in range(3):
- try:
- return self._read_with_binary()
- except Exception as e:
- last_err = e
- logger.debug(f"C 读取器尝试 {attempt + 1} 失败: {e}")
- time.sleep(2.5) # DHT11 两次读取间隔需 >2s
- logger.warning(f"C 读取器连续失败,回退 Python gpiod: {last_err}")
- if not _GPIOD_AVAILABLE:
- raise RuntimeError("gpiod 库不可用,无法读取 DHT11")
- # 纯 Python gpiod 回退
- last_err = None
- for attempt in range(3):
- try:
- return self._read_once()
- except Exception as e:
- last_err = e
- time.sleep(0.2 + attempt * 0.1)
- raise last_err
- def _read_once(self):
- pin = int(self.gpio_pin)
- chip_name = str(self.gpio_chip)
- # 尝试提升实时优先级,减少读取期间的调度抖动
- old_scheduler = None
- old_priority = None
- try:
- import os
- old_scheduler = os.sched_getscheduler(0)
- old_priority = os.sched_getparam(0)
- # SCHED_FIFO 优先级 50(范围 1~99),失败后静默回退
- os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(50))
- except Exception:
- pass
- chip = gpiod.Chip(chip_name)
- line = None
- try:
- line = chip.get_line(pin)
- # 安全:如果该 line 已被内核/其他驱动占用,跳过避免系统异常
- if hasattr(line, 'is_used') and line.is_used():
- raise RuntimeError(f"GPIO {pin} 已被系统占用,无法使用")
- # 阶段1: 主机起始信号
- # DHT11 上电后需要 >1s 稳定时间,每次读取前保持高电平 2s
- line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_OUT, default_vals=[1])
- line.set_value(1)
- time.sleep(2.0)
- line.set_value(0)
- time.sleep(0.030) # 拉低 30ms,兼容更多模块
- line.set_value(1)
- time.sleep(0.00005) # 释放 50us
- line.release()
- # 阶段2: 切换为输入等待传感器响应
- # gpiod 1.4.x 不一定支持 BIAS_PULL_UP,不支持则回退
- try:
- line.request(
- consumer='dht11',
- type=gpiod.LINE_REQ_DIR_IN,
- flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP
- )
- except Exception:
- line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_IN)
- def wait_level(expected, timeout_us):
- """忙等待电平变化,返回是否成功。"""
- deadline = time.perf_counter() + timeout_us / 1_000_000.0
- while line.get_value() != expected:
- if time.perf_counter() > deadline:
- return False
- return True
- # 等待 DHT 拉低(响应开始),超时放宽到 1ms
- if not wait_level(0, 1000):
- raise TimeoutError("等待 DHT 响应低电平超时")
- # 等待 DHT 拉高(响应结束)
- if not wait_level(1, 1000):
- raise TimeoutError("等待 DHT 响应高电平超时")
- # 等待 DHT 再次拉低(数据开始)
- if not wait_level(0, 1000):
- raise TimeoutError("等待 DHT 数据开始超时")
- # 阶段3: 读取 40bit 数据
- bits = []
- for _ in range(40):
- # 等待低电平结束(bit 开始)
- if not wait_level(1, 100):
- raise TimeoutError("等待 bit 高电平超时")
- start = time.perf_counter()
- if not wait_level(0, 100):
- raise TimeoutError("等待 bit 低电平超时")
- duration = time.perf_counter() - start
- # 高电平 > 40us 视为 1,否则为 0
- bits.append(1 if duration > 0.00004 else 0)
- line.release()
- finally:
- try:
- chip.close()
- except Exception:
- pass
- # 恢复原始调度策略
- try:
- if old_scheduler is not None and old_priority is not None:
- os.sched_setscheduler(0, old_scheduler, old_priority)
- except Exception:
- pass
- # 阶段4: 解析 40bit 数据
- data_bits = ''.join(str(b) for b in bits)
- data_bytes = [int(data_bits[i:i + 8], 2) for i in range(0, 40, 8)]
- humidity_int, humidity_dec, temp_int, temp_dec, checksum = data_bytes
- calc_sum = (humidity_int + humidity_dec + temp_int + temp_dec) & 0xFF
- if calc_sum != checksum:
- raise ValueError(f"DHT 校验和错误: 计算={calc_sum}, 收到={checksum}")
- humidity = humidity_int + humidity_dec / 10.0
- temperature = temp_int + temp_dec / 10.0
- if temp_int & 0x80:
- temperature = -(temperature & 0x7F)
- return round(temperature, 1), round(humidity, 1)
- def create_sensor_from_config(on_data=None):
- """
- 根据环境变量 / 默认值创建 DHT11Sensor 实例。
- 配置项(环境变量):
- DHT11_ENABLED: 是否启用,默认 'true'。
- DHT11_GPIO_PIN: GPIO line 编号,默认 None(未配置时只能使用模拟模式)。
- DHT11_GPIO_CHIP: GPIO chip 名称,默认 'gpiochip0'。
- DHT11_POLL_INTERVAL: 读取间隔(秒),默认 5。
- DHT11_SIMULATE: 是否模拟,默认 'false'。
- """
- enabled = os.getenv('DHT11_ENABLED', 'true').lower() in ('1', 'true', 'yes')
- if not enabled:
- return None
- pin_env = os.getenv('DHT11_GPIO_PIN')
- pin = int(pin_env) if pin_env and pin_env.strip() else None
- chip = os.getenv('DHT11_GPIO_CHIP', 'gpiochip0')
- interval = float(os.getenv('DHT11_POLL_INTERVAL', '5'))
- simulate = os.getenv('DHT11_SIMULATE', 'false').lower() in ('1', 'true', 'yes')
- return DHT11Sensor(
- gpio_pin=pin,
- gpio_chip=chip,
- poll_interval=interval,
- simulate=simulate,
- on_data=on_data
- )
|