FoodDelivery/pubsubclient-master/tests/testcases/mqtt_basic.py
tk b708b47c4b 【类 型】:fix
【主	题】:添加库文件
【描	述】:
	[原因]:
	[过程]:
	[影响]:
【结	束】

# 类型 包含:
# feat:新功能(feature)
# fix:修补bug
# docs:文档(documentation)
# style: 格式(不影响代码运行的变动)
# refactor:重构(即不是新增功能,也不是修改bug的代码变动)
# test:增加测试
# chore:构建过程或辅助工具的变动
2024-06-14 17:16:36 +08:00

40 lines
1.2 KiB
Python

import unittest
import settings
import time
import mosquitto
def on_message(mosq, obj, msg):
    """Message callback: store *msg* on the user-data object's queue.

    Args:
        mosq: First callback argument; not used by this function.
        obj: Object exposing a ``message_queue`` list.
        msg: The received message; appended to the queue unchanged.
    """
    queue = obj.message_queue
    queue.append(msg)
class mqtt_basic(unittest.TestCase):
    """Check that exactly one "hello world" message arrives on "outTopic".

    One client is shared by the whole class: it is created in
    ``setUpClass`` and disconnected in ``tearDownClass``.
    """

    # Polling budget for test_one: up to POLL_ATTEMPTS passes of the client
    # loop, pausing POLL_INTERVAL_S seconds after each (30 * 0.5 s = 15 s).
    POLL_ATTEMPTS = 30
    POLL_INTERVAL_S = 0.5

    # Filled by the module-level on_message callback, which is handed this
    # class as its user-data object (obj=cls in setUpClass).
    message_queue = []

    @classmethod
    def setUpClass(cls):
        """Connect to ``settings.server_ip`` and subscribe to "outTopic"."""
        # Start from an empty queue so messages left over from an earlier
        # run of this class in the same process cannot skew the assertions.
        cls.message_queue = []
        cls.client = mosquitto.Mosquitto(
            "pubsubclient_ut", clean_session=True, obj=cls
        )
        cls.client.connect(settings.server_ip)
        cls.client.on_message = on_message
        # Second argument is 0 -- presumably the QoS level; test_one expects
        # the delivered message to report qos 0.
        cls.client.subscribe("outTopic", 0)

    @classmethod
    def tearDownClass(cls):
        """Disconnect the shared client."""
        cls.client.disconnect()

    def test_one(self):
        """Poll until a message is queued, then verify each of its fields."""
        for _ in range(self.POLL_ATTEMPTS):
            if self.message_queue:
                break
            self.client.loop()
            time.sleep(self.POLL_INTERVAL_S)

        # Judge the timeout by the queue itself rather than by how many
        # attempts remain: a message delivered on the very last poll is a
        # success, not a timeout.
        self.assertTrue(
            len(self.message_queue) > 0, "message receive timed-out"
        )
        self.assertEqual(
            len(self.message_queue), 1,
            "unexpected number of messages received",
        )

        msg = self.message_queue[0]
        self.assertEqual(msg.mid, 0, "message id not 0")
        self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
        self.assertEqual(msg.payload, "hello world")
        self.assertEqual(msg.qos, 0, "message qos not 0")
        self.assertEqual(msg.retain, False, "message retain flag incorrect")