test_mqtt_client.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. import pytest
  2. import sys
  3. import os
  4. from unittest.mock import MagicMock, patch
  5. # 添加项目根目录到Python路径
  6. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. from modules.mqtt_client import MQTTClient
  8. class TestMQTTClient:
  9. def setup_method(self):
  10. """每个测试方法执行前的设置"""
  11. # 创建MQTTClient实例
  12. self.mqtt_client = MQTTClient()
  13. # 模拟数据处理和状态处理回调
  14. self.data_callback = MagicMock()
  15. self.status_callback = MagicMock()
  16. self.mqtt_client.set_data_callback(self.data_callback)
  17. self.mqtt_client.set_status_callback(self.status_callback)
  18. def teardown_method(self):
  19. """每个测试方法执行后的清理"""
  20. # 确保断开连接
  21. self.mqtt_client.disconnect()
  22. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  23. def test_connect_success(self, mock_client_class):
  24. """测试成功连接MQTT服务器"""
  25. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  26. mock_client = MagicMock()
  27. mock_client.connect.return_value = 0 # 0表示连接成功
  28. mock_client_class.return_value = mock_client
  29. # 调用connect方法
  30. config = {
  31. 'broker': 'localhost',
  32. 'port': 1883,
  33. 'username': '',
  34. 'password': '',
  35. 'client_id': 'test_client',
  36. 'keepalive': 60,
  37. 'use_tls': False
  38. }
  39. success, message = self.mqtt_client.connect(config)
  40. # 验证结果
  41. assert success is True
  42. assert message == "MQTT连接成功"
  43. mock_client.connect.assert_called_once_with(
  44. 'localhost', 1883, 60
  45. )
  46. mock_client.loop_start.assert_called_once()
  47. # 验证状态回调被调用,且状态为True
  48. self.status_callback.assert_called_once_with(True)
  49. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  50. def test_connect_failure(self, mock_client_class):
  51. """测试连接MQTT服务器失败"""
  52. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象,但connect返回错误码
  53. mock_client = MagicMock()
  54. mock_client.connect.return_value = 1 # 非0表示连接失败
  55. mock_client_class.return_value = mock_client
  56. # 调用connect方法
  57. config = {
  58. 'broker': 'localhost',
  59. 'port': 1883,
  60. 'username': '',
  61. 'password': '',
  62. 'client_id': 'test_client',
  63. 'keepalive': 60,
  64. 'use_tls': False
  65. }
  66. success, message = self.mqtt_client.connect(config)
  67. # 验证结果
  68. assert success is False
  69. assert "连接失败" in message
  70. mock_client.connect.assert_called_once()
  71. mock_client.loop_start.assert_not_called()
  72. # 验证状态回调被调用,且状态为False
  73. self.status_callback.assert_called_once_with(False)
  74. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  75. def test_connect_with_tls(self, mock_client_class):
  76. """测试使用TLS连接MQTT服务器"""
  77. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  78. mock_client = MagicMock()
  79. mock_client.connect.return_value = 0
  80. mock_client_class.return_value = mock_client
  81. # 调用connect方法,启用TLS
  82. config = {
  83. 'broker': 'localhost',
  84. 'port': 8883,
  85. 'username': '',
  86. 'password': '',
  87. 'client_id': 'test_client',
  88. 'keepalive': 60,
  89. 'use_tls': True,
  90. 'ca_certs': 'path/to/ca.crt'
  91. }
  92. success, message = self.mqtt_client.connect(config)
  93. # 验证结果
  94. assert success is True
  95. mock_client.tls_set.assert_called_once_with('path/to/ca.crt')
  96. mock_client.connect.assert_called_once_with(
  97. 'localhost', 8883, 60
  98. )
  99. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  100. def test_connect_with_auth(self, mock_client_class):
  101. """测试使用用户名密码连接MQTT服务器"""
  102. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  103. mock_client = MagicMock()
  104. mock_client.connect.return_value = 0
  105. mock_client_class.return_value = mock_client
  106. # 调用connect方法,设置用户名密码
  107. config = {
  108. 'broker': 'localhost',
  109. 'port': 1883,
  110. 'username': 'test_user',
  111. 'password': 'test_pass',
  112. 'client_id': 'test_client',
  113. 'keepalive': 60,
  114. 'use_tls': False
  115. }
  116. success, message = self.mqtt_client.connect(config)
  117. # 验证结果
  118. assert success is True
  119. mock_client.username_pw_set.assert_called_once_with('test_user', 'test_pass')
  120. mock_client.connect.assert_called_once()
  121. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  122. def test_disconnect(self, mock_client_class):
  123. """测试断开MQTT连接"""
  124. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  125. mock_client = MagicMock()
  126. mock_client.connect.return_value = 0
  127. mock_client_class.return_value = mock_client
  128. # 先连接,再断开
  129. config = {
  130. 'broker': 'localhost',
  131. 'port': 1883,
  132. 'username': '',
  133. 'password': '',
  134. 'client_id': 'test_client',
  135. 'keepalive': 60,
  136. 'use_tls': False
  137. }
  138. self.mqtt_client.connect(config)
  139. # 重置状态回调的调用记录
  140. self.status_callback.reset_mock()
  141. # 调用disconnect方法
  142. self.mqtt_client.disconnect()
  143. # 验证结果
  144. mock_client.loop_stop.assert_called_once()
  145. mock_client.disconnect.assert_called_once()
  146. # 验证状态回调被调用,且状态为False
  147. self.status_callback.assert_called_once_with(False)
  148. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  149. def test_publish_success(self, mock_client_class):
  150. """测试成功发布消息"""
  151. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  152. mock_client = MagicMock()
  153. mock_client.connect.return_value = 0
  154. # 模拟publish返回一个成功的结果
  155. mock_client.publish.return_value = (0, 1)
  156. mock_client_class.return_value = mock_client
  157. # 先连接
  158. config = {
  159. 'broker': 'localhost',
  160. 'port': 1883,
  161. 'username': '',
  162. 'password': '',
  163. 'client_id': 'test_client',
  164. 'keepalive': 60,
  165. 'use_tls': False
  166. }
  167. self.mqtt_client.connect(config)
  168. # 调用publish方法
  169. topic = "test/topic"
  170. payload = "test message"
  171. qos = 0
  172. retain = False
  173. success, message = self.mqtt_client.publish(topic, payload, qos, retain)
  174. # 验证结果
  175. assert success is True
  176. assert message == "消息发布成功"
  177. mock_client.publish.assert_called_once_with(topic, payload, qos, retain)
  178. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  179. def test_publish_failure(self, mock_client_class):
  180. """测试发布消息失败"""
  181. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  182. mock_client = MagicMock()
  183. mock_client.connect.return_value = 0
  184. # 模拟publish返回一个失败的结果
  185. mock_client.publish.return_value = (1, None)
  186. mock_client_class.return_value = mock_client
  187. # 先连接
  188. config = {
  189. 'broker': 'localhost',
  190. 'port': 1883,
  191. 'username': '',
  192. 'password': '',
  193. 'client_id': 'test_client',
  194. 'keepalive': 60,
  195. 'use_tls': False
  196. }
  197. self.mqtt_client.connect(config)
  198. # 调用publish方法
  199. topic = "test/topic"
  200. payload = "test message"
  201. success, message = self.mqtt_client.publish(topic, payload)
  202. # 验证结果
  203. assert success is False
  204. assert "发布失败" in message
  205. mock_client.publish.assert_called_once_with(topic, payload, 0, False)
  206. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  207. def test_publish_not_connected(self, mock_client_class):
  208. """测试未连接时发布消息"""
  209. # 不连接直接发布消息
  210. topic = "test/topic"
  211. payload = "test message"
  212. success, message = self.mqtt_client.publish(topic, payload)
  213. # 验证结果
  214. assert success is False
  215. assert message == "MQTT未连接"
  216. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  217. def test_subscribe_success(self, mock_client_class):
  218. """测试成功订阅主题"""
  219. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  220. mock_client = MagicMock()
  221. mock_client.connect.return_value = 0
  222. # 模拟subscribe返回一个成功的结果
  223. mock_client.subscribe.return_value = (0, 1)
  224. mock_client_class.return_value = mock_client
  225. # 先连接
  226. config = {
  227. 'broker': 'localhost',
  228. 'port': 1883,
  229. 'username': '',
  230. 'password': '',
  231. 'client_id': 'test_client',
  232. 'keepalive': 60,
  233. 'use_tls': False
  234. }
  235. self.mqtt_client.connect(config)
  236. # 调用subscribe方法
  237. topic = "test/topic"
  238. qos = 0
  239. success, message = self.mqtt_client.subscribe(topic, qos)
  240. # 验证结果
  241. assert success is True
  242. assert message == "主题订阅成功"
  243. mock_client.subscribe.assert_called_once_with(topic, qos)
  244. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  245. def test_subscribe_failure(self, mock_client_class):
  246. """测试订阅主题失败"""
  247. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  248. mock_client = MagicMock()
  249. mock_client.connect.return_value = 0
  250. # 模拟subscribe返回一个失败的结果
  251. mock_client.subscribe.return_value = (1, None)
  252. mock_client_class.return_value = mock_client
  253. # 先连接
  254. config = {
  255. 'broker': 'localhost',
  256. 'port': 1883,
  257. 'username': '',
  258. 'password': '',
  259. 'client_id': 'test_client',
  260. 'keepalive': 60,
  261. 'use_tls': False
  262. }
  263. self.mqtt_client.connect(config)
  264. # 调用subscribe方法
  265. topic = "test/topic"
  266. success, message = self.mqtt_client.subscribe(topic)
  267. # 验证结果
  268. assert success is False
  269. assert "订阅失败" in message
  270. mock_client.subscribe.assert_called_once_with(topic, 0)
  271. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  272. def test_unsubscribe(self, mock_client_class):
  273. """测试取消订阅主题"""
  274. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  275. mock_client = MagicMock()
  276. mock_client.connect.return_value = 0
  277. mock_client_class.return_value = mock_client
  278. # 先连接
  279. config = {
  280. 'broker': 'localhost',
  281. 'port': 1883,
  282. 'username': '',
  283. 'password': '',
  284. 'client_id': 'test_client',
  285. 'keepalive': 60,
  286. 'use_tls': False
  287. }
  288. self.mqtt_client.connect(config)
  289. # 调用unsubscribe方法
  290. topic = "test/topic"
  291. self.mqtt_client.unsubscribe(topic)
  292. # 验证结果
  293. mock_client.unsubscribe.assert_called_once_with(topic)
  294. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  295. def test_get_status(self, mock_client_class):
  296. """测试获取MQTT客户端状态"""
  297. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  298. mock_client = MagicMock()
  299. mock_client.connect.return_value = 0
  300. mock_client_class.return_value = mock_client
  301. # 初始状态应该是False
  302. assert self.mqtt_client.get_status() is False
  303. # 连接后状态应该是True
  304. config = {
  305. 'broker': 'localhost',
  306. 'port': 1883,
  307. 'username': '',
  308. 'password': '',
  309. 'client_id': 'test_client',
  310. 'keepalive': 60,
  311. 'use_tls': False
  312. }
  313. self.mqtt_client.connect(config)
  314. assert self.mqtt_client.get_status() is True
  315. # 断开后状态应该是False
  316. self.mqtt_client.disconnect()
  317. assert self.mqtt_client.get_status() is False
  318. @patch('modules.mqtt_client.paho.mqtt.client.Client')
  319. def test_on_message_callback(self, mock_client_class):
  320. """测试MQTT消息回调处理"""
  321. # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
  322. mock_client = MagicMock()
  323. mock_client.connect.return_value = 0
  324. mock_client_class.return_value = mock_client
  325. # 连接MQTT
  326. config = {
  327. 'broker': 'localhost',
  328. 'port': 1883,
  329. 'username': '',
  330. 'password': '',
  331. 'client_id': 'test_client',
  332. 'keepalive': 60,
  333. 'use_tls': False
  334. }
  335. self.mqtt_client.connect(config)
  336. # 获取回调函数
  337. on_message_callback = mock_client.on_message
  338. # 模拟一个接收到的消息
  339. mock_message = MagicMock()
  340. mock_message.topic = "test/topic"
  341. mock_message.payload = b"test payload"
  342. # 调用回调函数
  343. on_message_callback(mock_client, None, mock_message)
  344. # 验证数据回调被调用
  345. self.data_callback.assert_called_once_with({
  346. 'topic': 'test/topic',
  347. 'payload': 'test payload'
  348. })
  349. if __name__ == "__main__":
  350. pytest.main(["-v", __file__])