Skip to content

Commit 76e54d8

Browse files
authored
Update Mqtt5 Library with Latest API changes (#568)
* latest mqtt5client update * update userguide with new API and changes * update doc * improve error handling * update the error handling * update doc * update aws-crt-cpp to v0.20 * clang-format * update API changes for mqtt5 shared subscription sample * update user guide with latest api
1 parent 941da12 commit 76e54d8

File tree

4 files changed

+170
-169
lines changed

4 files changed

+170
-169
lines changed

documents/MQTT5_Userguide.md

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,25 @@ SDK MQTT5 support comes from a separate client implementation. In doing so, we
7676

7777
* With the 311 implementation, there were two separate objects, a client and a connection. With MQTT5, there is only the client.
7878

79+
* The user callbacks in the Mqtt5 do not provide a client reference in the way the Mqtt3 API does.
80+
Example:
81+
```
82+
// Client reference
83+
std::shared_ptr<Mqtt5Client> client = nullptr;
84+
85+
// Create Mqtt5Client Builder
86+
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...);
87+
88+
// Setup lifecycle callbacks
89+
builder->WithClientConnectionSuccessCallback(
90+
[&client](const Mqtt5::OnConnectionSuccessEventData &eventData) {
91+
// Do mqtt5 client operations
92+
client->Publish(...);
93+
});
94+
95+
// Build Mqtt5Client
96+
client = builder->Build();
97+
```
7998

8099

81100

@@ -133,33 +152,33 @@ The MQTT5 client emits a set of events related to state and network status chang
133152
std::promise<void> stoppedPromise;
134153
135154
// Setup lifecycle callbacks
136-
builder->withClientConnectionSuccessCallback(
137-
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionSuccessEventData &eventData) {
155+
builder->WithClientConnectionSuccessCallback(
156+
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
138157
fprintf(
139158
stdout,
140159
"Mqtt5 Client connection succeed, clientid: %s.\n",
141160
eventData.negotiatedSettings->getClientId().c_str());
142161
connectionPromise.set_value(true);
143162
});
144163
145-
builder->withClientConnectionFailureCallback(
146-
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionFailureEventData &eventData) {
164+
builder->WithClientConnectionFailureCallback(
165+
[&connectionPromise](const Mqtt5::OnConnectionFailureEventData &eventData) {
147166
fprintf(
148167
stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
149168
connectionPromise.set_value(false);
150169
});
151170
152-
builder->withClientStoppedCallback([&stoppedPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnStoppedEventData &) {
171+
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
153172
fprintf(stdout, "Mqtt5 Client stopped.\n");
154173
stoppedPromise.set_value();
155174
});
156175
157-
builder->withClientAttemptingConnectCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::OnAttemptingConnectEventData &) {
176+
builder->WithClientAttemptingConnectCallback([](Mqtt5::OnAttemptingConnectEventData &) {
158177
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
159178
});
160179
161-
builder->withClientDisconnectionCallback(
162-
[](Mqtt5::Mqtt5Client &, const Mqtt5::OnDisconnectionEventData &eventData) {
180+
builder->WithClientDisconnectionCallback(
181+
[](const Mqtt5::OnDisconnectionEventData &eventData) {
163182
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
164183
});
165184
@@ -238,7 +257,7 @@ Emitted once the client has shutdown any associated network connection and enter
238257
// Create Mqtt5Client Builder
239258
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(...);
240259

241-
builder->withPublishReceivedCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::PublishReceivedEventData &eventData) {
260+
builder->WithPublishReceivedCallback([](const Mqtt5::PublishReceivedEventData &eventData) {
242261
if (eventData.publishPacket == nullptr)
243262
return;
244263
fprintf(stdout, "Publish received on topic %s:", eventData.publishPacket->getTopic().c_str());
@@ -503,7 +522,7 @@ No matter what your connection transport or authentication method is, you may co
503522
Http::HttpClientConnectionProxyOptions proxyOptions;
504523
proxyOptions.HostName = "<proxyHost>";
505524
proxyOptions.Port = <proxyPort>;
506-
builder->withHttpProxyOptions(proxyOptions);
525+
builder->WithHttpProxyOptions(proxyOptions);
507526

508527
/* You can setup other client options and lifecycle event callbacks before call builder->Build().
509528
** Once the the client get built, you could no longer update the client options or connection options
@@ -548,9 +567,9 @@ The Subscribe operation takes a description of the SUBSCRIBE packet you wish to
548567
subscriptionList.push_back(data2);
549568
subscriptionList.push_back(data3);
550569
551-
// Creaet a SubscribePacket with the subscription list. You can also use packet->withSubscription(subscription) to push_back a single subscription data.
570+
// Creaet a SubscribePacket with the subscription list. You can also use packet->WithSubscription(subscription) to push_back a single subscription data.
552571
std::shared_ptr<Mqtt5::SubscribePacket> packet = std::make_shared<SubscribePacket>();
553-
packet->withSubscriptions(subscriptionList);
572+
packet->WithSubscriptions(subscriptionList);
554573
555574
bool subSuccess = mqtt5Client->Subscribe(
556575
packet,
@@ -584,7 +603,7 @@ The Unsubscribe operation takes a description of the UNSUBSCRIBE packet you wish
584603
topics.push_back(topic1);
585604
topics.push_back(topic2);
586605
std::shared_ptr<UnsubscribePacket> unsub = std::make_shared<UnsubscribePacket>();
587-
unsub->withTopicFilters(topics);
606+
unsub->WithTopicFilters(topics);
588607
bool unsubSuccess = mqtt5Client->Unsubscribe(
589608
packet,
590609
[](std::shared_ptr<Mqtt5::Mqtt5Client>, int, std::shared_ptr<Mqtt5::UnSubAckPacket> unsuback){
@@ -621,7 +640,7 @@ If the PUBLISH was a QoS 1 publish, then the completion callback returns a PubAc
621640
std::shared_ptr<PublishPacket> publish = std::make_shared<PublishPacket>(testTopic, payload, QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
622641
623642
// Setup publish completion callback. The callback will get triggered when the pulbish completes and publish result returned from the server
624-
OnPublishCompletionHandler callback = [](std::shared_ptr<Mqtt5Client> client, int, std::shared_ptr<PublishResult> result){
643+
OnPublishCompletionHandler callback = [](int, std::shared_ptr<PublishResult> result){
625644
if(!result->wasSuccessful())
626645
{
627646
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
@@ -650,3 +669,4 @@ Below are some best practices for the MQTT5 client that are recommended to follo
650669
* If you are getting unexpected disconnects when trying to connect to AWS IoT Core, make sure to check your IoT Core Thing’s policy and permissions to make sure your device is has the permissions it needs to connect!
651670
* For **Publish**, **Subscribe**, and **Unsubscribe**, you can check the reason codes in the CompletionCallbacks to see if the operation actually succeeded.
652671
* You MUST NOT perform blocking operations on any callback, or you will cause a deadlock. For example: in the on_publish_received callback, do not send a publish, and then wait for the future to complete within the callback. The Client cannot do work until your callback returns, so the thread will be stuck.
672+
* You can use `LastError()` and `ErrorDebugString(error_code)` to get the error code and error message.

samples/mqtt5/mqtt5_pubsub/main.cpp

Lines changed: 54 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,21 @@ int main(int argc, char *argv[])
3737
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
3838
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());
3939

40+
// Check if the builder setup correctly.
4041
if (builder == nullptr)
4142
{
42-
printf("Failed to setup mqtt5 client builder.");
43+
printf(
44+
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
4345
return -1;
4446
}
4547

4648
// Setup connection options
4749
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
48-
connectOptions->withClientId(cmdData.input_clientId);
49-
builder->withConnectOptions(connectOptions);
50+
connectOptions->WithClientId(cmdData.input_clientId);
51+
builder->WithConnectOptions(connectOptions);
5052
if (cmdData.input_port != 0)
5153
{
52-
builder->withPort(static_cast<uint16_t>(cmdData.input_port));
54+
builder->WithPort(static_cast<uint16_t>(cmdData.input_port));
5355
}
5456

5557
std::promise<bool> connectionPromise;
@@ -58,37 +60,34 @@ int main(int argc, char *argv[])
5860
std::promise<bool> subscribeSuccess;
5961

6062
// Setup lifecycle callbacks
61-
builder->withClientConnectionSuccessCallback(
62-
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionSuccessEventData &eventData) {
63+
builder->WithClientConnectionSuccessCallback(
64+
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
6365
fprintf(
6466
stdout,
6567
"Mqtt5 Client connection succeed, clientid: %s.\n",
6668
eventData.negotiatedSettings->getClientId().c_str());
6769
connectionPromise.set_value(true);
6870
});
69-
builder->withClientConnectionFailureCallback(
70-
[&connectionPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnConnectionFailureEventData &eventData) {
71-
fprintf(
72-
stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
73-
connectionPromise.set_value(false);
74-
});
75-
builder->withClientStoppedCallback([&stoppedPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnStoppedEventData &) {
71+
builder->WithClientConnectionFailureCallback([&connectionPromise](
72+
const Mqtt5::OnConnectionFailureEventData &eventData) {
73+
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
74+
connectionPromise.set_value(false);
75+
});
76+
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
7677
fprintf(stdout, "Mqtt5 Client stopped.\n");
7778
stoppedPromise.set_value();
7879
});
79-
builder->withClientAttemptingConnectCallback([](Mqtt5::Mqtt5Client &, const Mqtt5::OnAttemptingConnectEventData &) {
80+
builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) {
8081
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
8182
});
82-
builder->withClientDisconnectionCallback(
83-
[&disconnectPromise](Mqtt5::Mqtt5Client &, const Mqtt5::OnDisconnectionEventData &eventData) {
84-
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
85-
disconnectPromise.set_value();
86-
});
83+
builder->WithClientDisconnectionCallback([&disconnectPromise](const Mqtt5::OnDisconnectionEventData &eventData) {
84+
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
85+
disconnectPromise.set_value();
86+
});
8787

8888
// This is invoked upon the receipt of a Publish on a subscribed topic.
89-
builder->withPublishReceivedCallback(
90-
[&receiveMutex, &receivedCount, &receiveSignal](
91-
Mqtt5::Mqtt5Client & /*client*/, const Mqtt5::PublishReceivedEventData &eventData) {
89+
builder->WithPublishReceivedCallback(
90+
[&receiveMutex, &receivedCount, &receiveSignal](const Mqtt5::PublishReceivedEventData &eventData) {
9291
if (eventData.publishPacket == nullptr)
9392
return;
9493

@@ -113,7 +112,8 @@ int main(int argc, char *argv[])
113112

114113
if (client == nullptr)
115114
{
116-
fprintf(stdout, "Client creation failed.\n");
115+
fprintf(
116+
stdout, "Failed to Init Mqtt5Client with error code %d: %s", LastError(), ErrorDebugString(LastError()));
117117
return -1;
118118
}
119119

@@ -127,41 +127,39 @@ int main(int argc, char *argv[])
127127
return -1;
128128
}
129129

130-
auto onSubAck =
131-
[&subscribeSuccess](
132-
std::shared_ptr<Mqtt5::Mqtt5Client>, int error_code, std::shared_ptr<Mqtt5::SubAckPacket> suback) {
133-
if (error_code != 0)
134-
{
135-
fprintf(
136-
stdout,
137-
"MQTT5 Client Subscription failed with error code: (%d)%s\n",
138-
error_code,
139-
aws_error_debug_str(error_code));
140-
subscribeSuccess.set_value(false);
141-
}
142-
if (suback != nullptr)
130+
auto onSubAck = [&subscribeSuccess](int error_code, std::shared_ptr<Mqtt5::SubAckPacket> suback) {
131+
if (error_code != 0)
132+
{
133+
fprintf(
134+
stdout,
135+
"MQTT5 Client Subscription failed with error code: (%d)%s\n",
136+
error_code,
137+
aws_error_debug_str(error_code));
138+
subscribeSuccess.set_value(false);
139+
}
140+
if (suback != nullptr)
141+
{
142+
for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes())
143143
{
144-
for (Mqtt5::SubAckReasonCode reasonCode : suback->getReasonCodes())
144+
if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR)
145145
{
146-
if (reasonCode > Mqtt5::SubAckReasonCode::AWS_MQTT5_SARC_UNSPECIFIED_ERROR)
147-
{
148-
fprintf(
149-
stdout,
150-
"MQTT5 Client Subscription failed with server error code: (%d)%s\n",
151-
reasonCode,
152-
suback->getReasonString()->c_str());
153-
subscribeSuccess.set_value(false);
154-
return;
155-
}
146+
fprintf(
147+
stdout,
148+
"MQTT5 Client Subscription failed with server error code: (%d)%s\n",
149+
reasonCode,
150+
suback->getReasonString()->c_str());
151+
subscribeSuccess.set_value(false);
152+
return;
156153
}
157154
}
158-
subscribeSuccess.set_value(true);
159-
};
155+
}
156+
subscribeSuccess.set_value(true);
157+
};
160158

161159
Mqtt5::Subscription sub1(cmdData.input_topic, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE);
162-
sub1.withNoLocal(false);
160+
sub1.WithNoLocal(false);
163161
std::shared_ptr<Mqtt5::SubscribePacket> subPacket = std::make_shared<Mqtt5::SubscribePacket>();
164-
subPacket->withSubscription(std::move(sub1));
162+
subPacket->WithSubscription(std::move(sub1));
165163

166164
if (client->Subscribe(subPacket, onSubAck))
167165
{
@@ -174,9 +172,7 @@ int main(int argc, char *argv[])
174172
* Setup publish completion callback. The callback will get triggered when the publish completes (when
175173
* the client received the PubAck from the server).
176174
*/
177-
auto onPublishComplete = [](std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client,
178-
int,
179-
std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
175+
auto onPublishComplete = [](int, std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result) {
180176
if (!result->wasSuccessful())
181177
{
182178
fprintf(stdout, "Publish failed with error_code: %d", result->getErrorCode());
@@ -225,11 +221,10 @@ int main(int argc, char *argv[])
225221
// Unsubscribe from the topic.
226222
std::promise<void> unsubscribeFinishedPromise;
227223
std::shared_ptr<Mqtt5::UnsubscribePacket> unsub = std::make_shared<Mqtt5::UnsubscribePacket>();
228-
unsub->withTopicFilter(cmdData.input_topic);
229-
if (!client->Unsubscribe(
230-
unsub, [&](std::shared_ptr<Mqtt5::Mqtt5Client>, int, std::shared_ptr<Mqtt5::UnSubAckPacket>) {
231-
unsubscribeFinishedPromise.set_value();
232-
}))
224+
unsub->WithTopicFilter(cmdData.input_topic);
225+
if (!client->Unsubscribe(unsub, [&](int, std::shared_ptr<Mqtt5::UnSubAckPacket>) {
226+
unsubscribeFinishedPromise.set_value();
227+
}))
233228
{
234229
fprintf(stdout, "Unsubscription failed.\n");
235230
exit(-1);

0 commit comments

Comments
 (0)