dht11_sensor.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. """
  2. DHT11 温湿度传感器本地读取模块
  3. 通过 libgpiod 操作 GPIO,定时读取 DHT11 数据并上报到应用层。
  4. 支持真实传感器读取和模拟模式(用于无硬件环境测试)。
  5. """
  6. import os
  7. import time
  8. import json
  9. import logging
  10. import threading
  11. import subprocess
  12. logger = logging.getLogger('dht11_sensor')
  13. # 尝试导入 gpiod;如果缺失,真实读取模式会不可用,但模拟模式仍可运行
  14. try:
  15. import gpiod
  16. _GPIOD_AVAILABLE = True
  17. # 简单判断 gpiod 1.x/2.x:1.x 有 Chip/get_line,2.x 没有 get_line
  18. _GPIOD_V1 = hasattr(gpiod, 'Chip') and hasattr(gpiod.Chip, 'get_line')
  19. _GPIOD_V2 = hasattr(gpiod, 'Chip') and not _GPIOD_V1
  20. if _GPIOD_V2:
  21. logger.warning("检测到 gpiod 2.x,当前 DHT11 驱动使用 gpiod 1.x API;"
  22. "请在目标设备使用系统包管理器安装 python3-libgpiod (1.x)")
  23. _GPIOD_AVAILABLE = False
  24. except Exception as _e: # pragma: no cover
  25. gpiod = None
  26. _GPIOD_AVAILABLE = False
  27. logger.warning(f"gpiod 库未安装或加载失败,DHT11 真实读取不可用: {_e}")
  28. class DHT11Sensor:
  29. """
  30. DHT11 传感器读取器。
  31. 参数:
  32. gpio_pin: GPIO line 编号(在 gpiochip0 上)。
  33. 例如 Rockchip 的 GPIO2_A0 通常对应 sysfs 编号 64,
  34. 但具体编号取决于内核 gpio 描述。请按实际接线配置。
  35. gpio_chip: GPIO chip 名称,默认 'gpiochip0'。
  36. poll_interval: 读取间隔(秒),默认 5。
  37. simulate: 是否使用模拟数据(无真实硬件时测试用)。
  38. on_data: 数据回调函数,签名 on_data(temperature, humidity) -> None。
  39. """
  40. def __init__(self, gpio_pin=None, gpio_chip='gpiochip0', poll_interval=5,
  41. simulate=False, on_data=None):
  42. self.gpio_pin = gpio_pin
  43. self.gpio_chip = gpio_chip
  44. self.poll_interval = poll_interval
  45. self.simulate = simulate
  46. self.on_data = on_data
  47. self._running = False
  48. self._thread = None
  49. self._last_error = None
  50. self._last_success_time = None
  51. def start(self):
  52. """启动后台读取线程。"""
  53. if self._running:
  54. return
  55. self._running = True
  56. self._thread = threading.Thread(target=self._read_loop, daemon=True)
  57. self._thread.start()
  58. mode = '模拟' if self.simulate else '真实'
  59. logger.info(f"DHT11 读取线程已启动 (模式={mode}, pin={self.gpio_pin}, chip={self.gpio_chip}, 间隔={self.poll_interval}s)")
  60. def stop(self):
  61. """停止后台读取线程。"""
  62. self._running = False
  63. if self._thread and self._thread.is_alive():
  64. self._thread.join(timeout=2)
  65. logger.info("DHT11 读取线程已停止")
  66. def get_status(self):
  67. """返回当前传感器状态。"""
  68. return {
  69. 'running': self._running,
  70. 'simulate': self.simulate,
  71. 'gpio_pin': self.gpio_pin,
  72. 'gpio_chip': self.gpio_chip,
  73. 'poll_interval': self.poll_interval,
  74. 'last_success_time': self._last_success_time,
  75. 'last_error': self._last_error
  76. }
  77. def _read_loop(self):
  78. """后台循环读取。"""
  79. # 首次启动稍等片刻,让系统其他初始化完成
  80. time.sleep(1)
  81. while self._running:
  82. try:
  83. if self.simulate:
  84. temperature, humidity = self._read_simulated()
  85. else:
  86. temperature, humidity = self._read_real()
  87. self._last_success_time = time.strftime('%Y-%m-%d %H:%M:%S')
  88. self._last_error = None
  89. if self.on_data:
  90. try:
  91. self.on_data(temperature, humidity)
  92. except Exception as cb_err:
  93. logger.error(f"DHT11 数据回调异常: {cb_err}")
  94. else:
  95. logger.info(f"DHT11 读取成功: 温度={temperature}°C, 湿度={humidity}%")
  96. except Exception as e:
  97. self._last_error = str(e)
  98. logger.warning(f"DHT11 读取失败: {e}")
  99. # 按间隔休眠,拆分成小段以便快速退出
  100. for _ in range(int(self.poll_interval * 2)):
  101. if not self._running:
  102. break
  103. time.sleep(0.5)
  104. def _read_simulated(self):
  105. """生成缓慢变化的模拟数据。"""
  106. t = time.time()
  107. # 温度在 20~30 度之间缓慢波动,湿度在 40~70% 之间缓慢波动
  108. temperature = round(25 + 4 * ((t % 60) / 60 - 0.5), 1)
  109. humidity = round(55 + 14 * ((t % 120) / 120 - 0.5), 1)
  110. return temperature, humidity
  111. def _reader_binary_path(self):
  112. """返回 C 语言 DHT11 读取器可执行文件路径。"""
  113. # 优先使用与模块同目录下的 scripts/dht11_reader
  114. backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  115. candidates = [
  116. os.path.join(backend_dir, 'scripts', 'dht11_reader'),
  117. '/root/dzxj_dtu/backend/scripts/dht11_reader',
  118. '/usr/local/bin/dht11_reader',
  119. ]
  120. for p in candidates:
  121. if os.path.isfile(p) and os.access(p, os.X_OK):
  122. return p
  123. return None
  124. def _read_with_binary(self):
  125. """
  126. 调用 C 语言读取器获取温湿度。
  127. 该二进制使用 open-drain + 高速采样,可避开 Python 调度抖动。
  128. """
  129. binary = self._reader_binary_path()
  130. if not binary:
  131. raise RuntimeError("DHT11 C 读取器未找到,请确保 dht11_reader 已编译")
  132. pin = int(self.gpio_pin)
  133. chip = str(self.gpio_chip)
  134. cmd = [binary, chip, str(pin)]
  135. proc = subprocess.run(
  136. cmd,
  137. stdout=subprocess.PIPE,
  138. stderr=subprocess.PIPE,
  139. text=True,
  140. timeout=10
  141. )
  142. stderr_text = proc.stderr.strip()
  143. if stderr_text:
  144. for line in stderr_text.splitlines():
  145. logger.debug(f"dht11_reader: {line}")
  146. if proc.returncode != 0:
  147. raise RuntimeError(f"dht11_reader 退出码 {proc.returncode}")
  148. try:
  149. result = json.loads(proc.stdout.strip().splitlines()[-1])
  150. except Exception as e:
  151. raise RuntimeError(f"解析 dht11_reader 输出失败: {e}, stdout={proc.stdout!r}")
  152. if not result.get('valid'):
  153. raise RuntimeError(f"DHT11 校验和错误: raw={result.get('raw')}")
  154. return float(result['temperature']), float(result['humidity'])
  155. def _read_real(self):
  156. """
  157. 读取一次 DHT11。
  158. 优先使用 C 语言读取器(时序更稳定),不可用时回退到纯 Python gpiod。
  159. """
  160. if self.gpio_pin is None:
  161. raise ValueError("未配置 DHT11 GPIO pin 编号")
  162. # 优先使用 C 二进制读取器
  163. if self._reader_binary_path():
  164. last_err = None
  165. for attempt in range(3):
  166. try:
  167. return self._read_with_binary()
  168. except Exception as e:
  169. last_err = e
  170. logger.debug(f"C 读取器尝试 {attempt + 1} 失败: {e}")
  171. time.sleep(2.5) # DHT11 两次读取间隔需 >2s
  172. logger.warning(f"C 读取器连续失败,回退 Python gpiod: {last_err}")
  173. if not _GPIOD_AVAILABLE:
  174. raise RuntimeError("gpiod 库不可用,无法读取 DHT11")
  175. # 纯 Python gpiod 回退
  176. last_err = None
  177. for attempt in range(3):
  178. try:
  179. return self._read_once()
  180. except Exception as e:
  181. last_err = e
  182. time.sleep(0.2 + attempt * 0.1)
  183. raise last_err
  184. def _read_once(self):
  185. pin = int(self.gpio_pin)
  186. chip_name = str(self.gpio_chip)
  187. # 尝试提升实时优先级,减少读取期间的调度抖动
  188. old_scheduler = None
  189. old_priority = None
  190. try:
  191. import os
  192. old_scheduler = os.sched_getscheduler(0)
  193. old_priority = os.sched_getparam(0)
  194. # SCHED_FIFO 优先级 50(范围 1~99),失败后静默回退
  195. os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(50))
  196. except Exception:
  197. pass
  198. chip = gpiod.Chip(chip_name)
  199. line = None
  200. try:
  201. line = chip.get_line(pin)
  202. # 安全:如果该 line 已被内核/其他驱动占用,跳过避免系统异常
  203. if hasattr(line, 'is_used') and line.is_used():
  204. raise RuntimeError(f"GPIO {pin} 已被系统占用,无法使用")
  205. # 阶段1: 主机起始信号
  206. # DHT11 上电后需要 >1s 稳定时间,每次读取前保持高电平 2s
  207. line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_OUT, default_vals=[1])
  208. line.set_value(1)
  209. time.sleep(2.0)
  210. line.set_value(0)
  211. time.sleep(0.030) # 拉低 30ms,兼容更多模块
  212. line.set_value(1)
  213. time.sleep(0.00005) # 释放 50us
  214. line.release()
  215. # 阶段2: 切换为输入等待传感器响应
  216. # gpiod 1.4.x 不一定支持 BIAS_PULL_UP,不支持则回退
  217. try:
  218. line.request(
  219. consumer='dht11',
  220. type=gpiod.LINE_REQ_DIR_IN,
  221. flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP
  222. )
  223. except Exception:
  224. line.request(consumer='dht11', type=gpiod.LINE_REQ_DIR_IN)
  225. def wait_level(expected, timeout_us):
  226. """忙等待电平变化,返回是否成功。"""
  227. deadline = time.perf_counter() + timeout_us / 1_000_000.0
  228. while line.get_value() != expected:
  229. if time.perf_counter() > deadline:
  230. return False
  231. return True
  232. # 等待 DHT 拉低(响应开始),超时放宽到 1ms
  233. if not wait_level(0, 1000):
  234. raise TimeoutError("等待 DHT 响应低电平超时")
  235. # 等待 DHT 拉高(响应结束)
  236. if not wait_level(1, 1000):
  237. raise TimeoutError("等待 DHT 响应高电平超时")
  238. # 等待 DHT 再次拉低(数据开始)
  239. if not wait_level(0, 1000):
  240. raise TimeoutError("等待 DHT 数据开始超时")
  241. # 阶段3: 读取 40bit 数据
  242. bits = []
  243. for _ in range(40):
  244. # 等待低电平结束(bit 开始)
  245. if not wait_level(1, 100):
  246. raise TimeoutError("等待 bit 高电平超时")
  247. start = time.perf_counter()
  248. if not wait_level(0, 100):
  249. raise TimeoutError("等待 bit 低电平超时")
  250. duration = time.perf_counter() - start
  251. # 高电平 > 40us 视为 1,否则为 0
  252. bits.append(1 if duration > 0.00004 else 0)
  253. line.release()
  254. finally:
  255. try:
  256. chip.close()
  257. except Exception:
  258. pass
  259. # 恢复原始调度策略
  260. try:
  261. if old_scheduler is not None and old_priority is not None:
  262. os.sched_setscheduler(0, old_scheduler, old_priority)
  263. except Exception:
  264. pass
  265. # 阶段4: 解析 40bit 数据
  266. data_bits = ''.join(str(b) for b in bits)
  267. data_bytes = [int(data_bits[i:i + 8], 2) for i in range(0, 40, 8)]
  268. humidity_int, humidity_dec, temp_int, temp_dec, checksum = data_bytes
  269. calc_sum = (humidity_int + humidity_dec + temp_int + temp_dec) & 0xFF
  270. if calc_sum != checksum:
  271. raise ValueError(f"DHT 校验和错误: 计算={calc_sum}, 收到={checksum}")
  272. humidity = humidity_int + humidity_dec / 10.0
  273. temperature = temp_int + temp_dec / 10.0
  274. if temp_int & 0x80:
  275. temperature = -(temperature & 0x7F)
  276. return round(temperature, 1), round(humidity, 1)
  277. def create_sensor_from_config(on_data=None):
  278. """
  279. 根据环境变量 / 默认值创建 DHT11Sensor 实例。
  280. 配置项(环境变量):
  281. DHT11_ENABLED: 是否启用,默认 'true'。
  282. DHT11_GPIO_PIN: GPIO line 编号,默认 None(未配置时只能使用模拟模式)。
  283. DHT11_GPIO_CHIP: GPIO chip 名称,默认 'gpiochip0'。
  284. DHT11_POLL_INTERVAL: 读取间隔(秒),默认 5。
  285. DHT11_SIMULATE: 是否模拟,默认 'false'。
  286. """
  287. enabled = os.getenv('DHT11_ENABLED', 'true').lower() in ('1', 'true', 'yes')
  288. if not enabled:
  289. return None
  290. pin_env = os.getenv('DHT11_GPIO_PIN')
  291. pin = int(pin_env) if pin_env and pin_env.strip() else None
  292. chip = os.getenv('DHT11_GPIO_CHIP', 'gpiochip0')
  293. interval = float(os.getenv('DHT11_POLL_INTERVAL', '5'))
  294. simulate = os.getenv('DHT11_SIMULATE', 'false').lower() in ('1', 'true', 'yes')
  295. return DHT11Sensor(
  296. gpio_pin=pin,
  297. gpio_chip=chip,
  298. poll_interval=interval,
  299. simulate=simulate,
  300. on_data=on_data
  301. )