Quality RTOS & Embedded Software

CATEGORIES

Why Write Another MQTT Library?

Libraries capture a set of decisions about how the world should work. If you're lucky, the model in the library is a suitable match for your needs and constraints. If you're not lucky, you either end up with something that's wasteful, or requires changes, or you must search for a different library that's a better fit. Any of the unlucky outcomes has a cost, either in development efforts, in the final bill of materials, or in dealing with pernicious bugs after your product ships. When we library authors take decisions out of the library and put them in your hands, there's a better chance the library will meet your needs with a minimum of effort or waste.

The LTS release of FreeRTOS contains libraries forged to meet the changing needs of the Internet of Things and embedded devices. Chief among these is coreMQTT, setting the standard for the rest of the core libraries, and putting decisions in your hands.

To take a concrete example, an MQTT library with a direct dependency on software-based TLS is likely be a poor match for a cellular module having intrinsic TCP and TLS functionality. As the number of ways to connect a device to the cloud grows, so too does the variety of modules available from vendors, with options ranging from the mundane to the exotic, including 802.11 Wi-Fi, 802.15.4 6LoWPAN, LTE-M, NB-IoT, and LoRa. Many of these modules offload networking functionality and supply AT commands for control, perhaps wrapped by a socket library. Decoupling networking from coreMQTT means it may prove equally useful regardless of the underlying transport.

CoreMQTT implements an MQTT 3.1.1 client for all QoS levels, covered by the MIT open source license, and complying with ISO C90 and MISRA C:2012. The library emphasizes a small footprint, freedom from dependencies, and composability. The library accommodates static-only applications, since it does not use heap memory, and provides memory-safety proofs.

Read and Write Interface

The coreMQTT library interacts with the network through two functions, a read function and a write function. You provide these functions to the library. Use or adapt an example provided with the demos covering common scenarios such as TLS-based mutual authentication using mbedTLS. Or wrap the functions provided by an offload module, as done in the FreeRTOS cellular demo. Or implement a novel approach, such as a Bluetooth Low Energy proxy that pairs with a smartphone. You can even use multiple kinds of connections within the same application.

To have this flexibility, you give each MQTT connection its own function pointers for read and write in a structure along with an opaque pointer representing a network context. We call this the transport interface. This departs from the conventional platform abstraction layer approach, where a library expects a (usually large) fixed set of functions and data types, that you must implement, each used without discrimination. The premise embodied in coreMQTT is that small interfaces are more useful than large ones, may solve more problems, and may be more widely sharable. We share the transport interface with the coreHTTP library. The docstrings below describe the types that make up the transport interface.


/**

* NetworkContext_t is the incomplete type struct NetworkContext.

* The implemented struct NetworkContext must contain all of the information

* that is needed to receive and send data with the TransportRecv_t

* and the TransportSend_t implementations.

* In the case of TLS over TCP, struct NetworkContext is typically implemented

* with the TCP socket context and a TLS context.

*

* Example code:

*

* struct NetworkContext

* {

* struct MyTCPSocketContext tcpSocketContext;

* struct MyTLSContext tlsContext;

* };

*/


/**

* @brief Transport interface for receiving data on the network.

*

* This function is expected to populate a buffer, with bytes received from the

* transport, and return the number of bytes placed in the buffer.

* In the case of TLS over TCP, TransportRecv_t is typically implemented by

* calling the TLS layer function to receive data. In case of plaintext TCP

* without TLS, it is typically implemented by calling the TCP layer receive

* function. TransportRecv_t may be invoked multiple times by the protocol

* library, if fewer bytes than were requested to receive are returned.

*

* @param[in] pNetworkContext Implementation-defined network context.

* @param[out] pBuffer Buffer to receive the data into.

* @param[in] bytesToRecv Number of bytes requested from the network.

*

* @return The number of bytes received or a negative value to indicate

* error.

*/

typedef int32_t ( * TransportRecv_t )( NetworkContext_t * pNetworkContext,
void * pBuffer,
size_t bytesToRecv );

/**

* @brief Transport interface for sending data over the network.

*

* This function is expected to send the bytes in the given buffer over the

* transport, and return the number of bytes sent.

* In the case of TLS over TCP, TransportSend_t is typically implemented by

* calling the TLS layer function to send data. In case of plaintext TCP

* without TLS, it is typically implemented by calling the TCP layer send

* function. TransportSend_t may be invoked multiple times by the protocol

* library, if fewer bytes than were requested to send are returned.

*

* @param[in] pNetworkContext Implementation-defined network context.

* @param[in] pBuffer Buffer containing the bytes to send over the network stack.

* @param[in] bytesToSend Number of bytes to send over the network.

*

* @return The number of bytes sent or a negative value to indicate error.

*/

typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext,
const void * pBuffer,
size_t bytesToSend );

typedef struct TransportInterface
{
TransportRecv_t recv; /**< Transport receive interface. */
TransportSend_t send; /**< Transport send interface. */
NetworkContext_t * pNetworkContext; /**< Implementation-defined network context. */
} TransportInterface_t;

Connect with Backoff

The manner by which it creates connections is another decision reserved for you and your application. This helps to keep the library simple and the interfaces small. Be aware of the pitfalls waiting around connection retries. Naïve retries coming from a large fleet of devices may essentially be a denial-of-service attack or may have unexpected failure modes due to service throttling. To reduce this risk, FreeRTOS provides the backoffAlgorithm library to calculate a delay between retries based on a capped exponential value with jitter. This demo code shows connection establishment using OpenSSL and the backoffAlgorithm library. Note, the BackoffAlgorithm_GetNextBackoff() does not itself call any sleep function. You call a sleep function directly using the value returned.

/* Initialize reconnect attempts and interval. */
BackoffAlgorithm_InitializeParams( &reconnectParams,
CONNECTION_RETRY_BACKOFF_BASE_MS,
CONNECTION_RETRY_MAX_BACKOFF_DELAY_MS,
CONNECTION_RETRY_MAX_ATTEMPTS );

/* Attempt to connect to MQTT broker. If connection fails, retry after

* a timeout until maximum attempts are reached.

*/

do
{
LogInfo( ( "Establishing a TLS session to %.*s:%d.",
BROKER_ENDPOINT_LENGTH,
BROKER_ENDPOINT,
BROKER_PORT ) );
opensslStatus = Openssl_Connect( pNetworkContext,
&serverInfo,
&opensslCredentials,
TRANSPORT_SEND_RECV_TIMEOUT_MS,
TRANSPORT_SEND_RECV_TIMEOUT_MS );

if( opensslStatus != OPENSSL_SUCCESS )
{
/* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */
backoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &reconnectParams, generateRandomNumber(), &nextRetryBackOff );

if( backoffAlgStatus == BackoffAlgorithmRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
returnStatus = EXIT_FAILURE;
}
else if( backoffAlgStatus == BackoffAlgorithmSuccess )
{
LogWarn( ( "Connection to the broker failed. Retrying connection "
"after %hu ms backoff.",
( unsigned short ) nextRetryBackOff ) );
Clock_SleepMs( nextRetryBackOff );
}
}
} while( ( opensslStatus != OPENSSL_SUCCESS ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) );

Composability

Composability is a central tenet of the coreMQTT design. This design principle means that functionality lives as small pieces which combine into richer features. The coreMQTT library provides both rich features and the pieces used to make them. You can use a feature as-is, or recombine the pieces for a customized behavior, or add in your own pieces for even more possibilities.

As an example, the serialization performed by the MQTT_Publish() function is separately available in the functions, MQTT_GetPublishPacketSize() and MQTT_SerializePublishHeader(). An alternate composition of these small serialize and deserialize functions exists in the ultralight weight MQTT client demo. While the full featured MQTT_Publish() function interacts with the state engine to support QoS 1 and QoS 2, the ultralight weight demo supports only QoS 0, with no need for sessions or the state engine. The publish function from the ultralight weight demo shows the serialize functions in action. MQTT_GetPublishPacketSize() returns the number of bytes necessary to serialize the message header. If that number is smaller that the size of the supplied buffer, MQTT_SerializePublishHeader() writes the header to the buffer. Two calls use the transport interface to first send the header, and then the payload.


static void mqttPublishToTopic( NetworkContext_t * pNetworkContext,
MQTTFixedBuffer_t * pFixedBuffer )
{
MQTTStatus_t result;
MQTTPublishInfo_t mqttPublishInfo;
size_t remainingLength;
size_t packetSize = 0;
size_t headerSize = 0;
int status;

/* Suppress unused variable warnings when asserts are disabled in build. */
( void ) status;
( void ) result;

/***

* For readability, error handling in this function is restricted to the use of

* asserts().

***/


/* Some fields not used by this demo so start with everything as 0. */
memset( ( void * ) &mqttPublishInfo, 0x00, sizeof( mqttPublishInfo ) );

/* This demo uses QOS0 */
mqttPublishInfo.qos = MQTTQoS0;
mqttPublishInfo.retain = false;
mqttPublishInfo.pTopicName = MQTT_EXAMPLE_TOPIC;
mqttPublishInfo.topicNameLength = ( uint16_t ) strlen( MQTT_EXAMPLE_TOPIC );
mqttPublishInfo.pPayload = MQTT_EXAMPLE_MESSAGE;
mqttPublishInfo.payloadLength = strlen( MQTT_EXAMPLE_MESSAGE );

/* Find out length of Publish packet size. */
result = MQTT_GetPublishPacketSize( &mqttPublishInfo, &remainingLength, &packetSize );
assert( result == MQTTSuccess );

/* Make sure the packet size is less than static buffer size. */
assert( packetSize < pFixedBuffer->size );

/* Serialize MQTT Publish packet header. The publish message payload will

* be sent directly in order to avoid copying it into the buffer.

* QOS0 does not make use of packet identifier, therefore value of 0 is used */

result = MQTT_SerializePublishHeader( &mqttPublishInfo,
0,
remainingLength,
pFixedBuffer,
&headerSize );
LogDebug( ( "Serialized PUBLISH header size is %lu.",
( unsigned long ) headerSize ) );
assert( result == MQTTSuccess );
/* Send Publish header to the broker. */
status = Plaintext_Send( pNetworkContext, ( void * ) pFixedBuffer->pBuffer, headerSize );
assert( status == ( int ) headerSize );
/* Send Publish payload to the broker */
status = Plaintext_Send( pNetworkContext, ( void * ) mqttPublishInfo.pPayload, mqttPublishInfo.payloadLength );
assert( status == ( int ) mqttPublishInfo.payloadLength );
}

Handling Received Messages

One of the biggest decisions reserved for the application is how to handle a newly received message. You may have a simple workflow where only one type of message needs handling, or a more involved approach where a variety of messages need multiplexing across the application. CoreMQTT starts with the simple case, with MQTT_Init() accepting a single callback function to invoke for each PUBISH or ACK message received. The docstrings for the MQTTEventCallback_t type and the MQTT_Init() function describe the values passed to the callback, and show an example of calling MQTT_Init().

/**

* @ingroup mqtt_callback_types

* @brief Application callback for receiving incoming publishes and incoming

* acks.

*

* @note This callback will be called only if packets are deserialized with a

* result of #MQTTSuccess or #MQTTServerRefused. The latter can be obtained

* when deserializing a SUBACK, indicating a broker's rejection of a subscribe.

*

* @param[in] pContext Initialized MQTT context.

* @param[in] pPacketInfo Information on the type of incoming MQTT packet.

* @param[in] pDeserializedInfo Deserialized information from incoming packet.

*/

typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext,
struct MQTTPacketInfo * pPacketInfo,
struct MQTTDeserializedInfo * pDeserializedInfo );

/**

* @brief Initialize an MQTT context.

*

* This function must be called on a #MQTTContext_t before any other function.

*

* @note The #MQTTGetCurrentTimeFunc_t function for querying time must be defined. If

* there is no time implementation, it is the responsibility of the application

* to provide a dummy function to always return 0, provide 0 timeouts for

* all calls to #MQTT_Connect, #MQTT_ProcessLoop, and #MQTT_ReceiveLoop and configure

* the #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_RETRY_TIMEOUT_MS configurations

* to be 0. This will result in loop functions running for a single iteration, and

* #MQTT_Connect relying on #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT to receive the CONNACK packet.

*

* @param[in] pContext The context to initialize.

* @param[in] pTransportInterface The transport interface to use with the context.

* @param[in] getTimeFunction The time utility function to use with the context.

* @param[in] userCallback The user callback to use with the context to

* notify about incoming packet events.

* @param[in] pNetworkBuffer Network buffer provided for the context.

*

* @return #MQTTBadParameter if invalid parameters are passed;

* #MQTTSuccess otherwise.

*

* Example

*

* // Function for obtaining a timestamp.

* uint32_t getTimeStampMs();

* // Callback function for receiving packets.

* void eventCallback(

* MQTTContext_t * pContext,

* MQTTPacketInfo_t * pPacketInfo,

* MQTTDeserializedInfo_t * pDeserializedInfo

* );

* // Network send.

* int32_t networkSend( NetworkContext_t * pContext, const void * pBuffer, size_t bytes );

* // Network receive.

* int32_t networkRecv( NetworkContext_t * pContext, void * pBuffer, size_t bytes );

*

* MQTTContext_t mqttContext;

* TransportInterface_t transport;

* MQTTFixedBuffer_t fixedBuffer;

* uint8_t buffer[ 1024 ];

*

* // Clear context.

* memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) );

*

* // Set transport interface members.

* transport.pNetworkContext = &someTransportContext;

* transport.send = networkSend;

* transport.recv = networkRecv;

*

* // Set buffer members.

* fixedBuffer.pBuffer = buffer;

* fixedBuffer.size = 1024;

*

* status = MQTT_Init( &mqttContext, &transport, getTimeStampMs, eventCallback, &fixedBuffer );

*

* if( status == MQTTSuccess )

* {

* // Do something with mqttContext. The transport and fixedBuffer structs were

* // copied into the context, so the original structs do not need to stay in scope.

* }

*/


MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
const TransportInterface_t * pTransportInterface,
MQTTGetCurrentTimeFunc_t getTimeFunction,
MQTTEventCallback_t userCallback,
const MQTTFixedBuffer_t * pNetworkBuffer );

A very simple callback might ignore all ACKs and parse all PUBLISH messages for a desired value. If you need messages handled by different functions based on topic, compose your callback using the MQTT_MatchTopic() function. The docstring for MQTT_MatchTopic() below includes a trivial example.


/**

* @brief A utility function that determines whether the passed topic filter and

* topic name match according to the MQTT 3.1.1 protocol specification.

*

* @param[in] pTopicName The topic name to check.

* @param[in] topicNameLength Length of the topic name.

* @param[in] pTopicFilter The topic filter to check.

* @param[in] topicFilterLength Length of topic filter.

* @param[out] pIsMatch This is filled with the whether there

* exists a match or not.

*

* @note The API assumes that the passed topic name is valid to meet the

* requirements of the MQTT 3.1.1 specification. Invalid topic names (for example,

* containing wildcard characters) should not be passed to the function.

* Also, the API checks validity of topic filter for wildcard characters ONLY if

* the passed topic name and topic filter do not have an exact string match.

*

* @return Returns one of the following:

* - #MQTTBadParameter, if any of the input parameters is invalid.

* - #MQTTSuccess, if the matching operation was performed.

*

* Example

*

* // Variables used in this example.

* const char * pTopic = "topic/match/1";

* const char * pFilter = "topic/#";

* MQTTStatus_t status = MQTTSuccess;

* bool match = false;

*

* status = MQTT_MatchTopic( pTopic, strlen( pTopic ), pFilter, strlen( pFilter ), &match );

* // Our parameters were valid, so this will return success.

* assert( status == MQTTSuccess );

*

* // For this specific example, we already know this value is true. This

* // check is placed here as an example for use with variable topic names.

* if( match )

* {

* // Application can decide what to do with the matching topic name.

* }

*/

MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
const uint16_t topicNameLength,
const char * pTopicFilter,
const uint16_t topicFilterLength,
bool * pIsMatch );

The MQTT_MatchTopic() is composable to make a fully featured subscription manager, as seen in the subscription manager demo. The two functions and typedef below, shown with their docstrings, describe tying callback functions to matching topic strings.


/**

* @brief Callback type to be registered for a topic filter with the subscription manager.

*

* For incoming PUBLISH messages received on topics that match the registered topic filter,

* the callback would be invoked by the subscription manager.

*

* @param[in] pContext The context associated with the MQTT connection.

* @param[in] pPublishInfo The incoming PUBLISH message information.

*/

typedef void (* SubscriptionManagerCallback_t )( MQTTContext_t * pContext,
MQTTPublishInfo_t * pPublishInfo );

/**

* @brief Dispatches the incoming PUBLISH message to the callbacks that have their

* registered topic filters matching the incoming PUBLISH topic name. The dispatch

* handler will invoke all these callbacks with matching topic filters.

*

* @param[in] pContext The context associated with the MQTT connection.

* @param[in] pPublishInfo The incoming PUBLISH message information.

*/

void SubscriptionManager_DispatchHandler( MQTTContext_t * pContext,
MQTTPublishInfo_t * pPublishInfo );

/**

* @brief Utility to register a callback for a topic filter in the subscription manager.

*

* The callback will be invoked when an incoming PUBLISH message is received on

* a topic that matches the topic filter, @a pTopicFilter. The subscription manager

* accepts wildcard topic filters.

*

* @param[in] pTopicFilter The topic filter to register the callback for.

* @param[in] topicFilterLength The length of the topic filter string.

* @param[in] callback The callback to be registered for the topic filter.

*

* @note The subscription manager does not allow more than one callback to be registered

* for the same topic filter.

* @note The passed topic filter, @a pTopicFilter, is saved in the registry.

* The application must not free or alter the content of the topic filter memory

* until the callback for the topic filter is removed from the subscription manager.

*

* @return Returns one of the following:

* - #SUBSCRIPTION_MANAGER_SUCCESS if registration of the callback is successful.

* - #SUBSCRIPTION_MANAGER_REGISTRY_FULL if the registration failed due to registry

* being already full.

* - #SUBSCRIPTION_MANAGER_RECORD_EXISTS, if a registered callback already exists for

* the requested topic filter in the subscription manager.

*/

SubscriptionManagerStatus_t SubscriptionManager_RegisterCallback( const char * pTopicFilter,
uint16_t topicFilterLength,
SubscriptionManagerCallback_t pCallback );

The demo code shows the combination of subscribing, using the subscribeToTopic() function, and registering a callback, with SubscriptionManager_RegisterCallback(). The demo registers the callback first, and removes it if the subscription fails. This approach covers the case that a publish arrives on the topic while the MQTT_ProcessLoop() function called by subscribeToTopic() waits for the SUBACK message — a real possibility.


static int subscribeToAndRegisterTopicFilter( MQTTContext_t * pContext,
const char * pTopicFilter,
uint16_t topicFilterLength,
SubscriptionManagerCallback_t callback )
{
int returnStatus = EXIT_SUCCESS;
SubscriptionManagerStatus_t managerStatus = 0u;

/* Register the topic filter and its callback with subscription manager.

* On an incoming PUBLISH message whose topic name that matches the topic filter

* being registered, its callback will be invoked. */

managerStatus = SubscriptionManager_RegisterCallback( pTopicFilter,
topicFilterLength,
callback );

if( managerStatus != SUBSCRIPTION_MANAGER_SUCCESS )
{
returnStatus = EXIT_FAILURE;
}
else
{
LogInfo( ( "Subscribing to the MQTT topic %.*s.",
topicFilterLength,
pTopicFilter ) );

returnStatus = subscribeToTopic( pContext,
pTopicFilter,
topicFilterLength );
}

if( returnStatus != EXIT_SUCCESS )
{
/* Remove the registered callback for the temperature topic filter as

* the subscription operation for the topic filter did not succeed. */

( void ) SubscriptionManager_RemoveCallback( pTopicFilter,
topicFilterLength );
}

return returnStatus;
}

Concurrency

Should you organize your application as a simple super loop or as a set of tasks managed by an RTOS or scheduler? coreMQTT is agnostic on this choice. If you opt for concurrency, then you must take care that your code is safe. A useful approach when writing for FreeRTOS is to dedicate a task to handle MQTT and pass commands to the task over safe FreeRTOS queues. The FreeRTOS MQTT Agent uses this approach. The agent provides an independent daemon task for FreeRTOS to handle all MQTT interactions. Look for a future article about it.

Summary

Alan Kay once said, "Simple things should be simple; complex things should be possible." coreMQTT delivers by providing library functions useful in the simplest super loops, or composable into sophisticated multitasking real-time applications, with examples to show you how. The crucial decisions are yours for the taking.

About the author

View articles by this author
FreeRTOS forums Get industry-leading support from experts and collaborate with peers around the globe. View Forums
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.
Close Bitnami banner
Bitnami