| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- import pytest
- import sys
- import os
- from unittest.mock import MagicMock, patch
- # 添加项目根目录到Python路径
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from modules.mqtt_client import MQTTClient
- class TestMQTTClient:
-
- def setup_method(self):
- """每个测试方法执行前的设置"""
- # 创建MQTTClient实例
- self.mqtt_client = MQTTClient()
- # 模拟数据处理和状态处理回调
- self.data_callback = MagicMock()
- self.status_callback = MagicMock()
- self.mqtt_client.set_data_callback(self.data_callback)
- self.mqtt_client.set_status_callback(self.status_callback)
-
- def teardown_method(self):
- """每个测试方法执行后的清理"""
- # 确保断开连接
- self.mqtt_client.disconnect()
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_connect_success(self, mock_client_class):
- """测试成功连接MQTT服务器"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0 # 0表示连接成功
- mock_client_class.return_value = mock_client
-
- # 调用connect方法
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- success, message = self.mqtt_client.connect(config)
-
- # 验证结果
- assert success is True
- assert message == "MQTT连接成功"
- mock_client.connect.assert_called_once_with(
- 'localhost', 1883, 60
- )
- mock_client.loop_start.assert_called_once()
- # 验证状态回调被调用,且状态为True
- self.status_callback.assert_called_once_with(True)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_connect_failure(self, mock_client_class):
- """测试连接MQTT服务器失败"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象,但connect返回错误码
- mock_client = MagicMock()
- mock_client.connect.return_value = 1 # 非0表示连接失败
- mock_client_class.return_value = mock_client
-
- # 调用connect方法
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- success, message = self.mqtt_client.connect(config)
-
- # 验证结果
- assert success is False
- assert "连接失败" in message
- mock_client.connect.assert_called_once()
- mock_client.loop_start.assert_not_called()
- # 验证状态回调被调用,且状态为False
- self.status_callback.assert_called_once_with(False)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_connect_with_tls(self, mock_client_class):
- """测试使用TLS连接MQTT服务器"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 调用connect方法,启用TLS
- config = {
- 'broker': 'localhost',
- 'port': 8883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': True,
- 'ca_certs': 'path/to/ca.crt'
- }
- success, message = self.mqtt_client.connect(config)
-
- # 验证结果
- assert success is True
- mock_client.tls_set.assert_called_once_with('path/to/ca.crt')
- mock_client.connect.assert_called_once_with(
- 'localhost', 8883, 60
- )
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_connect_with_auth(self, mock_client_class):
- """测试使用用户名密码连接MQTT服务器"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 调用connect方法,设置用户名密码
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': 'test_user',
- 'password': 'test_pass',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- success, message = self.mqtt_client.connect(config)
-
- # 验证结果
- assert success is True
- mock_client.username_pw_set.assert_called_once_with('test_user', 'test_pass')
- mock_client.connect.assert_called_once()
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_disconnect(self, mock_client_class):
- """测试断开MQTT连接"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 先连接,再断开
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
- # 重置状态回调的调用记录
- self.status_callback.reset_mock()
-
- # 调用disconnect方法
- self.mqtt_client.disconnect()
-
- # 验证结果
- mock_client.loop_stop.assert_called_once()
- mock_client.disconnect.assert_called_once()
- # 验证状态回调被调用,且状态为False
- self.status_callback.assert_called_once_with(False)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_publish_success(self, mock_client_class):
- """测试成功发布消息"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- # 模拟publish返回一个成功的结果
- mock_client.publish.return_value = (0, 1)
- mock_client_class.return_value = mock_client
-
- # 先连接
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 调用publish方法
- topic = "test/topic"
- payload = "test message"
- qos = 0
- retain = False
- success, message = self.mqtt_client.publish(topic, payload, qos, retain)
-
- # 验证结果
- assert success is True
- assert message == "消息发布成功"
- mock_client.publish.assert_called_once_with(topic, payload, qos, retain)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_publish_failure(self, mock_client_class):
- """测试发布消息失败"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- # 模拟publish返回一个失败的结果
- mock_client.publish.return_value = (1, None)
- mock_client_class.return_value = mock_client
-
- # 先连接
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 调用publish方法
- topic = "test/topic"
- payload = "test message"
- success, message = self.mqtt_client.publish(topic, payload)
-
- # 验证结果
- assert success is False
- assert "发布失败" in message
- mock_client.publish.assert_called_once_with(topic, payload, 0, False)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_publish_not_connected(self, mock_client_class):
- """测试未连接时发布消息"""
- # 不连接直接发布消息
- topic = "test/topic"
- payload = "test message"
- success, message = self.mqtt_client.publish(topic, payload)
-
- # 验证结果
- assert success is False
- assert message == "MQTT未连接"
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_subscribe_success(self, mock_client_class):
- """测试成功订阅主题"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- # 模拟subscribe返回一个成功的结果
- mock_client.subscribe.return_value = (0, 1)
- mock_client_class.return_value = mock_client
-
- # 先连接
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 调用subscribe方法
- topic = "test/topic"
- qos = 0
- success, message = self.mqtt_client.subscribe(topic, qos)
-
- # 验证结果
- assert success is True
- assert message == "主题订阅成功"
- mock_client.subscribe.assert_called_once_with(topic, qos)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_subscribe_failure(self, mock_client_class):
- """测试订阅主题失败"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- # 模拟subscribe返回一个失败的结果
- mock_client.subscribe.return_value = (1, None)
- mock_client_class.return_value = mock_client
-
- # 先连接
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 调用subscribe方法
- topic = "test/topic"
- success, message = self.mqtt_client.subscribe(topic)
-
- # 验证结果
- assert success is False
- assert "订阅失败" in message
- mock_client.subscribe.assert_called_once_with(topic, 0)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_unsubscribe(self, mock_client_class):
- """测试取消订阅主题"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 先连接
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 调用unsubscribe方法
- topic = "test/topic"
- self.mqtt_client.unsubscribe(topic)
-
- # 验证结果
- mock_client.unsubscribe.assert_called_once_with(topic)
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_get_status(self, mock_client_class):
- """测试获取MQTT客户端状态"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 初始状态应该是False
- assert self.mqtt_client.get_status() is False
-
- # 连接后状态应该是True
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
- assert self.mqtt_client.get_status() is True
-
- # 断开后状态应该是False
- self.mqtt_client.disconnect()
- assert self.mqtt_client.get_status() is False
-
- @patch('modules.mqtt_client.paho.mqtt.client.Client')
- def test_on_message_callback(self, mock_client_class):
- """测试MQTT消息回调处理"""
- # 模拟paho.mqtt.client.Client返回一个有效的客户端对象
- mock_client = MagicMock()
- mock_client.connect.return_value = 0
- mock_client_class.return_value = mock_client
-
- # 连接MQTT
- config = {
- 'broker': 'localhost',
- 'port': 1883,
- 'username': '',
- 'password': '',
- 'client_id': 'test_client',
- 'keepalive': 60,
- 'use_tls': False
- }
- self.mqtt_client.connect(config)
-
- # 获取回调函数
- on_message_callback = mock_client.on_message
-
- # 模拟一个接收到的消息
- mock_message = MagicMock()
- mock_message.topic = "test/topic"
- mock_message.payload = b"test payload"
-
- # 调用回调函数
- on_message_callback(mock_client, None, mock_message)
-
- # 验证数据回调被调用
- self.data_callback.assert_called_once_with({
- 'topic': 'test/topic',
- 'payload': 'test payload'
- })
- if __name__ == "__main__":
- pytest.main(["-v", __file__])
|