Created
May 11, 2021 14:59
-
-
Save tomlankhorst/64ee0443565ebb20bd89631ddc240cb7 to your computer and use it in GitHub Desktop.
MQTT mosquitto demo (with fmt/7.1.3 mosquitto/2.0.10)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <exception>
#include <iostream>
#include <string>
#include <string_view>
#include <thread>

#include <fmt/color.h>
#include <fmt/core.h>
#include <mosquitto.h>
#include <openssl/ssl.h>
// Stop request for the publish loop in main(); set from the SIGINT handler.
// A lock-free atomic is used because the flag is written from a signal
// handler and read from another execution context, which a plain bool does
// not support without a data race.
std::atomic<bool> exit_flag{false};
static_assert(std::atomic<bool>::is_always_lock_free,
              "exit_flag must be lock-free to be usable from a signal handler");

// When true, main() installs a libmosquitto log callback that prints every
// library log line.
bool debug_log = false;

// Enables duration literals such as 10ms.
using namespace std::chrono_literals;
int main(int argc, char** argv) { | |
struct sigaction act {}; | |
act.sa_handler = [](int s){ | |
fmt::print("Interrupted\n"); | |
exit_flag = true; | |
}; | |
sigaction(SIGINT, &act, nullptr); | |
if (argc != 3 && argc != 4 && argc != 6) { | |
fmt::print("Usage: mqtt hostname port (cafile (username password))\n"); | |
return 0; | |
} | |
fmt::print("mqtt demo\n"); | |
mosquitto* mosq = nullptr; | |
int rc = MOSQ_ERR_SUCCESS; | |
// Init lib | |
mosquitto_lib_init(); | |
// Say hello | |
int major, minor, rev; | |
mosquitto_lib_version(&major, &minor, &rev); | |
fmt::print(fmt::fg(fmt::color::blue_violet), "libmosquitto {}.{}.{}\n{}\n", major, minor, rev, OPENSSL_VERSION_TEXT); | |
// Create new instance | |
mosq = mosquitto_new(nullptr, true, nullptr); | |
if (!mosq) { | |
fmt::print("Out of memory\n"); | |
return -1; | |
} | |
// Callbacks | |
mosquitto_connect_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int reason_code){ | |
fmt::print("on_connect: {}\n", mosquitto_connack_string(reason_code)); | |
if (reason_code != 0) | |
mosquitto_disconnect(mosq); | |
}); | |
mosquitto_publish_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int mid){ | |
}); | |
mosquitto_subscribe_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos){ | |
bool have_subscription = false; | |
for(auto i = 0; i < qos_count; i++){ | |
fmt::print("on_subscribe: {}, granted qos = {}\n", i, granted_qos[i]); | |
if(granted_qos[i] <= 2){ | |
have_subscription = true; | |
} | |
} | |
if (!have_subscription){ | |
/* The broker rejected all of our subscriptions, we know we only sent | |
* the one SUBSCRIBE, so there is no point remaining connected. */ | |
fmt::print(fmt::fg(fmt::color::red), "Error: All subscriptions rejected.\n"); | |
mosquitto_disconnect(mosq); | |
} | |
}); | |
if (debug_log) { | |
mosquitto_log_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int level, const char *str){ | |
fmt::print(fmt::fg(fmt::color::light_gray), "level {}: {}\n", level, str); | |
}); | |
} | |
mosquitto_message_callback_set(mosq, [](struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { | |
fmt::print(fmt::fg(fmt::color::olive), "Received on {}: {} ({})\n", msg->topic, std::string_view{(char*)msg->payload, (size_t)msg->payloadlen}, msg->qos); | |
}); | |
if (argc >= 6) { | |
rc = mosquitto_username_pw_set(mosq, argv[4], argv[5]); | |
if (rc != MOSQ_ERR_SUCCESS) { | |
mosquitto_destroy(mosq); | |
fmt::print("Error setting username and password: {}\n", mosquitto_strerror(rc)); | |
return 1; | |
} | |
} | |
if (argc >= 4) { | |
rc = mosquitto_tls_set(mosq, argv[3], nullptr, nullptr, nullptr, nullptr); | |
if (rc != MOSQ_ERR_SUCCESS) { | |
mosquitto_destroy(mosq); | |
fmt::print("Error setting TLS: {}\n", mosquitto_strerror(rc)); | |
return 1; | |
} | |
// verify server certificate | |
mosquitto_tls_opts_set(mosq, SSL_VERIFY_PEER, NULL, NULL); | |
} | |
// Connect | |
int port = std::stoi(argv[2]); | |
std::string host = argv[1]; | |
fmt::print(fmt::fg(fmt::color::gray), "Connecting to {}:{}\n", host, port); | |
rc = mosquitto_connect(mosq, host.c_str(), port, 60); | |
if(rc != MOSQ_ERR_SUCCESS){ | |
mosquitto_destroy(mosq); | |
fmt::print("Error: {}\n", mosquitto_strerror(rc)); | |
return 1; | |
} | |
rc = mosquitto_loop_start(mosq); | |
if(rc != MOSQ_ERR_SUCCESS){ | |
mosquitto_destroy(mosq); | |
fmt::print("Error: {}\n", mosquitto_strerror(rc)); | |
return 1; | |
} | |
mosquitto_subscribe(mosq, nullptr, "#", 1); | |
auto start = std::chrono::system_clock::now(); | |
while (!exit_flag) { | |
std::string payload = fmt::format("Hello from MQTT client {}", (std::chrono::system_clock::now() - start).count()); | |
rc = mosquitto_publish(mosq, nullptr, "hello/world", payload.size(), payload.c_str(), 1, false); | |
if(rc != MOSQ_ERR_SUCCESS){ | |
fmt::print("Error publishing: {}\n", mosquitto_strerror(rc)); | |
} | |
std::this_thread::sleep_for(10ms); | |
} | |
// Clean-up lib | |
mosquitto_lib_cleanup(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
gives