diff --git a/CMakeLists.txt b/CMakeLists.txt index 83e18e9..61f98e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,7 +40,7 @@ include_directories(${PROJECT_NAME} lib/ArduinoJson/src ) -file(GLOB SRCFILES +set(SRCFILES Pins.h Config.h Network.cpp diff --git a/lib/espMqttClient/docs/index.md b/lib/espMqttClient/docs/index.md index 2335054..cae13f8 100644 --- a/lib/espMqttClient/docs/index.md +++ b/lib/espMqttClient/docs/index.md @@ -503,3 +503,47 @@ void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, cons // attach callback to MQTT client mqttClient.onMessage(onMqttMessage); ``` + +### onMessage callbacks per topic + +espMqttClient allows only one callback for incoming messages. You might want to have specific ones per topic. This example shows one way on how to achieve this. + +Limitations of this code sample: only the first match is served and no wildcard topics allowed. + +```cpp +#include +#include + +// definitions of the std::map where we will store the topic/callback combinations +struct MatchTopic { + bool operator()(const char* a, const char* b) const { + return strcmp(a, b) < 0; + } +}; +std::map topicCallbacks; + +// callbacks per topic +void onTopic1(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + // received a packet on topic 1 +} +void onTopic2(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + // received a packet on topic 2 +} + +// general callback to dispatch to specific handlers +void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + auto it = topicCallbacks.find(topic); + if (it != topicCallbacks.end()) { + // if found, run specific callback + (it->second)(properties, topic, payload, len, index, total); + } else { + // or handle it here + } +} + +// in your Arduino setup() function: +topicCallbacks.emplace("base/topic1", onTopic1); +topicCallbacks.emplace("base/topic2", onTopic2); + +mqttClient.onMessage(onMessage); +``` diff --git a/lib/espMqttClient/examples/largepayload-esp8266/largepayload-esp8266.ino b/lib/espMqttClient/examples/largepayload-esp8266/largepayload-esp8266.ino index ce4123f..f64c9e7 100644 --- a/lib/espMqttClient/examples/largepayload-esp8266/largepayload-esp8266.ino +++ b/lib/espMqttClient/examples/largepayload-esp8266/largepayload-esp8266.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -47,11 +46,13 @@ void connectToMqtt() { } void onWiFiConnect(const WiFiEventStationModeGotIP& event) { + (void) event; Serial.println("Connected to Wi-Fi."); connectToMqtt(); } void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) { + (void) event; Serial.println("Disconnected from Wi-Fi."); } diff --git a/lib/espMqttClient/examples/ota-esp8266/ota-esp8266.ino b/lib/espMqttClient/examples/ota-esp8266/ota-esp8266.ino index f82b36c..90f1326 100644 --- a/lib/espMqttClient/examples/ota-esp8266/ota-esp8266.ino +++ b/lib/espMqttClient/examples/ota-esp8266/ota-esp8266.ino @@ -1,6 +1,5 @@ #include #include -#include #include @@ -37,11 +36,13 @@ void connectToMqtt() { } void onWiFiConnect(const WiFiEventStationModeGotIP& event) { + (void) event; Serial.println("Connected to Wi-Fi."); connectToMqtt(); } void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) { + (void) event; Serial.println("Disconnected from Wi-Fi."); } @@ -109,6 +110,7 @@ void handleUpdate(const uint8_t* payload, size_t length, size_t index, size_t to } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) properties; if (strcmp(UPDATE_TOPIC, topic) != 0) { Serial.println("Topic mismatch"); return; diff --git a/lib/espMqttClient/examples/simple-esp32/simple-esp32.ino b/lib/espMqttClient/examples/simple-esp32/simple-esp32.ino index 4698587..c1bbbe5 100644 --- a/lib/espMqttClient/examples/simple-esp32/simple-esp32.ino +++ b/lib/espMqttClient/examples/simple-esp32/simple-esp32.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -89,6 +88,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); diff --git a/lib/espMqttClient/examples/simple-esp8266/simple-esp8266.ino b/lib/espMqttClient/examples/simple-esp8266/simple-esp8266.ino index 6eda510..2d54e12 100644 --- a/lib/espMqttClient/examples/simple-esp8266/simple-esp8266.ino +++ b/lib/espMqttClient/examples/simple-esp8266/simple-esp8266.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -32,11 +31,13 @@ void connectToMqtt() { } void onWiFiConnect(const WiFiEventStationModeGotIP& event) { + (void) event; Serial.println("Connected to Wi-Fi."); connectToMqtt(); } void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) { + (void) event; Serial.println("Disconnected from Wi-Fi."); } @@ -83,6 +84,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); diff --git a/lib/espMqttClient/examples/simple-linux/platformio.ini b/lib/espMqttClient/examples/simple-linux/platformio.ini index 54a9443..565336f 100644 --- a/lib/espMqttClient/examples/simple-linux/platformio.ini +++ b/lib/espMqttClient/examples/simple-linux/platformio.ini @@ -18,6 +18,7 @@ build_flags = -pthread -Wall -Wextra + -Werror [env:native] platform = native diff --git a/lib/espMqttClient/examples/simpleAsync-esp32/simpleAsync-esp32.ino b/lib/espMqttClient/examples/simpleAsync-esp32/simpleAsync-esp32.ino index afd2445..50fd5f6 100644 --- a/lib/espMqttClient/examples/simpleAsync-esp32/simpleAsync-esp32.ino +++ b/lib/espMqttClient/examples/simpleAsync-esp32/simpleAsync-esp32.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -89,6 +88,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); diff --git a/lib/espMqttClient/examples/simpleAsync-esp8266/simpleAsync-esp8266.ino b/lib/espMqttClient/examples/simpleAsync-esp8266/simpleAsync-esp8266.ino index 08c88b1..804caa1 100644 --- a/lib/espMqttClient/examples/simpleAsync-esp8266/simpleAsync-esp8266.ino +++ b/lib/espMqttClient/examples/simpleAsync-esp8266/simpleAsync-esp8266.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -32,11 +31,13 @@ void connectToMqtt() { } void onWiFiConnect(const WiFiEventStationModeGotIP& event) { + (void) event; Serial.println("Connected to Wi-Fi."); connectToMqtt(); } void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) { + (void) event; Serial.println("Disconnected from Wi-Fi."); } @@ -83,6 +84,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); diff --git a/lib/espMqttClient/examples/tls-esp32/tls-esp32.ino b/lib/espMqttClient/examples/tls-esp32/tls-esp32.ino index b759aff..ae6cebb 100644 --- a/lib/espMqttClient/examples/tls-esp32/tls-esp32.ino +++ b/lib/espMqttClient/examples/tls-esp32/tls-esp32.ino @@ -1,5 +1,4 @@ #include -#include #include @@ -16,7 +15,8 @@ const char rootCA[] = \ " add your certificate here \n" \ "-----END CERTIFICATE-----\n"; -espMqttClientSecure mqttClient; +espMqttClientSecure mqttClient(espMqttClientTypes::UseInternalTask::NO); +static TaskHandle_t taskHandle; bool reconnectMqtt = false; uint32_t lastReconnect = 0; @@ -93,6 +93,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); @@ -116,6 +117,12 @@ void onMqttPublish(uint16_t packetId) { Serial.println(packetId); } +void networkingTask() { + for (;;) { + mqttClient.loop(); + } +} + void setup() { Serial.begin(115200); Serial.println(); @@ -137,6 +144,8 @@ void setup() { mqttClient.setServer(MQTT_HOST, MQTT_PORT); mqttClient.setCleanSession(true); + xTaskCreatePinnedToCore((TaskFunction_t)networkingTask, "mqttclienttask", 5120, nullptr, 1, &taskHandle, 0); + connectToWiFi(); } diff --git a/lib/espMqttClient/examples/tls-esp8266/tls-esp8266.ino b/lib/espMqttClient/examples/tls-esp8266/tls-esp8266.ino index 5be5b11..f2cb209 100644 --- a/lib/espMqttClient/examples/tls-esp8266/tls-esp8266.ino +++ b/lib/espMqttClient/examples/tls-esp8266/tls-esp8266.ino @@ -35,11 +35,13 @@ void connectToMqtt() { } void onWiFiConnect(const WiFiEventStationModeGotIP& event) { + (void) event; Serial.println("Connected to Wi-Fi."); connectToMqtt(); } void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) { + (void) event; Serial.println("Disconnected from Wi-Fi."); } @@ -86,6 +88,7 @@ void onMqttUnsubscribe(uint16_t packetId) { } void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + (void) payload; Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); diff --git a/lib/espMqttClient/library.json b/lib/espMqttClient/library.json index 24ffbbb..46f28f0 100644 --- a/lib/espMqttClient/library.json +++ b/lib/espMqttClient/library.json @@ -14,17 +14,19 @@ "type": "git", "url": "https://github.com/bertmelis/espMqttClient.git" }, - "version": "1.3.3", + "version": "1.4.1", "frameworks": "arduino", "platforms": ["espressif8266", "espressif32"], "headers": ["espMqttClient.h", "espMqttClientAsync.h"], "dependencies": [ { + "owner": "me-no-dev", "name": "ESPAsyncTCP", "version": ">=1.2.2", "platforms": "espressif8266" }, { + "owner": "me-no-dev", "name": "AsyncTCP", "version": ">=1.1.1", "platforms": "espressif32" @@ -34,4 +36,4 @@ { "libLDFMode": "deep+" } -} \ No newline at end of file +} diff --git a/lib/espMqttClient/library.properties b/lib/espMqttClient/library.properties index ea14fcf..b0b78fd 100644 --- a/lib/espMqttClient/library.properties +++ b/lib/espMqttClient/library.properties @@ -1,5 +1,5 @@ name=espMqttClient -version=1.3.3 +version=1.4.1 author=Bert Melis maintainer=Bert Melis sentence=an MQTT client for the Arduino framework for ESP8266 / ESP32 diff --git a/lib/espMqttClient/scripts/CI/platformio_esp32.ini b/lib/espMqttClient/scripts/CI/platformio_esp32.ini index 29f9ab1..326891d 100644 --- a/lib/espMqttClient/scripts/CI/platformio_esp32.ini +++ b/lib/espMqttClient/scripts/CI/platformio_esp32.ini @@ -16,3 +16,4 @@ build_flags = ;-Werror -Wall -Wextra + -Werror diff --git a/lib/espMqttClient/scripts/CI/platformio_esp8266.ini b/lib/espMqttClient/scripts/CI/platformio_esp8266.ini index 919df73..d523d27 100644 --- a/lib/espMqttClient/scripts/CI/platformio_esp8266.ini +++ b/lib/espMqttClient/scripts/CI/platformio_esp8266.ini @@ -16,3 +16,4 @@ build_flags = ;-Werror -Wall -Wextra + -Werror diff --git a/lib/espMqttClient/src/MqttClient.cpp b/lib/espMqttClient/src/MqttClient.cpp index af7f1f9..9962538 100644 --- a/lib/espMqttClient/src/MqttClient.cpp +++ b/lib/espMqttClient/src/MqttClient.cpp @@ -13,12 +13,11 @@ using espMqttClientInternals::PacketType; using espMqttClientTypes::DisconnectReason; using espMqttClientTypes::Error; +MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority, uint8_t core) #if defined(ARDUINO_ARCH_ESP32) -MqttClient::MqttClient(bool useTask, uint8_t priority, uint8_t core) -: _useTask(useTask) +: _useInternalTask(useInternalTask) , _transport(nullptr) #else -MqttClient::MqttClient() : _transport(nullptr) #endif , _onConnectCallback(nullptr) @@ -31,7 +30,7 @@ MqttClient::MqttClient() , _clientId(nullptr) , _ip() , _host(nullptr) -, _port(1183) +, _port(1883) , _useIp(false) , _keepAlive(15000) , _cleanSession(true) @@ -58,19 +57,21 @@ MqttClient::MqttClient() , _lastServerActivity(0) , _pingSent(false) , _disconnectReason(DisconnectReason::TCP_DISCONNECTED) -#if defined(ARDUINO_ARCH_ESP32) -#if ARDUHAL_LOG_LEVEL >= ARDUHAL_LOG_LEVEL_INFO +#if defined(ARDUINO_ARCH_ESP32) && ARDUHAL_LOG_LEVEL >= ARDUHAL_LOG_LEVEL_INFO , _highWaterMark(4294967295) -#endif #endif { EMC_GENERATE_CLIENTID(_generatedClientId); #if defined(ARDUINO_ARCH_ESP32) _xSemaphore = xSemaphoreCreateMutex(); EMC_SEMAPHORE_GIVE(); // release before first use - if (useTask) { + if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { xTaskCreatePinnedToCore((TaskFunction_t)_loop, "mqttclient", EMC_TASK_STACK_SIZE, this, priority, &_taskHandle, core); } +#else + (void) useInternalTask; + (void) priority; + (void) core; #endif _clientId = _generatedClientId; } @@ -80,7 +81,7 @@ MqttClient::~MqttClient() { _clearQueue(2); #if defined(ARDUINO_ARCH_ESP32) vSemaphoreDelete(_xSemaphore); - if (_useTask) { + if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { #if EMC_USE_WATCHDOG esp_task_wdt_delete(_taskHandle); // not sure if this is really needed #endif @@ -114,7 +115,7 @@ bool MqttClient::connect() { (uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally _clientId)) { #if defined(ARDUINO_ARCH_ESP32) - if (_useTask) { + if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { vTaskResume(_taskHandle); } #endif @@ -197,7 +198,7 @@ void MqttClient::loop() { switch (_state) { case State::disconnected: #if defined(ARDUINO_ARCH_ESP32) - if (_useTask) { + if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { vTaskSuspend(_taskHandle); } #endif @@ -239,6 +240,7 @@ void MqttClient::loop() { _checkOutbox(); _checkIncoming(); _checkPing(); + _checkTimeout(); } else { _state = State::disconnectingTcp1; _disconnectReason = DisconnectReason::TCP_DISCONNECTED; @@ -259,6 +261,7 @@ void MqttClient::loop() { _checkOutbox(); _checkIncoming(); _checkPing(); + _checkTimeout(); break; case State::disconnectingTcp1: _transport->stop(); @@ -316,70 +319,29 @@ void MqttClient::_checkOutbox() { } } -/* -void MqttClient::_checkOutgoing() { - EMC_SEMAPHORE_TAKE(); - Packet* packet = _outbox.getCurrent(); - - int32_t wantToWrite = 0; - int32_t written = 0; - while (packet && (wantToWrite == written)) { - // mixing signed with unsigned here but safe because of MQTT packet size limits - wantToWrite = packet->available(_bytesSent); - written = _transport->write(packet->data(_bytesSent), wantToWrite); - if (written < 0) { - emc_log_w("Write error, check connection"); - break; - } - _lastClientActivity = millis(); - _bytesSent += written; - emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType()); - if (_bytesSent == packet->size()) { - if ((packet->packetType()) == PacketType.DISCONNECT) { - _state = State::disconnectingTcp1; - _disconnectReason = DisconnectReason::USER_OK; - } - if (packet->removable()) { - _outbox.removeCurrent(); - } else { - // handle with care! millis() returns unsigned 32 bit, token is void* - packet->token = reinterpret_cast(millis()); - if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup(); - _outbox.next(); - } - packet = _outbox.getCurrent(); - _bytesSent = 0; - } - } - EMC_SEMAPHORE_GIVE(); -} -*/ - int MqttClient::_sendPacket() { EMC_SEMAPHORE_TAKE(); - Packet* packet = _outbox.getCurrent(); + OutgoingPacket* packet = _outbox.getCurrent(); int32_t wantToWrite = 0; int32_t written = 0; if (packet && (wantToWrite == written)) { // mixing signed with unsigned here but safe because of MQTT packet size limits - wantToWrite = packet->available(_bytesSent); + wantToWrite = packet->packet.available(_bytesSent); if (wantToWrite == 0) { EMC_SEMAPHORE_GIVE(); return 0; } - written = _transport->write(packet->data(_bytesSent), wantToWrite); + written = _transport->write(packet->packet.data(_bytesSent), wantToWrite); if (written < 0) { emc_log_w("Write error, check connection"); EMC_SEMAPHORE_GIVE(); return -1; } - // handle with care! millis() returns unsigned 32 bit, token is void* - static_assert(sizeof(uint32_t) <= sizeof(void*), "the size of uint32_t must be smaller than or equal to the size of a pointer"); - packet->token = reinterpret_cast(millis()); + packet->timeSent = millis(); _lastClientActivity = millis(); _bytesSent += written; - emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType()); + emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->packet.size(), packet->packet.packetType()); } EMC_SEMAPHORE_GIVE(); return written; @@ -387,17 +349,17 @@ int MqttClient::_sendPacket() { bool MqttClient::_advanceOutbox() { EMC_SEMAPHORE_TAKE(); - Packet* packet = _outbox.getCurrent(); - if (packet && _bytesSent == packet->size()) { - if ((packet->packetType()) == PacketType.DISCONNECT) { + OutgoingPacket* packet = _outbox.getCurrent(); + if (packet && _bytesSent == packet->packet.size()) { + if ((packet->packet.packetType()) == PacketType.DISCONNECT) { _state = State::disconnectingTcp1; _disconnectReason = DisconnectReason::USER_OK; } - if (packet->removable()) { + if (packet->packet.removable()) { _outbox.removeCurrent(); } else { // we already set 'dup' here, in case we have to retry - if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup(); + if ((packet->packet.packetType()) == PacketType.PUBLISH) packet->packet.setDup(); _outbox.next(); } packet = _outbox.getCurrent(); @@ -500,14 +462,17 @@ void MqttClient::_checkPing() { } void MqttClient::_checkTimeout() { - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); - if (it && _bytesSent == 0) { // check that we're not busy sending - if (millis() - *((uint32_t*)&(it.get()->token)) > _timeout) { // NOLINT(readability/casting) - // TODO(bertmelis): fix ugly casting hack + EMC_SEMAPHORE_TAKE(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + // check that we're not busy sending + // don't check when first item hasn't been sent yet + if (it && _bytesSent == 0 && it.get() != _outbox.getCurrent()) { + if (millis() - it.get()->timeSent > _timeout) { emc_log_w("Packet ack timeout, retrying"); _outbox.resetCurrent(); } } + EMC_SEMAPHORE_GIVE(); } void MqttClient::_onConnack() { @@ -545,9 +510,9 @@ void MqttClient::_onPublish() { } } else if (qos == 2) { EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); while (it) { - if ((it.get()->packetType()) == PacketType.PUBREC && it.get()->packetId() == packetId) { + if ((it.get()->packet.packetType()) == PacketType.PUBREC && it.get()->packet.packetId() == packetId) { callback = false; emc_log_e("QoS2 packet previously delivered"); break; @@ -573,12 +538,12 @@ void MqttClient::_onPuback() { bool callback = false; uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); while (it) { // PUBACKs come in the order PUBs are sent. So we only check the first PUB packet in outbox // if it doesn't match the ID, return - if ((it.get()->packetType()) == PacketType.PUBLISH) { - if (it.get()->packetId() == idToMatch) { + if ((it.get()->packet.packetType()) == PacketType.PUBLISH) { + if (it.get()->packet.packetId() == idToMatch) { callback = true; _outbox.remove(it); break; @@ -600,12 +565,12 @@ void MqttClient::_onPubrec() { bool success = false; uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); while (it) { // PUBRECs come in the order PUBs are sent. So we only check the first PUB packet in outbox // if it doesn't match the ID, return - if ((it.get()->packetType()) == PacketType.PUBLISH) { - if (it.get()->packetId() == idToMatch) { + if ((it.get()->packet.packetType()) == PacketType.PUBLISH) { + if (it.get()->packet.packetId() == idToMatch) { if (!_addPacket(PacketType.PUBREL, idToMatch)) { emc_log_e("Could not create PUBREL packet"); } @@ -628,12 +593,12 @@ void MqttClient::_onPubrel() { bool success = false; uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); while (it) { // PUBRELs come in the order PUBRECs are sent. So we only check the first PUBREC packet in outbox // if it doesn't match the ID, return - if ((it.get()->packetType()) == PacketType.PUBREC) { - if (it.get()->packetId() == idToMatch) { + if ((it.get()->packet.packetType()) == PacketType.PUBREC) { + if (it.get()->packet.packetId() == idToMatch) { if (!_addPacket(PacketType.PUBCOMP, idToMatch)) { emc_log_e("Could not create PUBCOMP packet"); } @@ -655,13 +620,13 @@ void MqttClient::_onPubrel() { void MqttClient::_onPubcomp() { bool callback = false; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; while (it) { // PUBCOMPs come in the order PUBRELs are sent. So we only check the first PUBREL packet in outbox // if it doesn't match the ID, return - if ((it.get()->packetType()) == PacketType.PUBREL) { - if (it.get()->packetId() == idToMatch) { + if ((it.get()->packet.packetType()) == PacketType.PUBREL) { + if (it.get()->packet.packetId() == idToMatch) { if (!_addPacket(PacketType.PUBCOMP, idToMatch)) { emc_log_e("Could not create PUBCOMP packet"); } @@ -686,9 +651,9 @@ void MqttClient::_onSuback() { bool callback = false; uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); while (it) { - if (((it.get()->packetType()) == PacketType.SUBSCRIBE) && it.get()->packetId() == idToMatch) { + if (((it.get()->packet.packetType()) == PacketType.SUBSCRIBE) && it.get()->packet.packetId() == idToMatch) { callback = true; _outbox.remove(it); break; @@ -706,10 +671,10 @@ void MqttClient::_onSuback() { void MqttClient::_onUnsuback() { bool callback = false; EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId; while (it) { - if (it.get()->packetId() == idToMatch) { + if (it.get()->packet.packetId() == idToMatch) { callback = true; _outbox.remove(it); break; @@ -727,16 +692,16 @@ void MqttClient::_onUnsuback() { void MqttClient::_clearQueue(int clearData) { emc_log_i("clearing queue (clear session: %d)", clearData); EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); + espMqttClientInternals::Outbox::Iterator it = _outbox.front(); if (clearData == 0) { // keep PUB (qos > 0, aka packetID != 0), PUBREC and PUBREL // Spec only mentions PUB and PUBREL but this lib implements method B from point 4.3.3 (Fig. 4.3) // and stores the packet id in the PUBREC packet. So we also must keep PUBREC. while (it) { - espMqttClientInternals::MQTTPacketType type = it.get()->packetType(); + espMqttClientInternals::MQTTPacketType type = it.get()->packet.packetType(); if (type == PacketType.PUBREC || type == PacketType.PUBREL || - (type == PacketType.PUBLISH && it.get()->packetId() != 0)) { + (type == PacketType.PUBLISH && it.get()->packet.packetId() != 0)) { ++it; } else { _outbox.remove(it); @@ -745,7 +710,7 @@ void MqttClient::_clearQueue(int clearData) { } else if (clearData == 1) { // keep PUB while (it) { - if (it.get()->packetType() == PacketType.PUBLISH) { + if (it.get()->packet.packetType() == PacketType.PUBLISH) { ++it; } else { _outbox.remove(it); diff --git a/lib/espMqttClient/src/MqttClient.h b/lib/espMqttClient/src/MqttClient.h index 382997c..1502fd9 100644 --- a/lib/espMqttClient/src/MqttClient.h +++ b/lib/espMqttClient/src/MqttClient.h @@ -68,12 +68,8 @@ class MqttClient { void loop(); protected: - #if defined(ARDUINO_ARCH_ESP32) - explicit MqttClient(bool useTask, uint8_t priority = 1, uint8_t core = 1); - bool _useTask; - #else - MqttClient(); - #endif + explicit MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1); + espMqttClientTypes::UseInternalTask _useInternalTask; espMqttClientInternals::Transport* _transport; espMqttClientTypes::OnConnectCallback _onConnectCallback; @@ -130,7 +126,15 @@ class MqttClient { #endif uint8_t _rxBuffer[EMC_RX_BUFFER_SIZE]; - espMqttClientInternals::Outbox _outbox; + struct OutgoingPacket { + uint32_t timeSent; + espMqttClientInternals::Packet packet; + template + OutgoingPacket(uint32_t t, espMqttClientTypes::Error error, Args&&... args) : + timeSent(t), + packet(error, std::forward(args) ...) {} + }; + espMqttClientInternals::Outbox _outbox; size_t _bytesSent; espMqttClientInternals::Parser _parser; uint32_t _lastClientActivity; @@ -142,8 +146,8 @@ class MqttClient { template bool _addPacket(Args&&... args) { - espMqttClientTypes::Error error; - espMqttClientInternals::Outbox::Iterator it = _outbox.emplace(error, std::forward(args) ...); + espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS); + espMqttClientInternals::Outbox::Iterator it = _outbox.emplace(0, error, std::forward(args) ...); if (it && error == espMqttClientTypes::Error::SUCCESS) return true; _outbox.remove(it); return false; @@ -151,8 +155,8 @@ class MqttClient { template bool _addPacketFront(Args&&... args) { - espMqttClientTypes::Error error; - espMqttClientInternals::Outbox::Iterator it = _outbox.emplaceFront(error, std::forward(args) ...); + espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS); + espMqttClientInternals::Outbox::Iterator it = _outbox.emplaceFront(0, error, std::forward(args) ...); if (it && error == espMqttClientTypes::Error::SUCCESS) return true; _outbox.remove(it); return false; diff --git a/lib/espMqttClient/src/MqttClientSetup.h b/lib/espMqttClient/src/MqttClientSetup.h index 40c68af..73458d4 100644 --- a/lib/espMqttClient/src/MqttClientSetup.h +++ b/lib/espMqttClient/src/MqttClientSetup.h @@ -111,11 +111,6 @@ class MqttClientSetup : public MqttClient { */ protected: -#if defined(ESP32) - explicit MqttClientSetup(bool useTask, uint8_t priority = 1, uint8_t core = 1) - : MqttClient(useTask, priority, core) {} -#else - MqttClientSetup() - : MqttClient() {} -#endif + explicit MqttClientSetup(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1) + : MqttClient(useInternalTask, priority, core) {} }; diff --git a/lib/espMqttClient/src/Packets/Packet.cpp b/lib/espMqttClient/src/Packets/Packet.cpp index 39816df..df463ef 100644 --- a/lib/espMqttClient/src/Packets/Packet.cpp +++ b/lib/espMqttClient/src/Packets/Packet.cpp @@ -66,8 +66,7 @@ Packet::Packet(espMqttClientTypes::Error& error, uint16_t willPayloadLength, uint16_t keepAlive, const char* clientId) -: token(nullptr) -, _packetId(0) +: _packetId(0) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -162,8 +161,7 @@ Packet::Packet(espMqttClientTypes::Error& error, size_t payloadLength, uint8_t qos, bool retain) -: token(nullptr) -, _packetId(packetId) +: _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -200,8 +198,7 @@ Packet::Packet(espMqttClientTypes::Error& error, size_t payloadLength, uint8_t qos, bool retain) -: token(nullptr) -, _packetId(packetId) +: _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -235,8 +232,7 @@ Packet::Packet(espMqttClientTypes::Error& error, } Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* topic, uint8_t qos) -: token(nullptr) -, _packetId(packetId) +: _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -248,8 +244,7 @@ Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* } Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type, uint16_t packetId) -: token(nullptr) -, _packetId(packetId) +: _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -276,8 +271,7 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type, uint16_t p } Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* topic) -: token(nullptr) -, _packetId(packetId) +: _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -289,8 +283,7 @@ Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* } Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type) -: token(nullptr) -, _packetId(0) +: _packetId(0) , _data(nullptr) , _size(0) , _payloadIndex(0) diff --git a/lib/espMqttClient/src/Packets/Packet.h b/lib/espMqttClient/src/Packets/Packet.h index b76aada..21d5e0a 100644 --- a/lib/espMqttClient/src/Packets/Packet.h +++ b/lib/espMqttClient/src/Packets/Packet.h @@ -33,8 +33,6 @@ class Packet { MQTTPacketType packetType() const; bool removable() const; - void* token; // native typeless variable to store any additional data - protected: uint16_t _packetId; // save as separate variable: will be accessed frequently uint8_t* _data; @@ -92,8 +90,7 @@ class Packet { const char* topic2, uint8_t qos2, Args&& ... args) - : token(nullptr) - , _packetId(packetId) + : _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) @@ -115,8 +112,7 @@ class Packet { const char* topic1, const char* topic2, Args&& ... args) - : token(nullptr) - , _packetId(packetId) + : _packetId(packetId) , _data(nullptr) , _size(0) , _payloadIndex(0) diff --git a/lib/espMqttClient/src/TypeDefs.h b/lib/espMqttClient/src/TypeDefs.h index 2e765ad..0f15360 100644 --- a/lib/espMqttClient/src/TypeDefs.h +++ b/lib/espMqttClient/src/TypeDefs.h @@ -65,4 +65,9 @@ typedef std::function OnPublishCallback; typedef std::function PayloadCallback; typedef std::function OnErrorCallback; +enum class UseInternalTask { + NO = 0, + YES = 1, +}; + } // end namespace espMqttClientTypes diff --git a/lib/espMqttClient/src/espMqttClient.cpp b/lib/espMqttClient/src/espMqttClient.cpp index bda3038..833ece1 100644 --- a/lib/espMqttClient/src/espMqttClient.cpp +++ b/lib/espMqttClient/src/espMqttClient.cpp @@ -8,26 +8,16 @@ the LICENSE file. #include "espMqttClient.h" -#if defined(ARDUINO_ARCH_ESP32) -espMqttClient::espMqttClient(bool internalTask, uint8_t priority, uint8_t core) -: MqttClientSetup(internalTask, priority, core) -, _client() { -#else +#if defined(ARDUINO_ARCH_ESP8266) espMqttClient::espMqttClient() -: _client() { -#endif +: MqttClientSetup(espMqttClientTypes::UseInternalTask::NO) +, _client() { _transport = &_client; } -#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32) -#if defined(ARDUINO_ARCH_ESP32) -espMqttClientSecure::espMqttClientSecure(bool internalTask, uint8_t priority, uint8_t core) -: MqttClientSetup(internalTask, priority, core) -, _client() { -#else espMqttClientSecure::espMqttClientSecure() -: _client() { -#endif +: MqttClientSetup(espMqttClientTypes::UseInternalTask::NO) +, _client() { _transport = &_client; } @@ -36,27 +26,6 @@ espMqttClientSecure& espMqttClientSecure::setInsecure() { return *this; } -#if defined(ARDUINO_ARCH_ESP32) -espMqttClientSecure& espMqttClientSecure::setCACert(const char* rootCA) { - _client.client.setCACert(rootCA); - return *this; -} - -espMqttClientSecure& espMqttClientSecure::setCertificate(const char* clientCa) { - _client.client.setCertificate(clientCa); - return *this; -} - -espMqttClientSecure& espMqttClientSecure::setPrivateKey(const char* privateKey) { - _client.client.setPrivateKey(privateKey); - return *this; -} - -espMqttClientSecure& espMqttClientSecure::setPreSharedKey(const char* pskIdent, const char* psKey) { - _client.client.setPreSharedKey(pskIdent, psKey); - return *this; -} -#elif defined(ARDUINO_ARCH_ESP8266) espMqttClientSecure& espMqttClientSecure::setFingerprint(const uint8_t fingerprint[20]) { _client.client.setFingerprint(fingerprint); return *this; @@ -83,4 +52,62 @@ espMqttClientSecure& espMqttClientSecure::setCertStore(CertStoreBase *certStore) } #endif +#if defined(ARDUINO_ARCH_ESP32) +espMqttClient::espMqttClient(espMqttClientTypes::UseInternalTask useInternalTask) +: MqttClientSetup(useInternalTask) +, _client() { + _transport = &_client; +} + +espMqttClient::espMqttClient(uint8_t priority, uint8_t core) +: MqttClientSetup(espMqttClientTypes::UseInternalTask::YES, priority, core) +, _client() { + _transport = &_client; +} + +espMqttClientSecure::espMqttClientSecure(espMqttClientTypes::UseInternalTask useInternalTask) +: MqttClientSetup(useInternalTask) +, _client() { + _transport = &_client; +} + +espMqttClientSecure::espMqttClientSecure(uint8_t priority, uint8_t core) +: MqttClientSetup(espMqttClientTypes::UseInternalTask::YES, priority, core) +, _client() { + _transport = &_client; +} + +espMqttClientSecure& espMqttClientSecure::setInsecure() { + _client.client.setInsecure(); + return *this; +} + +espMqttClientSecure& espMqttClientSecure::setCACert(const char* rootCA) { + _client.client.setCACert(rootCA); + return *this; +} + +espMqttClientSecure& espMqttClientSecure::setCertificate(const char* clientCa) { + _client.client.setCertificate(clientCa); + return *this; +} + +espMqttClientSecure& espMqttClientSecure::setPrivateKey(const char* privateKey) { + _client.client.setPrivateKey(privateKey); + return *this; +} + +espMqttClientSecure& espMqttClientSecure::setPreSharedKey(const char* pskIdent, const char* psKey) { + _client.client.setPreSharedKey(pskIdent, psKey); + return *this; +} + +#endif + +#if defined(__linux__) +espMqttClient::espMqttClient() +: MqttClientSetup(espMqttClientTypes::UseInternalTask::NO) +, _client() { + _transport = &_client; +} #endif diff --git a/lib/espMqttClient/src/espMqttClient.h b/lib/espMqttClient/src/espMqttClient.h index a2aba97..4e44801 100644 --- a/lib/espMqttClient/src/espMqttClient.h +++ b/lib/espMqttClient/src/espMqttClient.h @@ -20,46 +20,61 @@ the LICENSE file. #include "MqttClientSetup.h" +#if defined(ARDUINO_ARCH_ESP8266) class espMqttClient : public MqttClientSetup { public: -#if defined(ARDUINO_ARCH_ESP32) - explicit espMqttClient(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1); -#else espMqttClient(); -#endif protected: -#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32) espMqttClientInternals::ClientSync _client; -#elif defined(__linux__) - espMqttClientInternals::ClientPosix _client; -#endif }; -#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32) class espMqttClientSecure : public MqttClientSetup { public: - #if defined(ARDUINO_ARCH_ESP32) - explicit espMqttClientSecure(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1); - #else espMqttClientSecure(); - #endif espMqttClientSecure& setInsecure(); - #if defined(ARDUINO_ARCH_ESP32) - espMqttClientSecure& setCACert(const char* rootCA); - espMqttClientSecure& setCertificate(const char* clientCa); - espMqttClientSecure& setPrivateKey(const char* privateKey); - espMqttClientSecure& setPreSharedKey(const char* pskIdent, const char* psKey); - #else espMqttClientSecure& setFingerprint(const uint8_t fingerprint[20]); espMqttClientSecure& setTrustAnchors(const X509List *ta); espMqttClientSecure& setClientRSACert(const X509List *cert, const PrivateKey *sk); espMqttClientSecure& setClientECCert(const X509List *cert, const PrivateKey *sk, unsigned allowed_usages, unsigned cert_issuer_key_type); espMqttClientSecure& setCertStore(CertStoreBase *certStore); - #endif protected: espMqttClientInternals::ClientSecureSync _client; }; - +#endif + +#if defined(ARDUINO_ARCH_ESP32) +class espMqttClient : public MqttClientSetup { + public: + explicit espMqttClient(espMqttClientTypes::UseInternalTask useInternalTask); + explicit espMqttClient(uint8_t priority = 1, uint8_t core = 1); + + protected: + espMqttClientInternals::ClientSync _client; +}; + +class espMqttClientSecure : public MqttClientSetup { + public: + explicit espMqttClientSecure(espMqttClientTypes::UseInternalTask useInternalTask); + explicit espMqttClientSecure(uint8_t priority = 1, uint8_t core = 1); + espMqttClientSecure& setInsecure(); + espMqttClientSecure& setCACert(const char* rootCA); + espMqttClientSecure& setCertificate(const char* clientCa); + espMqttClientSecure& setPrivateKey(const char* privateKey); + espMqttClientSecure& setPreSharedKey(const char* pskIdent, const char* psKey); + + protected: + espMqttClientInternals::ClientSecureSync _client; +}; +#endif + +#if defined(__linux__) +class espMqttClient : public MqttClientSetup { + public: + espMqttClient(); + + protected: + espMqttClientInternals::ClientPosix _client; +}; #endif diff --git a/lib/espMqttClient/src/espMqttClientAsync.cpp b/lib/espMqttClient/src/espMqttClientAsync.cpp index bc53878..98b7f15 100644 --- a/lib/espMqttClient/src/espMqttClientAsync.cpp +++ b/lib/espMqttClient/src/espMqttClientAsync.cpp @@ -10,17 +10,10 @@ the LICENSE file. #include "espMqttClientAsync.h" -#if defined(ARDUINO_ARCH_ESP32) -espMqttClientAsync::espMqttClientAsync(uint8_t priority, uint8_t core) -: MqttClientSetup(false, priority, core) -, _clientAsync() { -#else espMqttClientAsync::espMqttClientAsync() -: _clientAsync() { -#endif +: MqttClientSetup(espMqttClientTypes::UseInternalTask::NO) +, _clientAsync() { _transport = &_clientAsync; - // _onConnectHook = reinterpret_cast(_setupClient); - // _onConnectHookArg = this; _clientAsync.client.onConnect(onConnectCb, this); _clientAsync.client.onDisconnect(onDisconnectCb, this); _clientAsync.client.onData(onDataCb, this); diff --git a/lib/espMqttClient/src/espMqttClientAsync.h b/lib/espMqttClient/src/espMqttClientAsync.h index f3f321d..1b9ed8b 100644 --- a/lib/espMqttClient/src/espMqttClientAsync.h +++ b/lib/espMqttClient/src/espMqttClientAsync.h @@ -19,11 +19,7 @@ the LICENSE file. class espMqttClientAsync : public MqttClientSetup { public: -#if defined(ARDUINO_ARCH_ESP32) - explicit espMqttClientAsync(uint8_t priority = 1, uint8_t core = 1); -#else espMqttClientAsync(); -#endif bool connect(); protected: diff --git a/networkDevices/EthLan8720Device.cpp b/networkDevices/EthLan8720Device.cpp index 24c0684..9053514 100644 --- a/networkDevices/EthLan8720Device.cpp +++ b/networkDevices/EthLan8720Device.cpp @@ -33,7 +33,7 @@ EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* preferen { Log->println(F("MQTT over TLS.")); Log->println(_ca); - _mqttClientSecure = new espMqttClientSecure(false); + _mqttClientSecure = new espMqttClientSecure(espMqttClientTypes::UseInternalTask::NO); _mqttClientSecure->setCACert(_ca); if(crtLength > 1 && keyLength > 1) // length is 1 when empty { @@ -46,7 +46,7 @@ EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* preferen } else { Log->println(F("MQTT without TLS.")); - _mqttClient = new espMqttClient(false); + _mqttClient = new espMqttClient(espMqttClientTypes::UseInternalTask::NO); } if(preferences->getBool(preference_mqtt_log_enabled)) diff --git a/networkDevices/WifiDevice.cpp b/networkDevices/WifiDevice.cpp index 5ad4df6..d614778 100644 --- a/networkDevices/WifiDevice.cpp +++ b/networkDevices/WifiDevice.cpp @@ -25,7 +25,7 @@ WifiDevice::WifiDevice(const String& hostname, Preferences* _preferences, const { Log->println(F("MQTT over TLS.")); Log->println(_ca); - _mqttClientSecure = new espMqttClientSecure(false); + _mqttClientSecure = new espMqttClientSecure(espMqttClientTypes::UseInternalTask::NO); _mqttClientSecure->setCACert(_ca); if(crtLength > 1 && keyLength > 1) // length is 1 when empty { @@ -38,7 +38,7 @@ WifiDevice::WifiDevice(const String& hostname, Preferences* _preferences, const } else { Log->println(F("MQTT without TLS.")); - _mqttClient = new espMqttClient(false); + _mqttClient = new espMqttClient(espMqttClientTypes::UseInternalTask::NO); } if(_preferences->getBool(preference_mqtt_log_enabled)) diff --git a/networkDevices/espMqttClientW5500.cpp b/networkDevices/espMqttClientW5500.cpp index aa4bd84..f4ca520 100644 --- a/networkDevices/espMqttClientW5500.cpp +++ b/networkDevices/espMqttClientW5500.cpp @@ -1,7 +1,7 @@ #include "espMqttClientW5500.h" -espMqttClientW5500::espMqttClientW5500(uint8_t priority, uint8_t core) -: MqttClientSetup(false, priority, core), +espMqttClientW5500::espMqttClientW5500() +: MqttClientSetup(espMqttClientTypes::UseInternalTask::NO), _client() { _transport = &_client; diff --git a/networkDevices/espMqttClientW5500.h b/networkDevices/espMqttClientW5500.h index 76c3520..55afc59 100644 --- a/networkDevices/espMqttClientW5500.h +++ b/networkDevices/espMqttClientW5500.h @@ -6,7 +6,7 @@ class espMqttClientW5500 : public MqttClientSetup { public: #if defined(ARDUINO_ARCH_ESP32) - explicit espMqttClientW5500(uint8_t priority = 1, uint8_t core = 1); + explicit espMqttClientW5500(); #else espMqttClient(); #endif