已更新 2025年7月
使用 coreMQTT 的 MQTT Agent 及其演示
包括 Over the Air (OTA) 更新
注意:本页已作废。请使用采用 coreMQTT Agent 的线程安全 coreMQTT Agent 演示 。
- 本页内容:
引言
coreMQTT 是一个 MIT 授权的开源 MQTT 客户端 C 库,适用于基于微控制器和小型微处理器的 IoT 设备。该客户端库设计简洁,确保其不依赖任何其他库或操作系统, 能更好地执行静态分析,如内存安全证明。正因为这种简单性 和独立于操作系统的特性(coreMQTT 完全不需要多线程),coreMQTT 不会直接在实现过程中构建线程安全。相反,线程安全必须 由更高级别的软件提供。本页所演示的 coreMQTT 扩展 以 MQTT agent(或 MQTT 守护进程)的形式提供更高级的功能。虽然此处演示的实现 目前专用于 FreeRTOS,但对 FreeRTOS 的依赖项并不多, 也就是说可以轻松调整该实现,以适配其他操作系统。
实现情况概览和使用模型
MQTT Agent 是独立的任务(或执行线程)。它是获准访问 MQTT 库 API 的唯一任务, 因此可轻松实现线程安全。隔离对单个任务的所有 MQTT API 调用 可以实现访问序列化,进而无需信号量或任何其他同步原语。
使用 Agent 时,如果应用程序任务想要执行 MQTT 操作,例如发布消息, 其会调用 MQTT Agent 的 MQTTAgent_Publish() API ,而不是调用 coreMQTT 的 MQTT_Publish() API。MQTTAgent_Publish() 将完成 Publish 操作所需的信息打包到结构体中, 然后通过队列将该结构体发送到 MQTT Agent 任务。MQTT Agent 任务 接收该结构体,然后代表应用程序调用基础 MQTT 库的 MQTT_Publish() API。
应用程序写入器可借助向 MQTT Agent 发送命令的 API, 选择性指定回调函数以及要传递给回调的参数(称为“回调上下文”), 以便 Agent 在生成的 MQTT 操作完成时执行。如此一来,应用程序任务可以 选择进入阻塞状态(因此不会占用任何 CPU 时间)等待回调执行, 或者在 MQTT 操作正在进行时继续执行 。请参阅本页末尾的示例。
如果在 MQTT Agent API 调用时队列已满, 应用程序写入器还可借助向 MQTT Agent 发送命令的 API, 指定调用任务在阻塞状态下等待用于向 MQTT Agent 发送命令的队列中存在可用空间的最长时间。 同样,请参阅本页末尾的示例。
演示项目
序言
**注意:**MQTT agent 和相关示例项目可以正常运行,但尚未完成。请注意, 该 Agent 尚不符合我们的代码质量标准,也尚未经过全面测试。API 在正式首次发布之前不太可能发生变化。
功能
main() 会先初始化 TCP/IP 堆栈,然后启动 FreeRTOS 内核调度器。 有网络连接时,网络事件钩子可创建单项 RTOS 任务。该 任务可选择性创建多项其他任务,每项任务都会连接到 MQTT 代理,然后发送和接收各种不同大小 和不同服务质量 (QoS) 级别的 MQTT 数据包。原始线程 随后成为 MQTT Agent 线程。
以下常量
用于定义创建的演示任务。描述中的链接指向注释,
这些注释位于每个实现源文件的顶部,可提供更多信息。
-
#define democonfigCREATE_LARGE_MESSAGE_SUB_PUB_TASK [1 或 0]
设置为 1 时,可创建 运行在 large_message_sub_pub_demo.c
中实现的 MQTT 演示的任务; 设置为 0 时,可从构建中省略该任务。
-
#define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE [n]
设置 simple_pub_sub_demo.c
中实现的任务要创建的实例数, 该值可以为 0。
-
#define democonfigCREATE_CODE_SIGNING_OTA_DEMO [1 或 0]
设置为 1 时,可包括 Over the Air (OTA) 更新功能
; 设置为 0 时,则忽略 OTA。我们提供单独的页面,说明如何使用 OTA 和该演示。
-
#define democonfigCREATE_DEFENDER_DEMO [1 或 0]
设置为 1 时,可生成 AWS Device Defender 演示;设置为 0 时,则忽略此功能。有关 Device Defender 演示的说明,请参阅此处。
获取源代码
coreMQTT-Agent-Demos 存储库(位于 FreeRTOS GitHub 账号中)
演示了如何在 coreMQTT 上使用 Agent,还演示了如何
将其他 FreeRTOS 库以子模块形式纳入到项目中。请勿使用 GitHub 中的 "Download Zip" 链接来获取代码,
因为该 zip 文件不包括子模块化库。请改用以下 Git 命令将存储库
及其子模块克隆到本地计算机上:
使用 HTTPS 进行克隆:
1git clone https://github.com/FreeRTOS/coreMQTT-Agent-Demos.git --recurse-submodules
使用 SSH:
1git clone git@github.com:FreeRTOS/coreMQTT-Agent-Demos.git --recurse-submodules
如果下载存储库时未使用 --recurse-submodules 实参,则需运行:
1git submodule update --init --recursive
撰写本文时,项目使用的是
FreeRTOS Windows 移植,
FreeRTOS-Plus-TCP TCP/IP 堆栈,并使用
Visual Studio 免费社区版构建。然而,
其目录结构经过精心编排,旨在将来针对其他开发工具添加项目。
源代码组织
Git 存储库的组织结构如下:
1+-build2| |3| +-VisualStudio **Contains the Visual Studio project for this demo**4|5+-lib6| |7| +-AWS **Contains a sub-module for the OTA library along with an OTA PAL port for Windows.**8| +-FreeRTOS **Contains sub-modules of the FreeRTOS libraries used by the demo**9| +-ThirdParty **Contains submodules of third party libraries used by the demo**10|11+-source12 |13 +-configuration-files **Contains configuration files for the demo and libraries**14 +- . **Contains source files that implement the various demos**15
配置 FreeRTOS-Plus-TCP
此演示使用 FreeRTOS-Plus-TCP TCP/IP 堆栈,因此 请按照 为 TCP/IP 入门项目提供的说明操作 以确保您:
-
安装了必备组件(例如 WinPCap)。
-
设置了 MAC 地址(可选)。
-
最重要的是,在尝试运行 MQTT 演示之前,测试了网络连接。
每个演示项目都有自己的配置设置。当你按照网络配置说明进行操作时, 确保应用 MQTT 演示项目中的设置,而不是 TCP/IP 入门项目中的设置。默认情况下,TCP/IP 堆栈被配置为使用动态 IP 地址。
配置 MQTT 代理连接
MQTT Agent 可以本地或远程使用 TLS 或纯文本连接到任何 MQTT 代理 。非 Agent 纯文本演示文档页面上的“配置 MQTT 代理”部分 详细介绍了一些本地和远程纯文本 MQTT 代理选项。服务器 身份验证和相互身份验证页面上的“配置 MQTT 代理”部分 则详细介绍了一些 TLS 加密的 MQTT 代理选项。
AWS IoT 用户其他注意事项: 如果希望连接到 AWS IoT, 则可以创建必要的云端和设备端配置, 请按照记录单独相互身份验证演示的页面中“使用 AWS IoT 消息代理”部分提供的说明进行操作 。该部分引用的脚本 可在 MQTT Agent 存储亏的 /lib/AWS/tools 目录中找到。
确定要连接到的代理之后,请参阅要点下方的代码片段,这些片段介绍了 配置 MQTT 连接的编译时配置常量。这些常量位于 /Source/configuration-files/demo_config.h 中。 请将 TLS 连接所需的所有密钥放置在同一文件中。
重要提示:
-
纯文本连接可用于了解 MQTT 的工作原理和调试连接,这是因为 可以在 WireShark
等网络嗅探器中查看 MQTT 对话。 但是,实际 IoT 设备不应使用未经加密的纯文本连接。切勿 通过纯文本连接发送私人数据,强烈建议 所有 IoT 设备使用经过加密和相互身份验证的连接。
-
将按键放在头文件中仅为方便演示。强烈建议 实际设备将密钥存储在安全位置,如 Secure Element 或 Secure Enclave 中。 此外,建议实际 IoT 设备通过不会泄露密钥的 API (例如 PKCS #11 或 PSA API)访问密钥和其他加密对象。
1/**2 * The MQTT client identifier used in this example. Each client identifier3 * must be unique so edit as required to ensure no two clients connecting to the4 * same broker use the same client identifier.5 *6 *!!! Please note a #defined constant is used for convenience of demonstration7 *!!! only. Production devices can use something unique to the device that can8 *!!! be read by software, such as a production serial number, instead of a9 *!!! hard coded constant.10 *11 * Below is an example only.12 */1314#define democonfigCLIENT_IDENTIFIER "Thing1"151617/**18 * MQTT broker endpoint to connect to.19 *20 * This demo application can be run with any MQTT broker, although it is21 * recommended to use one that supports mutual authentication. If mutual22 * authentication is not used, then #democonfigUSE_TLS should be set to 0.23 *24 * Not for AWS users: Your AWS IoT Core endpoint can be found in the AWS IoT25 * console under Settings/Custom Endpoint, or using the describe-endpoint REST26 * API (with AWS CLI command line tool).27 *28 * #define democonfigMQTT_BROKER_ENDPOINT "insert here."29 *30 * Examples include:31 * #define democonfigMQTT_BROKER_ENDPOINT "test.mosquitto.org"32 * #define democonfigMQTT_BROKER_ENDPOINT "x-ats.iot.us-west-2.amazonaws.com"33 */3435#define democonfigMQTT_BROKER_ENDPOINT "192.168.0.100" /* Local broker */363738/**39 * The TCP port to connect to.40 *41 * In general, port 8883 is for secured MQTT connections, and port 1883 if not42 * using TLS.43 *44 * #define democonfigMQTT_BROKER_PORT ( insert here. )45 *46 * Below is an example only.47 */4849#define democonfigMQTT_BROKER_PORT ( 8883 )505152/**53 * Whether to use mutual authentication. If this macro is not set to 1 or not54 * defined, then plaintext TCP will be used instead of TLS over TCP.55 */5657#define democonfigUSE_TLS 158
MQTT Agent 配置常量
以下代码片段显示了与 MQTT Agent 相关的编译时常量,
及其默认值(在未定义的情况下)。MQTT Agent 没有自己的配置文件,
但会使用
core_mqtt_config.h 中定义的任意宏。
该函数中定义的宏将替代默认值。
1/**2 * The maximum number of pending acknowledgments to track for a single3 * connection.4 *5 * The MQTT agent tracks MQTT commands (such as PUBLISH and SUBSCRIBE) that6 * are still waiting to be acknowledged. MQTT_AGENT_MAX_OUTSTANDING_ACKS set7 * the maximum number of acknowledgments that can be outstanding at any one time.8 * If this threshold is reached, packets will still be sent, but reported as failure.9 * The higher this number is the greater the agent’s RAM consumption will be.10 * Each entry uses up to 8 bytes on a 32-bit microprocessor.11 */1213#ifndef MQTT_AGENT_MAX_OUTSTANDING_ACKS14 #define MQTT_AGENT_MAX_OUTSTANDING_ACKS ( 20 )15#endif161718/**19 * Time in MS that the MQTT agent task will wait in the Blocked state (so not20 * using any CPU time) for a command to arrive in its command queue before21 * exiting the blocked state so it can call MQTT_ProcessLoop().22 *23 * It is important MQTT_ProcessLoop() is called often if there is known MQTT24 * traffic, but calling it too often can take processing time away from lower25 * priority tasks and waste CPU time and power.26 */2728#ifndef MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME29 #define MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME ( 1000 )30#endif31
MQTT Agent API 使用示例
关于以下代码的重要提示:
-
在 MQTT PUBLISH 得到 MQTT 代理的确认之前, MQTT PUBLISH 有效载荷和主题字符串必须保持有效。在下面的示例中,主题字符串是一个静态常量, 因此无论如何都是有效的。
-
在本示例中,调用
的任务 需等待收到服务器已确认 MQTT PUBLISH 命令的通知。任务可以选择性传递完成回调 (该回调在收到确认后执行),但这一操作很有用, 因为可发出信号,表明用于保存 MQTT 发布信息的缓冲区何时可以重用。MQTTAgent_Publish()
1/* Application defined structure that will get typedef'ed to CommandContext_t. */2struct CommandContext3{4 TaskHandle_t xTaskToNotify;5 MQTTStatus_t xReturnStatus;6};78static const char * const pcTopicName = "/my/topic/x";910/* The information associated with a single MQTT agent. Each agent context11 * holds the information for a single MQTT connection. This context must be12 * initialized with a call to MQTTAgent_Init() prior to its use. */13static MQTTAgentContext_t xAgentContext;141516/* Callback executed when the MQTT PUBLISH is acknowledged by the MQTT broker. */17static void prvCommandCallback( void *pxCommandContext,18 MQTTStatus_t xReturnStatus )19{20CommandContext_t *pxApplicationDefinedContext = ( CommandContext_t * ) pxCommandContext;2122 /* Store the result in the application defined context and send a notification23 to the initiating task. Since this callback executes in the thread context of24 the MQTT agent task, the notification is necessary to signal the caller. */25 pxApplicationDefinedContext->xReturnStatus = xReturnStatus;2627 xTaskNotify( pxApplicationDefinedContext->xTaskToNotify,28 xReturnStatus,29 eSetValueWithOverwrite );30}313233void ExampleOfCallingAgentPublish( char *pcPayload, uint16_t usPayloadLength )34{35 MQTTPublishInfo_t xPublishInfo;36 /* The context for the completion callback must stay in scope until37 * the completion callback is invoked. In this example, using a stack variable38 * is safe because the function waits for the callback to execute before returning. */39 CommandContext_t xCommandContext;40 MQTTStatus_t xCommandAdded;41 BaseType_t xReturn;42 CommandInfo_t xCommandInformation;4344 /* Complete an MQTTPublishInfo_t structure describing the publish operation.45 MQTTPublishInfo_t is defined by coreMQTT and is the same as would be passed46 into MQTT_Publish. */47 xPublishInfo.qos = MQTTQoS1;48 xPublishInfo.pTopicName = pcTopicName;49 xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicName );50 xPublishInfo.pPayload = pcPayload;51 xPublishInfo.payloadLength = usPayloadLength;5253 /* Complete the CommandContext_t structure that will get passed into the54 callback that executes when the publish command is ACKed by the MQTT55 broker. The application writer defines this structure, and can put whatever56 they like in it. This simple example stores the handle of the calling task57 and provides a variable to store the status of the operation. See the58 implementation of prvCommandCallback() above. */59 xCommandContext.xTaskToNotify = [xTaskGetCurrentTaskHandle](/Documentation/02-Kernel/04-API-references/03-Task-utilities/00-Task-utilities/#xtaskgetcurrenttaskhandle)();606162 /* Callback to execute the PUBLISH is acknowledged by the server. */63 xCommandInformation.cmdCompleteCallback = prvCommandCallback;6465 /* The parameter to pass into the callback. In this case the parameter just66 * holds the handle of the task calling this function. */67 xCommandInformation.pCmdCompleteCallbackContext = &xCommandContext;6869 /* Maximum time to wait enqueue the command for the agent. */70 xCommandInformation.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;717273 /* Send the command to the MQTT agent. */74 xCommandAdded = MQTTAgent_Publish( /* The MQTT context to use. */75 &xAgentContext,76 /* Structure that defined the MQTT PUBLISH77 operation. */78 &xPublishInfo,79 /* Command Information. */80 &xCommandInformation );8182 if( xCommandAdded == MQTTSuccess )83 {84 /* The command was successfully sent to the agent. At this point the85 application writer can opt to wait for the callback to be executed,86 meaning the MQTT broker acknowledged the MQTT PUBLISH. Note the data87 pointed to by xPublishInfo.pTopicName and xPublishInfo.pPayload must88 remain valid (not be lost from a stack frame or overwritten in a buffer)89 until the PUBLISH is acknowledged. In this simple example the callback90 sends a notification to this task. */91 xReturn = xTaskNotifyWait( 0,92 0,93 NULL,94 portMAX_DELAY ); /* Wait indefinitely. */9596 if( xReturn != pdFAIL )97 {98 /* The message was acknowledged and99 xCommandContext.xReturnStatus holds the result of the operation. */100 }101 }102}103
MQTT agent API
目前,MQTT Agent API 记录在
mqtt_agent.h
头文件和 MQTT Agent API 引用文件
中。