coreMQTT Basic Multithreaded Demo
Note: We recommend always using mutual authentication when building any Internet of Things (IoT) application. The demo on this page is only meant for educational purposes and demonstrates MQTT communication prior to introducing encryption and authentication. It is not intended to be suitable for production use.
Single Threaded Vs Multi Threaded
There are two coreMQTT usage models: single threaded and multithreaded (multitasking). The demo documented on this page provides an example of creating your own multithreading scheme. There is also a multithreaded example that instead executes the MQTT protocol in the background within an agent (or daemon) task. Executing the MQTT protocol in an agent task removes the need for the application writer to explicitly manage any MQTT state or call the MQTT_ProcessLoop() API function. Using an agent task also enables multiple application tasks to share a single MQTT connection without the need for synchronization primitives such as mutexes.
Demo introduction
The coreMQTT Basic Multithreaded demo project uses the FreeRTOS Windows port, so you can build and evaluate it with the
free Community edition of Visual Studio on Windows, without the need for any particular MCU hardware.
This demo uses a thread-safe queue to hold commands for interacting with the MQTT API. There are four tasks to note in
this demo:
- A command (main) task processes the commands from the command queue while the other tasks enqueue them. This task
enters a loop, during which it processes commands from the command queue. If a termination command is received, it
will break out of the loop.
- A synchronous publisher task creates a series of publish operations which it pushes to the command queue, which are
then executed by the command task. This task uses synchronous publishing, which means it will wait for each publish
operation to complete before scheduling the next one.
- An asynchronous publisher task creates a series of publish operations which it pushes to the command queue, which are
then executed by the command task. The difference between this task and the previous one is that it will not wait for
a publish operation to complete before scheduling the next publish operation, and it checks the status of each after
all publish operations have been enqueued. Note that the distinction between synchronous and asynchronous publishing
is only in the behavior of the task, not in the actual publish command.
- A subscriber task creates an MQTT subscription to a topic filter that matches the topics of all messages which the
publisher tasks publish. It loops, waiting to receive back the messages published by the other tasks.
Tasks can have queues to hold received messages, and the command task will push incoming messages to the queue of each task
that is subscribed to the incoming topic.
The basic Multithreaded demo can be configured to use either a TLS connection with mutual authentication, or a plaintext TCP connection. By default, the demo uses TLS. If the network unexpectedly disconnects during the demo, then the client will attempt to reconnect with exponential backoff logic. Additionally, if the reconnection succeeds, but the broker cannot resume the prior session, then the client will resubscribe to the previously subscribed topics.
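The reconnection delay described above can be illustrated with a small sketch. This is not the demo's actual retry code: the function name backoffDelayMs and the base and maximum delays are assumptions chosen for illustration. Production implementations usually also add random jitter so that many clients do not retry in lockstep.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical parameters; the demo's real values are not shown on this page. */
#define BACKOFF_BASE_MS    500U
#define BACKOFF_MAX_MS     5000U

/* Return the delay to wait before retry number `attempt` (0-based):
 * the base delay doubled once per earlier attempt, capped at the maximum. */
uint32_t backoffDelayMs( uint32_t attempt )
{
    uint32_t delayMs = BACKOFF_BASE_MS;

    /* Stop doubling once the cap is reached, so the value cannot overflow. */
    while( ( attempt > 0U ) && ( delayMs < BACKOFF_MAX_MS ) )
    {
        delayMs *= 2U;
        attempt--;
    }

    assert( delayMs >= BACKOFF_BASE_MS );

    return ( delayMs > BACKOFF_MAX_MS ) ? BACKOFF_MAX_MS : delayMs;
}
```

With these assumed values, successive retries wait 500, 1000, 2000, and 4000 ms, and every later retry waits the 5000 ms maximum.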
Source Code Organization
The Visual Studio solution for the Multithreaded MQTT demo is called mqtt_multitask_demo.sln and is located in the
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask directory of the main FreeRTOS download.
Building the Demo Project
The demo project uses the free community edition of Visual Studio. To build the demo:
- Open the 'mqtt_multitask_demo.sln' Visual Studio solution file from within the Visual Studio IDE.
- Select 'Build Solution' from the IDE's 'Build' menu.
Note: If you are using Microsoft Visual Studio 2017 or earlier, then you must select a 'Platform Toolset' compatible with your version: 'Project -> RTOSDemos Properties -> Platform Toolset'.
Configuring the Demo Project
The demo uses the FreeRTOS+TCP TCP/IP stack, so follow the instructions provided for the TCP/IP starter project to ensure you:
- Have the pre-requisite components installed (such as WinPCap).
- Optionally set a static or dynamic IP address, gateway address and netmask.
- Optionally set a MAC address.
- Select an Ethernet network interface on your host machine.
- ...and importantly, test your network connection before attempting to run the MQTT demo.
All of these settings should be changed in the MQTT LTS rc1 demo project, not the TCP/IP starter project referred to in the
pages linked to above! As delivered, the TCP/IP stack is configured to use a dynamic IP address.
Configuring the MQTT Broker Connection
Option 1: TLS with Mutual Authentication (default):
This demo supports the same configuration options as the MQTT Mutual Auth demo. Please see that demo's documentation for
all of the available options.
Option 2: Plaintext:
To enable quick setup with no certificate configuration required, the basic Multithreaded demo allows plaintext TCP
connections to be used in lieu of mutually authenticated TLS. To disable TLS, set the macro 'democonfigUSE_TLS' to 0 in
'demo_config', or leave it undefined. The demo may then be used with any unencrypted MQTT broker (for example, Eclipse
Mosquitto) by following the same instructions as the Plaintext demo.
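As a minimal sketch, the plaintext option amounts to a one-line change in the configuration file. The fragment below assumes that 'demo_config' is the C header demo_config.h and shows only the macro named above; everything else in the file is left as delivered.

```c
/* demo_config.h (fragment; the file name is an assumption). */

/* Set to 1 to use TLS with mutual authentication, or to 0 for a
 * plaintext TCP connection to an unencrypted MQTT broker. */
#define democonfigUSE_TLS    0
```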
Functionality
The demo creates four tasks in total: three that request MQTT API calls, and one that processes those requests (the primary
task). In this demo, the primary task loops through creating the three subtasks, calling the processing loop, and cleaning up
afterwards. This primary task creates a single MQTT connection to the broker that is then shared among the subtasks. Two of
the tasks publish messages to the broker, and the third receives the messages back using an MQTT subscription to a topic
filter that matches all of those published.
Typedefs
The demo defines the following structures, enums, and function pointers:
Commands
Rather than making the MQTT API calls directly, the tasks use 'Command_t' structures to create commands that
instruct the main task to call the appropriate API for them. Commands may be of type 'PROCESSLOOP', 'PUBLISH',
'SUBSCRIBE', 'UNSUBSCRIBE', 'PING', 'DISCONNECT', 'RECONNECT', or 'TERMINATE'. The 'TERMINATE' command does not have a
corresponding MQTT API; it is used in the demo to instruct the main task to stop processing commands and begin cleanup
operations. Because some MQTT commands ('MQTT_Publish', 'MQTT_Subscribe', and 'MQTT_Unsubscribe') require additional
information (for example, publish or subscribe info), we use the 'CommandContext_t' field. This field is required for
the three commands just mentioned, but optional for the others.
Because the main task reads this context while it executes the command, the context MUST NOT be changed after the command
has been enqueued and before the command has completed. When a command completes, an optional callback may be invoked. In
this demo, we use a callback that creates a task notification to inform the calling task that the command has completed.
For MQTT operations that require acknowledgments (subscribes, unsubscribes, and publishes with QoS greater than 0), the
command is considered completed once the acknowledgment has been received. Otherwise, the command is completed once the
corresponding MQTT API call has returned.
/* The kinds of command a task can ask the main task to execute. */
typedef enum CommandType
{
    PROCESSLOOP,
    PUBLISH,
    SUBSCRIBE,
    UNSUBSCRIBE,
    PING,
    DISCONNECT,
    RECONNECT,
    TERMINATE
} CommandType_t;
/* Per-command data shared between the requesting task and the main task. */
typedef struct CommandContext
{
    MQTTPublishInfo_t * pxPublishInfo;
    MQTTSubscribeInfo_t * pxSubscribeInfo;
    size_t ulSubscriptionCount;
    MQTTStatus_t xReturnStatus;
    bool xIsComplete;
    TaskHandle_t xTaskToNotify;
    uint32_t ulNotificationBit;
    QueueHandle_t pxResponseQueue;
} CommandContext_t;
/* Optional callback invoked when a command completes. */
typedef void (* CommandCallback_t )( CommandContext_t * );
/* A command as it is placed on the command queue. */
typedef struct Command
{
    CommandType_t xCommandType;
    CommandContext_t * pxCmdContext;
    CommandCallback_t vCallback;
} Command_t;
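The completion mechanism can be sketched in portable C, without FreeRTOS. The types below (DemoContext_t, DemoCommand_t) and the plain uint32_t notification word are simplified stand-ins invented for this illustration; the demo itself uses Command_t, CommandContext_t, and a FreeRTOS task notification.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for CommandContext_t. */
typedef struct DemoContext
{
    int returnStatus;          /* 0 means success in this sketch. */
    bool isComplete;
    uint32_t * pNotifyWord;    /* Stand-in for the task to notify. */
    uint32_t notificationBit;  /* Bit that identifies this command. */
} DemoContext_t;

typedef void ( * DemoCallback_t )( DemoContext_t * );

/* Simplified stand-in for Command_t. */
typedef struct DemoCommand
{
    DemoContext_t * pContext;  /* Optional for some command types. */
    DemoCallback_t callback;   /* Optional; may be NULL. */
} DemoCommand_t;

/* Completion callback: mark the context complete and set the caller's bit. */
void demoCommandCallback( DemoContext_t * pContext )
{
    if( pContext == NULL )
    {
        return;
    }

    pContext->isComplete = true;

    if( pContext->pNotifyWord != NULL )
    {
        *( pContext->pNotifyWord ) |= pContext->notificationBit;
    }
}

/* Called by the command task once the operation, or its ack, has finished. */
void demoCompleteCommand( DemoCommand_t * pCommand, int status )
{
    assert( pCommand != NULL );

    if( pCommand->pContext != NULL )
    {
        pCommand->pContext->returnStatus = status;
    }

    if( pCommand->callback != NULL )
    {
        pCommand->callback( pCommand->pContext );
    }
}
```

The requesting task owns the context and only reads it again after its bit has been set, which is why the context must stay untouched while the command is in flight.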
Acknowledgments
As some MQTT operations require an acknowledgment, an array of 'AckInfo_t' is used that contains the packet
identifier of the expected acknowledgment, and the original command that is expecting it (so that its completion callback
may be invoked).
typedef struct ackInfo
{
    uint16_t usPacketId;
    Command_t xOriginalCommand;
} AckInfo_t;
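A pending-acknowledgment table of this kind can be sketched as a fixed-size array searched by packet identifier. The names below (PendingAck_t, addPendingAck, takePendingAck), the table size, and the integer commandTag that stands in for the stored Command_t are assumptions for illustration, not the demo's implementation. The sketch relies on the fact that a valid MQTT packet identifier is never 0, so 0 can mark a free slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define PENDING_ACKS_MAX     4U  /* Hypothetical table size. */
#define PACKET_ID_INVALID    0U  /* MQTT packet identifiers are never 0. */

typedef struct PendingAck
{
    uint16_t packetId;  /* PACKET_ID_INVALID marks a free slot. */
    int commandTag;     /* Stand-in for the original command. */
} PendingAck_t;

/* Zero-initialized, so every slot starts out free. */
static PendingAck_t pendingAcks[ PENDING_ACKS_MAX ];

/* Record a command that awaits an ack. Returns false if the table is full. */
bool addPendingAck( uint16_t packetId, int commandTag )
{
    assert( packetId != PACKET_ID_INVALID );

    for( size_t i = 0U; i < PENDING_ACKS_MAX; i++ )
    {
        if( pendingAcks[ i ].packetId == PACKET_ID_INVALID )
        {
            pendingAcks[ i ].packetId = packetId;
            pendingAcks[ i ].commandTag = commandTag;
            return true;
        }
    }

    return false;
}

/* Find and remove the entry for an incoming ack. Returns false if unknown. */
bool takePendingAck( uint16_t packetId, int * pCommandTag )
{
    assert( pCommandTag != NULL );

    if( packetId == PACKET_ID_INVALID )
    {
        return false;
    }

    for( size_t i = 0U; i < PENDING_ACKS_MAX; i++ )
    {
        if( pendingAcks[ i ].packetId == packetId )
        {
            *pCommandTag = pendingAcks[ i ].commandTag;
            pendingAcks[ i ].packetId = PACKET_ID_INVALID;
            return true;
        }
    }

    return false;
}
```

When the table is full, the caller has to report the failure to the requesting task straight away, since no acknowledgment can be matched to the command later.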
Subscriptions
This demo is able to track subscriptions for each task. In order to do so, each task that requests a subscription must provide
a message queue where it will receive back the published messages. Multiple tasks may subscribe to the same topic filter, as
they are expected to use separate response queues.
typedef struct subscriptionElement
{
    char pcSubscriptionFilter[ mqttexampleDEMO_BUFFER_SIZE ];
    uint16_t usFilterLength;
    QueueHandle_t pxResponseQueue;
} SubscriptionElement_t;
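Routing an incoming message to the right response queues comes down to matching its topic against each stored topic filter. The following is a simplified sketch of that idea, not the demo's code: the function and type names and the integer queue ids are hypothetical, the filters are assumed to be well formed, and the special handling of topics that begin with '$' is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Return true if `pTopic` matches `pFilter`. '+' matches exactly one topic
 * level and a trailing '#' matches all remaining levels. */
bool topicMatchesFilter( const char * pTopic, const char * pFilter )
{
    while( *pFilter != '\0' )
    {
        if( *pFilter == '#' )
        {
            return true;
        }

        if( *pFilter == '+' )
        {
            /* Skip one whole level of the topic. */
            while( ( *pTopic != '\0' ) && ( *pTopic != '/' ) )
            {
                pTopic++;
            }

            pFilter++;
        }
        else if( *pFilter == *pTopic )
        {
            pFilter++;
            pTopic++;
        }
        else
        {
            /* "a/#" also matches the parent topic "a". */
            return ( *pTopic == '\0' ) && ( strcmp( pFilter, "/#" ) == 0 );
        }
    }

    return *pTopic == '\0';
}

typedef struct Subscription
{
    const char * pFilter;  /* NULL marks an unused slot. */
    int queueId;           /* Stand-in for a response queue handle. */
} Subscription_t;

/* Collect the queue ids of all subscriptions that match `pTopic`.
 * Returns the number of ids written, which is at most `maxIds`. */
size_t findResponseQueues( const Subscription_t * pSubs, size_t subCount,
                           const char * pTopic, int * pOutIds, size_t maxIds )
{
    size_t found = 0U;

    assert( ( pSubs != NULL ) && ( pTopic != NULL ) && ( pOutIds != NULL ) );

    for( size_t i = 0U; ( i < subCount ) && ( found < maxIds ); i++ )
    {
        if( ( pSubs[ i ].pFilter != NULL ) &&
            topicMatchesFilter( pTopic, pSubs[ i ].pFilter ) )
        {
            pOutIds[ found++ ] = pSubs[ i ].queueId;
        }
    }

    return found;
}
```

Because every matching entry contributes its own queue, two tasks that subscribe to the same filter each receive their own copy of a message.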
Received Published Messages
Since tasks execute in parallel with the main task, it would be difficult and time consuming for the main task to have to
wait for each subscribed task to read a received published message. Therefore, each message received is copied to the response
queue of any task subscribed to the published message's topic. Since publish packets received from the MQTT client contain
pointers to the client's network buffer, the payload and topic name of the incoming message are copied into separate buffers
before insertion into a response queue. This way, the subscribed task may still read the received information after the MQTT
client has cleared its network buffer.
typedef struct publishElement
{
    MQTTPublishInfo_t xPublishInfo;
    uint8_t pcPayloadBuf[ mqttexampleDEMO_BUFFER_SIZE ];
    uint8_t pcTopicNameBuf[ mqttexampleDEMO_BUFFER_SIZE ];
} PublishElement_t;
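The copy step can be sketched as follows. The structures are simplified stand-ins for MQTTPublishInfo_t and PublishElement_t, and copyPublish and DEMO_BUFFER_SIZE are hypothetical names used only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_BUFFER_SIZE    32U  /* Hypothetical buffer size. */

/* Simplified incoming publish: its pointers refer to a shared network buffer. */
typedef struct IncomingPublish
{
    const char * pTopicName;
    uint16_t topicNameLength;
    const void * pPayload;
    size_t payloadLength;
} IncomingPublish_t;

/* Self-contained copy that is safe to place in a response queue. */
typedef struct PublishCopy
{
    IncomingPublish_t info;
    uint8_t payloadBuf[ DEMO_BUFFER_SIZE ];
    uint8_t topicNameBuf[ DEMO_BUFFER_SIZE ];
} PublishCopy_t;

/* Deep-copy a publish. Returns false if the topic or payload does not fit. */
bool copyPublish( const IncomingPublish_t * pSrc, PublishCopy_t * pDst )
{
    assert( ( pSrc != NULL ) && ( pDst != NULL ) );

    if( ( pSrc->topicNameLength > DEMO_BUFFER_SIZE ) ||
        ( pSrc->payloadLength > DEMO_BUFFER_SIZE ) )
    {
        return false;
    }

    if( pSrc->topicNameLength > 0U )
    {
        memcpy( pDst->topicNameBuf, pSrc->pTopicName, pSrc->topicNameLength );
    }

    if( pSrc->payloadLength > 0U )
    {
        memcpy( pDst->payloadBuf, pSrc->pPayload, pSrc->payloadLength );
    }

    /* Copy the lengths, then point the copy at its own buffers. */
    pDst->info = *pSrc;
    pDst->info.pTopicName = ( const char * ) pDst->topicNameBuf;
    pDst->info.pPayload = pDst->payloadBuf;

    return true;
}
```

One design consequence: a queue copies the element by value, so the embedded pointers no longer refer to the receiver's own buffers after the element has been dequeued. This is why the subscriber task in this demo reassigns pTopicName and pPayload after each xQueueReceive() call.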
Main Task
The main application task establishes a persistent MQTT session, creates three subtasks, and executes the processing loop
until a termination command is received. A persistent session is used, so if the network unexpectedly disconnects, the demo
will reconnect to the broker in the background, without losing subscriptions or any incoming published messages from the
broker. In order to create a new persistent session for every run, the demo connects to the broker with the "clean session"
flag set, then disconnects and reconnects with the flag unset. After the processing loop has terminated, it disconnects from
the broker, and loops again from the network reconnection. The structure of the main task is shown here:
static void prvMQTTDemoTask( void * pvParameters )
{
    BaseType_t xNetworkStatus = pdFAIL;
    MQTTStatus_t xMQTTStatus;
    BaseType_t xResult = pdFALSE;
    uint32_t ulNotification = 0;
    uint32_t ulExpectedNotifications = mqttexamplePUBLISHER_SYNC_COMPLETE_BIT |
                                       mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT |
                                       mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT;
    ( void ) pvParameters;
    ulGlobalEntryTimeMs = prvGetTimeMs();
    /* Create the command queue and the response queues for incoming publishes. */
    xCommandQueue = xQueueCreate( mqttexampleCOMMAND_QUEUE_SIZE, sizeof( Command_t ) );
    xSubscriberResponseQueue = xQueueCreate( mqttexamplePUBLISH_QUEUE_SIZE, sizeof( PublishElement_t ) );
    xDefaultResponseQueue = xQueueCreate( 1, sizeof( PublishElement_t ) );
    /* Connect with the "clean session" flag set to discard any previous session. */
    prvCleanExistingPersistentSession();
    for( ; ; )
    {
        /* Clear the lists of pending acknowledgments and subscriptions. */
        memset( pxPendingAcks, 0x00, mqttexamplePENDING_ACKS_MAX_SIZE * sizeof( AckInfo_t ) );
        memset( pxSubscriptions, 0x00, mqttexampleSUBSCRIPTIONS_MAX_COUNT * sizeof( SubscriptionElement_t ) );
        /* Connect to the broker with a persistent session. */
        xNetworkStatus = prvSocketConnect( &xNetworkContext );
        configASSERT( xNetworkStatus == pdPASS );
        xMQTTStatus = prvMQTTConnect( &globalMqttContext, false );
        configASSERT( xMQTTStatus == MQTTSuccess );
        configASSERT( globalMqttContext.connectStatus == MQTTConnected );
        /* Create the subscriber and the two publisher tasks. */
        xResult = xTaskCreate( prvSubscribeTask, "Subscriber", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY + 1, &xSubscribeTask );
        configASSERT( xResult == pdPASS );
        xResult = xTaskCreate( prvSyncPublishTask, "SyncPublisher", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY, &xSyncPublisherTask );
        configASSERT( xResult == pdPASS );
        xResult = xTaskCreate( prvAsyncPublishTask, "AsyncPublisher", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY, &xAsyncPublisherTask );
        configASSERT( xResult == pdPASS );
        /* Process commands until a TERMINATE command is received. */
        LogInfo( ( "Running command loop" ) );
        prvCommandLoop();
        /* Wait until all three subtasks have signalled that they are done. */
        LogInfo( ( "Waiting for tasks to exit." ) );
        ( void ) prvNotificationWaitLoop( &ulNotification, ulExpectedNotifications, false );
        configASSERT( ( ulNotification & ulExpectedNotifications ) == ulExpectedNotifications );
        /* Reset the queues and notifications for the next iteration. */
        xQueueReset( xCommandQueue );
        xQueueReset( xDefaultResponseQueue );
        xQueueReset( xSubscriberResponseQueue );
        ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) );
        xNetworkStatus = prvSocketDisconnect( &xNetworkContext );
        configASSERT( xNetworkStatus == pdPASS );
        LogInfo( ( "\r\n\r\nprvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) );
        LogInfo( ( "Demo completed successfully.\r\n" ) );
        LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
        vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
    }
}
Command loop
The command loop waits for commands to be enqueued, and then calls the appropriate MQTT API. Note that all commands except
for 'DISCONNECT' and 'TERMINATE' will result in 'MQTT_ProcessLoop' being called as well.
This demo sets a socket wakeup callback to add a 'PROCESSLOOP' command to the queue when data is available on the
socket. However, there may be many commands ahead of it in the queue at that point. So, to ensure that we do not neglect
incoming data while other commands are processed, 'MQTT_ProcessLoop' is called for a single iteration after each
command.
static void prvCommandLoop( void )
{
    Command_t xCommand;
    Command_t xNewCommand;
    MQTTStatus_t xStatus = MQTTSuccess;
    static int lNumProcessed = 0;
    bool xTerminateReceived = false;
    BaseType_t xCommandAdded = pdTRUE;
    for( ; ; )
    {
        /* Wait for the next command; try again if none arrives in time. */
        if( xQueueReceive( xCommandQueue, &xCommand, mqttexampleDEMO_TICKS_TO_WAIT ) == pdFALSE )
        {
            LogInfo( ( "No commands in the queue. Trying again." ) );
            continue;
        }
        xStatus = prvProcessCommand( &xCommand );
        if( xStatus != MQTTSuccess )
        {
            /* On failure, put a RECONNECT command at the front of the queue
             * so that it runs before any other pending command. */
            LogError( ( "MQTT operation failed with status %s\n",
                        MQTT_Status_strerror( xStatus ) ) );
            prvCreateCommand( RECONNECT, NULL, NULL, &xNewCommand );
            xCommandAdded = xQueueSendToFront( xCommandQueue, &xNewCommand, mqttexampleDEMO_TICKS_TO_WAIT );
            configASSERT( xCommandAdded == pdTRUE );
        }
        lNumProcessed++;
        if( xCommand.xCommandType == SUBSCRIBE )
        {
            LogDebug( ( "Sleeping for %d ms after sending SUBSCRIBE packet.", mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
            /* The delay is given in milliseconds, so convert it to ticks. */
            vTaskDelay( pdMS_TO_TICKS( mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
        }
        if( xCommand.xCommandType == TERMINATE )
        {
            xTerminateReceived = true;
            break;
        }
        LogDebug( ( "Processed %d operations.", lNumProcessed ) );
    }
    configASSERT( xTerminateReceived == true );
    LogInfo( ( "Creating Disconnect operation." ) );
    MQTT_Disconnect( &globalMqttContext );
    LogInfo( ( "Disconnected from broker." ) );
}
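The queueing behavior that the command loop relies on, including the way a RECONNECT command jumps ahead of pending commands, can be sketched with a small ring buffer in portable C. Everything here is illustrative: the DemoQueue_t type, the function names, and the simulated publish failure are assumptions, and unlike a FreeRTOS queue this sketch is not thread safe and returns instead of blocking when the queue is empty.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { CMD_PROCESSLOOP, CMD_PUBLISH, CMD_RECONNECT, CMD_TERMINATE } DemoCommandType_t;

#define QUEUE_LENGTH    8U  /* Hypothetical queue capacity. */

typedef struct DemoQueue
{
    DemoCommandType_t items[ QUEUE_LENGTH ];
    size_t head;   /* Index of the oldest item. */
    size_t count;  /* Number of items currently stored. */
} DemoQueue_t;

bool queueSendToBack( DemoQueue_t * pQueue, DemoCommandType_t cmd )
{
    if( pQueue->count == QUEUE_LENGTH )
    {
        return false;
    }

    pQueue->items[ ( pQueue->head + pQueue->count ) % QUEUE_LENGTH ] = cmd;
    pQueue->count++;
    return true;
}

bool queueSendToFront( DemoQueue_t * pQueue, DemoCommandType_t cmd )
{
    if( pQueue->count == QUEUE_LENGTH )
    {
        return false;
    }

    /* Step the head back by one slot, wrapping around the array. */
    pQueue->head = ( pQueue->head + QUEUE_LENGTH - 1U ) % QUEUE_LENGTH;
    pQueue->items[ pQueue->head ] = cmd;
    pQueue->count++;
    return true;
}

bool queueReceive( DemoQueue_t * pQueue, DemoCommandType_t * pCmd )
{
    if( pQueue->count == 0U )
    {
        return false;
    }

    *pCmd = pQueue->items[ pQueue->head ];
    pQueue->head = ( pQueue->head + 1U ) % QUEUE_LENGTH;
    pQueue->count--;
    return true;
}

/* Process commands until TERMINATE or until the queue is empty. The first
 * PUBLISH can be made to "fail", which queues a RECONNECT at the front.
 * The processed commands are recorded in pTrace; the count is returned. */
size_t runCommandLoop( DemoQueue_t * pQueue, bool failFirstPublish,
                       DemoCommandType_t * pTrace, size_t traceLen )
{
    DemoCommandType_t cmd;
    size_t processed = 0U;

    assert( ( pQueue != NULL ) && ( pTrace != NULL ) );

    while( queueReceive( pQueue, &cmd ) )
    {
        if( processed < traceLen )
        {
            pTrace[ processed ] = cmd;
        }

        processed++;

        if( cmd == CMD_TERMINATE )
        {
            break;
        }

        if( ( cmd == CMD_PUBLISH ) && failFirstPublish )
        {
            failFirstPublish = false;
            ( void ) queueSendToFront( pQueue, CMD_RECONNECT );
        }
    }

    return processed;
}
```

Sending the reconnect request to the front of the queue means that commands already waiting are only attempted once the connection has been restored.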
Processing commands
static MQTTStatus_t prvProcessCommand( Command_t * pxCommand )
{
    MQTTStatus_t xStatus = MQTTSuccess;
    uint16_t usPacketId = MQTT_PACKET_ID_INVALID;
    bool xAddAckToList = false, xAckAdded = false;
    BaseType_t xNetworkResult = pdFAIL;
    MQTTPublishInfo_t * pxPublishInfo;
    MQTTSubscribeInfo_t * pxSubscribeInfo;
    switch( pxCommand->xCommandType )
    {
        case PROCESSLOOP:
            /* The process loop itself runs at the end of this function. */
            LogDebug( ( "Running Process Loop." ) );
            break;
        case PUBLISH:
            configASSERT( pxCommand->pxCmdContext != NULL );
            pxPublishInfo = pxCommand->pxCmdContext->pxPublishInfo;
            configASSERT( pxPublishInfo != NULL );
            /* Only publishes above QoS 0 need a packet identifier. */
            if( pxPublishInfo->qos != MQTTQoS0 )
            {
                usPacketId = MQTT_GetPacketId( &globalMqttContext );
            }
            LogDebug( ( "Publishing message to %.*s.", ( int ) pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) );
            xStatus = MQTT_Publish( &globalMqttContext, pxPublishInfo, usPacketId );
            pxCommand->pxCmdContext->xReturnStatus = xStatus;
            xAddAckToList = ( pxPublishInfo->qos != MQTTQoS0 ) && ( xStatus == MQTTSuccess );
            break;
        case SUBSCRIBE:
        case UNSUBSCRIBE:
            configASSERT( pxCommand->pxCmdContext != NULL );
            pxSubscribeInfo = pxCommand->pxCmdContext->pxSubscribeInfo;
            configASSERT( pxSubscribeInfo != NULL );
            configASSERT( pxSubscribeInfo->pTopicFilter != NULL );
            usPacketId = MQTT_GetPacketId( &globalMqttContext );
            if( pxCommand->xCommandType == SUBSCRIBE )
            {
                xStatus = MQTT_Subscribe( &globalMqttContext,
                                          pxSubscribeInfo,
                                          pxCommand->pxCmdContext->ulSubscriptionCount,
                                          usPacketId );
            }
            else
            {
                xStatus = MQTT_Unsubscribe( &globalMqttContext,
                                            pxSubscribeInfo,
                                            pxCommand->pxCmdContext->ulSubscriptionCount,
                                            usPacketId );
            }
            pxCommand->pxCmdContext->xReturnStatus = xStatus;
            xAddAckToList = ( xStatus == MQTTSuccess );
            break;
        case PING:
            xStatus = MQTT_Ping( &globalMqttContext );
            if( pxCommand->pxCmdContext != NULL )
            {
                pxCommand->pxCmdContext->xReturnStatus = xStatus;
            }
            break;
        case DISCONNECT:
            xStatus = MQTT_Disconnect( &globalMqttContext );
            if( pxCommand->pxCmdContext != NULL )
            {
                pxCommand->pxCmdContext->xReturnStatus = xStatus;
            }
            break;
        case RECONNECT:
            /* Tear down the socket, then reconnect and resume the session. */
            xNetworkResult = prvSocketDisconnect( &xNetworkContext );
            configASSERT( xNetworkResult == pdPASS );
            xNetworkResult = prvSocketConnect( &xNetworkContext );
            configASSERT( xNetworkResult == pdPASS );
            xStatus = prvMQTTConnect( &globalMqttContext, false );
            break;
        case TERMINATE:
            LogInfo( ( "Terminating command loop." ) );
            break;
        default:
            break;
    }
    if( xAddAckToList )
    {
        /* The command completes later, when its acknowledgment arrives. */
        xAckAdded = prvAddAwaitingOperation( usPacketId, pxCommand );
        if( !xAckAdded )
        {
            LogError( ( "No memory to wait for acknowledgment for packet %u\n", usPacketId ) );
            configASSERT( pxCommand->pxCmdContext != NULL );
            pxCommand->pxCmdContext->xReturnStatus = MQTTNoMemory;
        }
    }
    if( !xAckAdded )
    {
        /* No acknowledgment is awaited, so the command is complete now. */
        if( pxCommand->vCallback != NULL )
        {
            pxCommand->vCallback( pxCommand->pxCmdContext );
        }
    }
    /* Run a single iteration of the process loop after each command. */
    if( ( xStatus == MQTTSuccess ) && ( globalMqttContext.connectStatus == MQTTConnected ) )
    {
        xStatus = MQTT_ProcessLoop( &globalMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
    }
    return xStatus;
}
Synchronous Publisher Task
The synchronous publisher task creates 'PUBLISH' operations synchronously, waiting for each operation to
complete before scheduling the next one. This demo uses QoS 1 to publish messages, which means that these operations are not
considered completed until the publish acknowledgment packet has been received.
void prvSyncPublishTask( void * pvParameters )
{
    ( void ) pvParameters;
    Command_t xCommand;
    MQTTPublishInfo_t xPublishInfo = { 0 };
    char payloadBuf[ mqttexampleDEMO_BUFFER_SIZE ];
    char topicBuf[ mqttexampleDEMO_BUFFER_SIZE ];
    CommandContext_t xContext;
    uint32_t ulNotification = 0U;
    BaseType_t xCommandAdded = pdTRUE;
    xPublishInfo.qos = MQTTQoS1;
    xPublishInfo.pTopicName = topicBuf;
    xPublishInfo.pPayload = payloadBuf;
    /* This task sends half of the publishes, rounded up. */
    for( int i = 0; i < ( ( mqttexamplePUBLISH_COUNT + 1 ) / 2 ); i++ )
    {
        snprintf( payloadBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, "Sync", i + 1 );
        xPublishInfo.payloadLength = ( uint16_t ) strlen( payloadBuf );
        snprintf( topicBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, "sync", i + 1 );
        xPublishInfo.topicNameLength = ( uint16_t ) strlen( topicBuf );
        /* Clear the whole context structure, not just a pointer's worth of it. */
        memset( ( void * ) &xContext, 0x00, sizeof( xContext ) );
        xContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
        xContext.ulNotificationBit = 1U << i;
        xContext.pxPublishInfo = &xPublishInfo;
        LogInfo( ( "Adding publish operation for message %s \non topic %.*s", payloadBuf, xPublishInfo.topicNameLength, xPublishInfo.pTopicName ) );
        prvCreateCommand( PUBLISH, &xContext, prvCommandCallback, &xCommand );
        xCommandAdded = prvAddCommandToQueue( &xCommand );
        configASSERT( xCommandAdded == pdTRUE );
        /* Block until this publish has completed before creating the next one. */
        LogInfo( ( "Waiting for publish %d to complete.", i + 1 ) );
        if( prvNotificationWaitLoop( &ulNotification, ( 1U << i ), true ) != true )
        {
            LogError( ( "Synchronous publish loop iteration %d"
                        " exceeded maximum wait time.\n", ( i + 1 ) ) );
        }
        configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) );
        LogInfo( ( "Publish operation complete. Sleeping for %d ms.\n", mqttexamplePUBLISH_DELAY_SYNC_MS ) );
        vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_SYNC_MS ) );
    }
    LogInfo( ( "Finished sync publishes.\n" ) );
    /* Clear this task's notifications, then tell the main task it is done. */
    xTaskNotifyStateClear( NULL );
    ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) );
    xTaskNotify( xMainTask, mqttexamplePUBLISHER_SYNC_COMPLETE_BIT, eSetBits );
    LogInfo( ( "Deleting Sync Publisher task." ) );
    vTaskDelete( NULL );
}
Asynchronous Publisher Task
The asynchronous publisher does not wait for a publish to complete before it enqueues the next one. This demonstrates that
a task does not need to wait for an MQTT operation to complete before it resumes execution. Instead, it waits only when
necessary. Because each publish command requires its own context structure, this task cannot reuse a single context structure
as the synchronous publisher task does, because a previous command may still need it. Therefore, it allocates memory for each
context structure, and frees that memory only after all of the enqueued publish operations have completed.
void prvAsyncPublishTask( void * pvParameters )
{
    ( void ) pvParameters;
    Command_t xCommand;
    MQTTPublishInfo_t pxPublishes[ mqttexamplePUBLISH_COUNT / 2 ];
    uint32_t ulNotification = 0U;
    uint32_t ulExpectedNotifications = 0U;
    BaseType_t xCommandAdded = pdTRUE;
    char * payloadBuffers[ mqttexamplePUBLISH_COUNT / 2 ];
    char * topicBuffers[ mqttexamplePUBLISH_COUNT / 2 ];
    CommandContext_t * pxContexts[ mqttexamplePUBLISH_COUNT / 2 ] = { 0 };
    /* The delay is given in milliseconds, so convert it to ticks. */
    vTaskDelay( pdMS_TO_TICKS( mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
    for( int i = 0; i < mqttexamplePUBLISH_COUNT / 2; i++ )
    {
        /* Each in-flight publish needs its own context and buffers. */
        pxContexts[ i ] = ( CommandContext_t * ) pvPortMalloc( sizeof( CommandContext_t ) );
        configASSERT( pxContexts[ i ] != NULL );
        memset( ( void * ) pxContexts[ i ], 0x00, sizeof( CommandContext_t ) );
        pxContexts[ i ]->xTaskToNotify = xTaskGetCurrentTaskHandle();
        pxContexts[ i ]->ulNotificationBit = 1U << i;
        ulExpectedNotifications |= 1U << i;
        payloadBuffers[ i ] = ( char * ) pvPortMalloc( mqttexampleDYNAMIC_BUFFER_SIZE );
        configASSERT( payloadBuffers[ i ] != NULL );
        topicBuffers[ i ] = ( char * ) pvPortMalloc( mqttexampleDYNAMIC_BUFFER_SIZE );
        configASSERT( topicBuffers[ i ] != NULL );
        snprintf( payloadBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, "Async", i + 1 );
        snprintf( topicBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, "async", i + 1 );
        memset( &( pxPublishes[ i ] ), 0x00, sizeof( MQTTPublishInfo_t ) );
        pxPublishes[ i ].pPayload = payloadBuffers[ i ];
        pxPublishes[ i ].payloadLength = strlen( payloadBuffers[ i ] );
        pxPublishes[ i ].pTopicName = topicBuffers[ i ];
        pxPublishes[ i ].topicNameLength = ( uint16_t ) strlen( topicBuffers[ i ] );
        pxPublishes[ i ].qos = MQTTQoS1;
        pxContexts[ i ]->pxPublishInfo = &( pxPublishes[ i ] );
        LogInfo( ( "Adding publish operation for message %s \non topic %.*s",
                   payloadBuffers[ i ],
                   pxPublishes[ i ].topicNameLength,
                   pxPublishes[ i ].pTopicName ) );
        prvCreateCommand( PUBLISH, pxContexts[ i ], prvCommandCallback, &xCommand );
        xCommandAdded = prvAddCommandToQueue( &xCommand );
        configASSERT( xCommandAdded == pdTRUE );
        /* Do not wait for completion here; move on to the next publish. */
        LogInfo( ( "Publish operation queued. Sleeping for %d ms.\n", mqttexamplePUBLISH_DELAY_ASYNC_MS ) );
        vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_ASYNC_MS ) );
    }
    LogInfo( ( "Finished async publishes.\n" ) );
    /* Wait for every queued publish to complete before freeing any memory. */
    ( void ) prvNotificationWaitLoop( &ulNotification, ulExpectedNotifications, false );
    for( int i = 0; i < mqttexamplePUBLISH_COUNT / 2; i++ )
    {
        configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) );
        LogInfo( ( "Freeing publish context %d.", i + 1 ) );
        vPortFree( pxContexts[ i ] );
        vPortFree( topicBuffers[ i ] );
        vPortFree( payloadBuffers[ i ] );
        LogInfo( ( "Publish context %d freed.", i + 1 ) );
        pxContexts[ i ] = NULL;
    }
    /* Clear this task's notifications, then tell the main task it is done. */
    xTaskNotifyStateClear( NULL );
    ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) );
    xTaskNotify( xMainTask, mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT, eSetBits );
    LogInfo( ( "Deleting Async Publisher task." ) );
    vTaskDelete( NULL );
}
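The completion tracking used by the asynchronous publisher, in which operation i is assigned notification bit i and the task waits until every expected bit is set, can be sketched in portable C. The function names are hypothetical, and a plain uint32_t stands in for the FreeRTOS task notification value. Because that value has 32 bits, this scheme can track at most 32 operations at a time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Build the mask of bits expected for `count` queued operations, where
 * operation i is assigned bit i. */
uint32_t expectedNotificationMask( uint32_t count )
{
    assert( count <= 32U );

    /* Shifting a 32-bit value by 32 is undefined, so handle it separately. */
    return ( count == 32U ) ? UINT32_MAX : ( ( UINT32_C( 1 ) << count ) - 1U );
}

/* Completion side: record that operation `index` has finished. */
void notifyOperationComplete( uint32_t * pNotification, uint32_t index )
{
    assert( ( pNotification != NULL ) && ( index < 32U ) );

    *pNotification |= UINT32_C( 1 ) << index;
}

/* Waiting side: true once every expected bit has been set. */
bool allOperationsComplete( uint32_t notification, uint32_t expected )
{
    return ( notification & expected ) == expected;
}
```

The operations may complete in any order; the waiting side only checks that the full set of expected bits has arrived.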
Subscriber Task
This task subscribes to a topic filter that matches all the topics of the messages published from the synchronous and
asynchronous tasks. It then waits to receive back all those published messages before it unsubscribes. This task is also
responsible for creating the 'TERMINATE' operation that signals the main task to end the command loop.
void prvSubscribeTask( void * pvParameters )
{
    ( void ) pvParameters;
    MQTTSubscribeInfo_t xSubscribeInfo;
    Command_t xCommand;
    BaseType_t xCommandAdded = pdTRUE;
    MQTTPublishInfo_t * pxReceivedPublish = NULL;
    uint16_t usNumReceived = 0;
    uint32_t ulNotification = 0;
    CommandContext_t xContext;
    PublishElement_t xReceivedPublish;
    uint32_t ulWaitCounter = 0;
    xSubscribeInfo.qos = MQTTQoS1;
    xSubscribeInfo.pTopicFilter = mqttexampleSUBSCRIBE_TOPIC_FILTER;
    xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( xSubscribeInfo.pTopicFilter );
    LogInfo( ( "Topic filter: %.*s", xSubscribeInfo.topicFilterLength, xSubscribeInfo.pTopicFilter ) );
    /* Fill in the context for the subscribe operation. */
    memset( &xContext, 0x00, sizeof( xContext ) );
    xContext.pxResponseQueue = xSubscriberResponseQueue;
    xContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
    xContext.ulNotificationBit = mqttexampleSUBSCRIBE_COMPLETE_BIT;
    xContext.pxSubscribeInfo = &xSubscribeInfo;
    xContext.ulSubscriptionCount = 1;
    LogInfo( ( "Adding subscribe operation" ) );
    prvCreateCommand( SUBSCRIBE, &xContext, prvCommandCallback, &xCommand );
    xCommandAdded = prvAddCommandToQueue( &xCommand );
    configASSERT( xCommandAdded == pdTRUE );
    LogInfo( ( "Waiting for subscribe operation to complete." ) );
    ( void ) prvNotificationWaitLoop( &ulNotification, mqttexampleSUBSCRIBE_COMPLETE_BIT, true );
    configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_COMPLETE_BIT );
    configASSERT( xContext.xReturnStatus == MQTTSuccess );
    LogInfo( ( "Operation wait complete.\n" ) );
    for( ; ; )
    {
        /* Drain every message currently waiting in the response queue. */
        while( xQueueReceive( xSubscriberResponseQueue, &xReceivedPublish, mqttexampleDEMO_TICKS_TO_WAIT ) != pdFALSE )
        {
            /* Point the publish info at the buffers inside the received copy. */
            pxReceivedPublish = &( xReceivedPublish.xPublishInfo );
            pxReceivedPublish->pTopicName = ( const char * ) xReceivedPublish.pcTopicNameBuf;
            pxReceivedPublish->pPayload = xReceivedPublish.pcPayloadBuf;
            LogInfo( ( "Received publish on topic %.*s\nMessage payload: %.*s\n",
                       pxReceivedPublish->topicNameLength,
                       pxReceivedPublish->pTopicName,
                       ( int ) pxReceivedPublish->payloadLength,
                       ( const char * ) pxReceivedPublish->pPayload ) );
            usNumReceived++;
            ulWaitCounter = 0;
        }
        /* Stop once all messages have arrived, or after waiting too long. */
        if( usNumReceived >= mqttexamplePUBLISH_COUNT )
        {
            break;
        }
        if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
        {
            LogError( ( "Publish receive loop exceeded maximum wait time.\n" ) );
            break;
        }
        LogInfo( ( "No messages queued, received %u publish%s, sleeping for %d ms\n",
                   usNumReceived,
                   ( usNumReceived == 1 ) ? "" : "es",
                   mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
        vTaskDelay( pdMS_TO_TICKS( mqttexampleSUBSCRIBE_TASK_DELAY_MS ) );
    }
    LogInfo( ( "Finished receiving\n" ) );
    /* Reuse the context for the unsubscribe operation. */
    memset( &xContext, 0x00, sizeof( xContext ) );
    xContext.pxResponseQueue = xSubscriberResponseQueue;
    xContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
    xContext.ulNotificationBit = mqttexampleUNSUBSCRIBE_COMPLETE_BIT;
    xContext.pxSubscribeInfo = &xSubscribeInfo;
    xContext.ulSubscriptionCount = 1;
    LogInfo( ( "Adding unsubscribe operation\n" ) );
    prvCreateCommand( UNSUBSCRIBE, &xContext, prvCommandCallback, &xCommand );
    xCommandAdded = prvAddCommandToQueue( &xCommand );
    configASSERT( xCommandAdded == pdTRUE );
    LogInfo( ( "Waiting for unsubscribe operation to complete." ) );
    ( void ) prvNotificationWaitLoop( &ulNotification, mqttexampleUNSUBSCRIBE_COMPLETE_BIT, true );
    configASSERT( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) == mqttexampleUNSUBSCRIBE_COMPLETE_BIT );
    LogInfo( ( "Operation wait complete.\n" ) );
    /* Ask the main task to end the command loop, then report completion. */
    LogInfo( ( "Beginning command queue termination." ) );
    prvCreateCommand( TERMINATE, NULL, NULL, &xCommand );
    xCommandAdded = prvAddCommandToQueue( &xCommand );
    configASSERT( xCommandAdded == pdTRUE );
    xTaskNotify( xMainTask, mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT, eSetBits );
    LogInfo( ( "Deleting Subscriber task." ) );
    vTaskDelete( NULL );
}
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.