Merge branch 'technyon:master' into readme-update

This commit is contained in:
iranl
2024-03-03 14:51:09 +01:00
committed by GitHub
37 changed files with 667 additions and 210 deletions

View File

@@ -1,6 +1,6 @@
#pragma once #pragma once
#define NUKI_HUB_VERSION "8.32" #define NUKI_HUB_VERSION "8.33"
#define GITHUB_LATEST_RELEASE_URL "https://github.com/technyon/nuki_hub/releases/latest" #define GITHUB_LATEST_RELEASE_URL "https://github.com/technyon/nuki_hub/releases/latest"
#define GITHUB_LATEST_RELEASE_API_URL "https://api.github.com/repos/technyon/nuki_hub/releases/latest" #define GITHUB_LATEST_RELEASE_API_URL "https://api.github.com/repos/technyon/nuki_hub/releases/latest"

View File

@@ -0,0 +1,17 @@
set(COMPONENT_SRCDIRS
"src" "src/Packets" "src/Transport"
)
set(COMPONENT_ADD_INCLUDEDIRS
"src" "src/Packets" "src/Transport"
)
set(COMPONENT_REQUIRES
"arduino-esp32"
"AsyncTCP"
)
register_component()
target_compile_definitions(${COMPONENT_TARGET} PUBLIC -DESP32)
target_compile_options(${COMPONENT_TARGET} PRIVATE -fno-rtti)

View File

@@ -15,12 +15,13 @@ Aims to be a non-blocking, fully compliant MQTT 3.1.1 client.
- TCP and TCP/TLS using standard WiFiClient and WiFiClientSecure connections - TCP and TCP/TLS using standard WiFiClient and WiFiClientSecure connections
- Virtually unlimited incoming and outgoing payload sizes - Virtually unlimited incoming and outgoing payload sizes
- Readable and understandable code - Readable and understandable code
- Fully async clients available via [AsyncTCP](https://github.com/me-no-dev/AsyncTCP) or [ESPAsnycTCP](https://github.com/me-no-dev/ESPAsyncTCP) (no TLS supported) - Fully async clients available via [AsyncTCP](https://github.com/esphome/AsyncTCP) or [ESPAsnycTCP](https://github.com/esphome/ESPAsyncTCP) (no TLS supported)
- Supported platforms: - Supported platforms:
- Espressif ESP8266 and ESP32 using the Arduino framework - Espressif ESP8266 and ESP32 using the Arduino framework
- Espressif ESP32 using the ESP IDF, see [esp idf component](https://docs.espressif.com/projects/arduino-esp32/en/latest/esp-idf_component.html)
- Basic Linux compatibility*. This includes WSL on Windows - Basic Linux compatibility*. This includes WSL on Windows
> Linux compatibility is mainly for automatic testing. It relies on a quick and dirty Arduino-style `Client` with a POSIX TCP client underneath and Arduino-style `IPAddress` class. These are lacking many features needed for proper Linux support. > Linux compatibility is mainly for automatic testing. It relies on a quick and dirty Arduino-style `Client` with a POSIX TCP client underneath and Arduino-style `ClientPosixIPAddress` class. These are lacking many features needed for proper Linux support.
# Documentation # Documentation

View File

@@ -0,0 +1,3 @@
COMPONENT_ADD_INCLUDEDIRS := src
COMPONENT_SRCDIRS := src
CXXFLAGS += -fno-rtti

View File

@@ -10,7 +10,7 @@
- TCP and TCP/TLS using standard WiFiClient and WiFiClientSecure connections - TCP and TCP/TLS using standard WiFiClient and WiFiClientSecure connections
- Virtually unlimited incoming and outgoing payload sizes - Virtually unlimited incoming and outgoing payload sizes
- Readable and understandable code - Readable and understandable code
- Fully async clients available via [AsyncTCP](https://github.com/me-no-dev/AsyncTCP) or [ESPAsnycTCP](https://github.com/me-no-dev/ESPAsyncTCP) (no TLS supported) - Fully async clients available via [AsyncTCP](https://github.com/esphome/AsyncTCP) or [ESPAsnycTCP](https://github.com/esphome/ESPAsyncTCP) (no TLS supported)
- Supported platforms: - Supported platforms:
- Espressif ESP8266 and ESP32 using the Arduino framework - Espressif ESP8266 and ESP32 using the Arduino framework
- Basic Linux compatibility*. This includes WSL on Windows - Basic Linux compatibility*. This includes WSL on Windows
@@ -232,25 +232,25 @@ Add a publish acknowledged event handler. Function signature: `void(uint16_t pac
bool connected() bool connected()
``` ```
Returns if the client is currently fully connected to the broker or not. During connecting or disconnecting, it will return false. Returns `true` if the client is currently fully connected to the broker. During connecting or disconnecting, it will return `false`.
```cpp ```cpp
bool disconnected() bool disconnected()
``` ```
Returns if the client is currently disconnected to the broker or not. During disconnecting or connecting, it will return false. Returns `true` if the client is currently disconnected from the broker. During disconnecting or connecting, it will return `false`.
```cpp ```cpp
void connect() bool connect()
``` ```
Connect to the server. Start the connect procedure. Returns `true` if successful. A positive return value doesn not mean the client is already connected.
```cpp ```cpp
void disconnect(bool force = false) bool disconnect(bool force = false)
``` ```
Disconnect from the server. Start the disconnect procedure, return `true` if successful. A positive return value doesn not mean the client is already disconnected.
When disconnecting with `force` false, the client first tries to handle all the outgoing messages in the queue and disconnect cleanly afterwards. During this time, no incoming PUBLISH messages are handled. When disconnecting with `force` false, the client first tries to handle all the outgoing messages in the queue and disconnect cleanly afterwards. During this time, no incoming PUBLISH messages are handled.
- **`force`**: Whether to force the disconnection. Defaults to `false` (clean disconnection). - **`force`**: Whether to force the disconnection. Defaults to `false` (clean disconnection).
@@ -341,10 +341,20 @@ const char* getClientId() const
Retuns the client ID. Retuns the client ID.
```cpp
size_t queueSize();
```
Returns the amount of elements, regardless of type, in the queue.
# Compile time configuration # Compile time configuration
A number of constants which influence the behaviour of the client can be set at compile time. You can set these options in the `Config.h` file or pass the values as compiler flags. Because these options are compile-time constants, they are used for all instances of `espMqttClient` you create in your program. A number of constants which influence the behaviour of the client can be set at compile time. You can set these options in the `Config.h` file or pass the values as compiler flags. Because these options are compile-time constants, they are used for all instances of `espMqttClient` you create in your program.
### EMC_TX_TIMEOUT 10000
Timeout in milliseconds before a (qos > 0) message will be retransmitted.
### EMC_RX_BUFFER_SIZE 1440 ### EMC_RX_BUFFER_SIZE 1440
The client copies incoming data into a buffer before parsing. This sets the buffer size. The client copies incoming data into a buffer before parsing. This sets the buffer size.
@@ -388,6 +398,10 @@ The (maximum) length of the client ID. (Keep in mind that this is a c-string. Yo
Only used on ESP32. Sets the stack size (in words) of the MQTT client worker task. Only used on ESP32. Sets the stack size (in words) of the MQTT client worker task.
### EMC_MULTIPLE_CALLBACKS
This macro is by default not enabled so you can add a single callbacks to an event. Assigning a second will overwrite the existing callback. When enabling multiple callbacks, multiple callbacks (with uint32_t id) can be assigned. Removing is done by referencing the id.
### EMC_USE_WATCHDOG 0 ### EMC_USE_WATCHDOG 0
(ESP32 only) (ESP32 only)

View File

@@ -0,0 +1,148 @@
#include <WiFi.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
#define WIFI_PASSWORD "yourpass"
#define MQTT_HOST IPAddress(192, 168, 1, 10)
#define MQTT_PORT 1883
espMqttClient mqttClient(espMqttClientTypes::UseInternalTask::NO);
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void WiFiEvent(WiFiEvent_t event) {
Serial.printf("[WiFi-event] event: %d\n", event);
switch(event) {
case SYSTEM_EVENT_STA_GOT_IP:
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
break;
default:
break;
}
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
uint16_t packetIdSub = mqttClient.subscribe("foo/bar", 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
mqttClient.publish("foo/bar", 0, true, "test 1");
Serial.println("Publishing at QoS 0");
uint16_t packetIdPub1 = mqttClient.publish("foo/bar", 1, true, "test 2");
Serial.print("Publishing at QoS 1, packetId: ");
Serial.println(packetIdPub1);
uint16_t packetIdPub2 = mqttClient.publish("foo/bar", 2, true, "test 3");
Serial.print("Publishing at QoS 2, packetId: ");
Serial.println(packetIdPub2);
}
void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectMqtt = true;
lastReconnect = millis();
}
}
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}
void onMqttUnsubscribe(uint16_t packetId) {
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
(void) payload;
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
}
void onMqttPublish(uint16_t packetId) {
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void setup() {
Serial.begin(115200);
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
connectToWiFi();
}
void loop() {
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
// We used to option not to use the internal task
// so we need to call the loop-method ourselves.
// During connecting it may block.
// Creating a separate task yourself is obviously
// also a possibility.
mqttClient.loop();
}

View File

@@ -0,0 +1,8 @@
# The following lines of boilerplate have to be in your project's
# CMakeLists in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.5)
SET(SDKCONFIG ${CMAKE_BINARY_DIR}/sdkconfig)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
project(simple-esp32-idf)

View File

@@ -0,0 +1,3 @@
This example is for use with [Arduino as a component](https://espressif-docs.readthedocs-hosted.com/projects/arduino-esp32/en/latest/esp-idf_component.html) in the ESP-IDF framework.
Be sure to follow [this section](https://espressif-docs.readthedocs-hosted.com/projects/arduino-esp32/en/latest/esp-idf_component.html#adding-local-library) about adding libraries to your project.

View File

@@ -0,0 +1,3 @@
idf_component_register(
SRCS "main.cpp"
INCLUDE_DIRS "")

View File

@@ -0,0 +1,142 @@
#include <Arduino.h>
#include <WiFi.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
#define WIFI_PASSWORD "yourpass"
#define MQTT_HOST IPAddress(192, 168, 1, 10)
#define MQTT_PORT 1883
espMqttClient mqttClient;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void WiFiEvent(WiFiEvent_t event) {
Serial.printf("[WiFi-event] event: %d\n", event);
switch(event) {
case SYSTEM_EVENT_STA_GOT_IP:
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
break;
default:
break;
}
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
uint16_t packetIdSub = mqttClient.subscribe("foo/bar", 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
mqttClient.publish("foo/bar", 0, true, "test 1");
Serial.println("Publishing at QoS 0");
uint16_t packetIdPub1 = mqttClient.publish("foo/bar", 1, true, "test 2");
Serial.print("Publishing at QoS 1, packetId: ");
Serial.println(packetIdPub1);
uint16_t packetIdPub2 = mqttClient.publish("foo/bar", 2, true, "test 3");
Serial.print("Publishing at QoS 2, packetId: ");
Serial.println(packetIdPub2);
}
void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectMqtt = true;
lastReconnect = millis();
}
}
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}
void onMqttUnsubscribe(uint16_t packetId) {
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
(void) payload;
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
}
void onMqttPublish(uint16_t packetId) {
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void setup() {
Serial.begin(115200);
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
connectToWiFi();
}
void loop() {
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -0,0 +1,39 @@
#
# Bootloader config
#
CONFIG_BOOTLOADER_COMPILER_OPTIMIZATION_SIZE=y
CONFIG_BOOTLOADER_LOG_LEVEL_NONE=y
CONFIG_BOOTLOADER_LOG_LEVEL=0
#
# Serial flasher config
#
CONFIG_ESPTOOLPY_FLASHMODE_DIO=y
CONFIG_ESPTOOLPY_FLASHMODE="dio"
CONFIG_ESPTOOLPY_FLASHFREQ_40M=y
CONFIG_ESPTOOLPY_FLASHFREQ="40m"
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y
CONFIG_ESPTOOLPY_FLASHSIZE="4MB"
#
# Partition Table
#
CONFIG_PARTITION_TABLE_CUSTOM=n
#
# Arduino Configuration
#
CONFIG_ARDUINO_VARIANT="esp32"
CONFIG_ENABLE_ARDUINO_DEPENDS=y
CONFIG_AUTOSTART_ARDUINO=y
#
# FreeRTOS
#
# 1000 require for Arduino
CONFIG_FREERTOS_HZ=1000
#ASYNC_TCP
CONFIG_ASYNC_TCP_RUN_NO_AFFINITY=y
#MBEDTLS
CONFIG_MBEDTLS_PSK_MODES=y

View File

@@ -44,6 +44,7 @@ publish KEYWORD2
clearQueue KEYWORD2 clearQueue KEYWORD2
loop KEYWORD2 loop KEYWORD2
getClientId KEYWORD2 getClientId KEYWORD2
queueSize KEYWORD2
# Structures (KEYWORD3) # Structures (KEYWORD3)
espMqttClientTypes KEYWORD3 espMqttClientTypes KEYWORD3

View File

@@ -14,21 +14,21 @@
"type": "git", "type": "git",
"url": "https://github.com/bertmelis/espMqttClient.git" "url": "https://github.com/bertmelis/espMqttClient.git"
}, },
"version": "1.4.2", "version": "1.6.0",
"frameworks": "arduino", "frameworks": "arduino",
"platforms": ["espressif8266", "espressif32"], "platforms": ["espressif8266", "espressif32"],
"headers": ["espMqttClient.h", "espMqttClientAsync.h"], "headers": ["espMqttClient.h", "espMqttClientAsync.h"],
"dependencies": [ "dependencies": [
{ {
"owner": "me-no-dev", "owner": "esphome",
"name": "ESPAsyncTCP", "name": "ESPAsyncTCP-esphome",
"version": ">=1.2.2", "version": ">=2.0.0",
"platforms": "espressif8266" "platforms": "espressif8266"
}, },
{ {
"owner": "me-no-dev", "owner": "esphome",
"name": "AsyncTCP", "name": "AsyncTCP-esphome",
"version": ">=1.1.1", "version": ">=2.1.1",
"platforms": "espressif32" "platforms": "espressif32"
} }
], ],

View File

@@ -1,5 +1,5 @@
name=espMqttClient name=espMqttClient
version=1.4.2 version=1.6.0
author=Bert Melis author=Bert Melis
maintainer=Bert Melis maintainer=Bert Melis
sentence=an MQTT client for the Arduino framework for ESP8266 / ESP32 sentence=an MQTT client for the Arduino framework for ESP8266 / ESP32

View File

@@ -19,6 +19,7 @@ build_flags =
-Wextra -Wextra
-std=c++11 -std=c++11
-pthread -pthread
-ggdb3
[env:native] [env:native]
platform = native platform = native
@@ -29,5 +30,13 @@ build_flags =
--coverage --coverage
-D EMC_RX_BUFFER_SIZE=100 -D EMC_RX_BUFFER_SIZE=100
-D EMC_TX_BUFFER_SIZE=10 -D EMC_TX_BUFFER_SIZE=10
-D EMC_MULTIPLE_CALLBACKS=1
;extra_scripts = test-coverage.py ;extra_scripts = test-coverage.py
build_type = debug build_type = debug
test_testing_command =
valgrind
--leak-check=full
--show-leak-kinds=all
--track-origins=yes
--error-exitcode=1
${platformio.build_dir}/${this.__env__}/program

View File

@@ -1,50 +0,0 @@
#!/bin/bash
# already done by workflow
#pip install -U platformio
#platformio update
#pio pkg install --global --library me-no-dev/AsyncTCP
#pio pkg install --global --library EspAsyncTCP
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m'
lines=$(find ./examples/ -maxdepth 1 -mindepth 1 -type d)
retval=0
retvalpart=0
while read line; do
if [[ "$line" != *esp8266 && "$line" != *esp32 && "$line" != *linux ]]; then
echo -e "========================== BUILDING $line =========================="
echo -e "${YELLOW}SKIPPING${NC}"
continue
fi
echo -e "========================== BUILDING $line =========================="
if [[ -e "$line/platformio.ini" ]]; then
output=$(platformio ci --lib="." --project-conf="$line/platformio.ini" $line 2>&1)
retvalpart=$?
else
if [[ "$line" == *esp8266 ]]; then
output=$(platformio ci --lib="." --project-conf="scripts/CI/platformio_esp8266.ini" $line 2>&1)
retvalpart=$?
else
output=$(platformio ci --lib="." --project-conf="scripts/CI/platformio_esp32.ini" $line 2>&1)
retvalpart=$?
fi
:
fi
if [ $retvalpart -ne 0 ]; then
echo "$output"
echo -e "Building $line ${RED}FAILED${NC}"
retval=1
else
echo -e "${GREEN}SUCCESS${NC}"
fi
done <<< "$lines"
# will be deleted together with container
#pio pkg uninstall --global --library me-no-dev/AsyncTCP
#pio pkg uninstall --global --library EspAsyncTCP
exit "$retval"

View File

@@ -1,19 +0,0 @@
; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html
[env:esp32]
platform = espressif32
board = lolin32
framework = arduino
build_flags =
;-Werror
-Wall
-Wextra
-Werror

View File

@@ -1,19 +0,0 @@
; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html
[env:esp8266]
platform = espressif8266
board = d1_mini
framework = arduino
build_flags =
;-Werror
-Wall
-Wextra
-Werror

View File

@@ -9,7 +9,7 @@ the LICENSE file.
#pragma once #pragma once
#ifndef EMC_TX_TIMEOUT #ifndef EMC_TX_TIMEOUT
#define EMC_TX_TIMEOUT 5000 #define EMC_TX_TIMEOUT 10000
#endif #endif
#ifndef EMC_RX_BUFFER_SIZE #ifndef EMC_RX_BUFFER_SIZE
@@ -29,7 +29,7 @@ the LICENSE file.
#endif #endif
#ifndef EMC_MIN_FREE_MEMORY #ifndef EMC_MIN_FREE_MEMORY
#define EMC_MIN_FREE_MEMORY 4096 #define EMC_MIN_FREE_MEMORY 16384
#endif #endif
#ifndef EMC_ESP8266_MULTITHREADING #ifndef EMC_ESP8266_MULTITHREADING
@@ -53,6 +53,10 @@ the LICENSE file.
#define EMC_TASK_STACK_SIZE 5120 #define EMC_TASK_STACK_SIZE 5120
#endif #endif
#ifndef EMC_MULTIPLE_CALLBACKS
#define EMC_MULTIPLE_CALLBACKS 0
#endif
#ifndef EMC_USE_WATCHDOG #ifndef EMC_USE_WATCHDOG
#define EMC_USE_WATCHDOG 0 #define EMC_USE_WATCHDOG 0
#endif #endif

View File

@@ -16,7 +16,7 @@ the LICENSE file.
#define EMC_SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY) #define EMC_SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY)
#define EMC_SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore) #define EMC_SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore)
#define EMC_GET_FREE_MEMORY() std::max(ESP.getMaxAllocHeap(), ESP.getMaxAllocPsram()) #define EMC_GET_FREE_MEMORY() std::max(ESP.getMaxAllocHeap(), ESP.getMaxAllocPsram())
#define EMC_YIELD() taskYIELD() #define EMC_YIELD() vTaskDelay(1)
#define EMC_GENERATE_CLIENTID(x) snprintf(x, EMC_CLIENTID_LENGTH, "esp32%06llx", ESP.getEfuseMac()); #define EMC_GENERATE_CLIENTID(x) snprintf(x, EMC_CLIENTID_LENGTH, "esp32%06llx", ESP.getEfuseMac());
#elif defined(ARDUINO_ARCH_ESP8266) #elif defined(ARDUINO_ARCH_ESP8266)
#include <Arduino.h> // millis(), ESP.getFreeHeap(); #include <Arduino.h> // millis(), ESP.getFreeHeap();

View File

@@ -10,6 +10,8 @@ the LICENSE file.
#if defined(ARDUINO_ARCH_ESP32) #if defined(ARDUINO_ARCH_ESP32)
#include <esp32-hal-log.h> #include <esp32-hal-log.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#if defined(DEBUG_ESP_MQTT_CLIENT) #if defined(DEBUG_ESP_MQTT_CLIENT)
// Logging is en/disabled by Arduino framework macros // Logging is en/disabled by Arduino framework macros
#define emc_log_i(...) log_i(__VA_ARGS__) #define emc_log_i(...) log_i(__VA_ARGS__)

View File

@@ -14,12 +14,8 @@ using espMqttClientTypes::DisconnectReason;
using espMqttClientTypes::Error; using espMqttClientTypes::Error;
MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority, uint8_t core) MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority, uint8_t core)
#if defined(ARDUINO_ARCH_ESP32)
: _useInternalTask(useInternalTask) : _useInternalTask(useInternalTask)
, _transport(nullptr) , _transport(nullptr)
#else
: _transport(nullptr)
#endif
, _onConnectCallback(nullptr) , _onConnectCallback(nullptr)
, _onDisconnectCallback(nullptr) , _onDisconnectCallback(nullptr)
, _onSubscribeCallback(nullptr) , _onSubscribeCallback(nullptr)
@@ -41,7 +37,7 @@ MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint
, _willPayloadLength(0) , _willPayloadLength(0)
, _willQos(0) , _willQos(0)
, _willRetain(false) , _willRetain(false)
, _timeout(10000) , _timeout(EMC_TX_TIMEOUT)
, _state(State::disconnected) , _state(State::disconnected)
, _generatedClientId{0} , _generatedClientId{0}
, _packetId(0) , _packetId(0)
@@ -101,7 +97,7 @@ bool MqttClient::disconnected() const {
} }
bool MqttClient::connect() { bool MqttClient::connect() {
bool result = true; bool result = false;
if (_state == State::disconnected) { if (_state == State::disconnected) {
EMC_SEMAPHORE_TAKE(); EMC_SEMAPHORE_TAKE();
if (_addPacketFront(_cleanSession, if (_addPacketFront(_cleanSession,
@@ -114,17 +110,17 @@ bool MqttClient::connect() {
_willPayloadLength, _willPayloadLength,
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally (uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) { _clientId)) {
result = true;
_setState(State::connectingTcp1);
#if defined(ARDUINO_ARCH_ESP32) #if defined(ARDUINO_ARCH_ESP32)
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle); vTaskResume(_taskHandle);
} }
#endif #endif
_state = State::connectingTcp1;
} else { } else {
EMC_SEMAPHORE_GIVE(); EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet"); emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY); _onError(0, Error::OUT_OF_MEMORY);
result = false;
} }
EMC_SEMAPHORE_GIVE(); EMC_SEMAPHORE_GIVE();
} }
@@ -133,11 +129,11 @@ bool MqttClient::connect() {
bool MqttClient::disconnect(bool force) { bool MqttClient::disconnect(bool force) {
if (force && _state != State::disconnected && _state != State::disconnectingTcp1 && _state != State::disconnectingTcp2) { if (force && _state != State::disconnected && _state != State::disconnectingTcp1 && _state != State::disconnectingTcp2) {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
return true; return true;
} }
if (!force && _state == State::connected) { if (!force && _state == State::connected) {
_state = State::disconnectingMqtt1; _setState(State::disconnectingMqtt1);
return true; return true;
} }
return false; return false;
@@ -151,8 +147,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const
#endif #endif
return 0; return 0;
} }
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE(); EMC_SEMAPHORE_TAKE();
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
if (!_addPacket(packetId, topic, payload, length, qos, retain)) { if (!_addPacket(packetId, topic, payload, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet"); emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY); _onError(packetId, Error::OUT_OF_MEMORY);
@@ -175,8 +171,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt
#endif #endif
return 0; return 0;
} }
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE(); EMC_SEMAPHORE_TAKE();
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
if (!_addPacket(packetId, topic, callback, length, qos, retain)) { if (!_addPacket(packetId, topic, callback, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet"); emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY); _onError(packetId, Error::OUT_OF_MEMORY);
@@ -194,6 +190,14 @@ const char* MqttClient::getClientId() const {
return _clientId; return _clientId;
} }
size_t MqttClient::queueSize() {
size_t ret = 0;
EMC_SEMAPHORE_TAKE();
ret = _outbox.size();
EMC_SEMAPHORE_GIVE();
return ret;
}
void MqttClient::loop() { void MqttClient::loop() {
switch (_state) { switch (_state) {
case State::disconnected: case State::disconnected:
@@ -205,9 +209,9 @@ void MqttClient::loop() {
break; break;
case State::connectingTcp1: case State::connectingTcp1:
if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) { if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) {
_state = State::connectingTcp2; _setState(State::connectingTcp2);
} else { } else {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED; _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
break; break;
} }
@@ -217,7 +221,7 @@ void MqttClient::loop() {
if (_transport->connected()) { if (_transport->connected()) {
_parser.reset(); _parser.reset();
_lastClientActivity = _lastServerActivity = millis(); _lastClientActivity = _lastServerActivity = millis();
_state = State::connectingMqtt; _setState(State::connectingMqtt);
} }
break; break;
case State::connectingMqtt: case State::connectingMqtt:
@@ -227,7 +231,7 @@ void MqttClient::loop() {
_checkIncoming(); _checkIncoming();
_checkPing(); _checkPing();
} else { } else {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED; _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
} }
break; break;
@@ -247,7 +251,7 @@ void MqttClient::loop() {
_checkPing(); _checkPing();
_checkTimeout(); _checkTimeout();
} else { } else {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED; _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
} }
break; break;
@@ -259,7 +263,7 @@ void MqttClient::loop() {
emc_log_e("Could not create DISCONNECT packet"); emc_log_e("Could not create DISCONNECT packet");
_onError(0, Error::OUT_OF_MEMORY); _onError(0, Error::OUT_OF_MEMORY);
} else { } else {
_state = State::disconnectingMqtt2; _setState(State::disconnectingMqtt2);
} }
} }
EMC_SEMAPHORE_GIVE(); EMC_SEMAPHORE_GIVE();
@@ -270,13 +274,13 @@ void MqttClient::loop() {
break; break;
case State::disconnectingTcp1: case State::disconnectingTcp1:
_transport->stop(); _transport->stop();
_state = State::disconnectingTcp2; _setState(State::disconnectingTcp2);
break; // keep break to accomodate async clients break; // keep break to accomodate async clients
case State::disconnectingTcp2: case State::disconnectingTcp2:
if (_transport->disconnected()) { if (_transport->disconnected()) {
_clearQueue(0); _clearQueue(0);
_bytesSent = 0; _bytesSent = 0;
_state = State::disconnected; _setState(State::disconnected);
if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason); if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason);
} }
break; break;
@@ -308,13 +312,15 @@ void MqttClient::_loop(MqttClient* c) {
} }
#endif #endif
inline void MqttClient::_setState(State newState) {
emc_log_i("state %i --> %i", static_cast<std::underlying_type<State>::type>(_state.load()), static_cast<std::underlying_type<State>::type>(newState));
_state = newState;
}
uint16_t MqttClient::_getNextPacketId() { uint16_t MqttClient::_getNextPacketId() {
uint16_t packetId = 0; ++_packetId;
EMC_SEMAPHORE_TAKE(); if (_packetId == 0) ++_packetId;
// cppcheck-suppress knownConditionTrueFalse return _packetId;
packetId = (++_packetId == 0) ? ++_packetId : _packetId;
EMC_SEMAPHORE_GIVE();
return packetId;
} }
void MqttClient::_checkOutbox() { void MqttClient::_checkOutbox() {
@@ -329,21 +335,14 @@ int MqttClient::_sendPacket() {
EMC_SEMAPHORE_TAKE(); EMC_SEMAPHORE_TAKE();
OutgoingPacket* packet = _outbox.getCurrent(); OutgoingPacket* packet = _outbox.getCurrent();
int32_t wantToWrite = 0; size_t written = 0;
int32_t written = 0; if (packet) {
if (packet && (wantToWrite == written)) { size_t wantToWrite = packet->packet.available(_bytesSent);
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) { if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE(); EMC_SEMAPHORE_GIVE();
return 0; return 0;
} }
written = _transport->write(packet->packet.data(_bytesSent), wantToWrite); written = _transport->write(packet->packet.data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
EMC_SEMAPHORE_GIVE();
return -1;
}
packet->timeSent = millis(); packet->timeSent = millis();
_lastClientActivity = millis(); _lastClientActivity = millis();
_bytesSent += written; _bytesSent += written;
@@ -358,7 +357,7 @@ bool MqttClient::_advanceOutbox() {
OutgoingPacket* packet = _outbox.getCurrent(); OutgoingPacket* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->packet.size()) { if (packet && _bytesSent == packet->packet.size()) {
if ((packet->packet.packetType()) == PacketType.DISCONNECT) { if ((packet->packet.packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::USER_OK; _disconnectReason = DisconnectReason::USER_OK;
} }
if (packet->packet.removable()) { if (packet->packet.removable()) {
@@ -388,7 +387,7 @@ void MqttClient::_checkIncoming() {
espMqttClientInternals::MQTTPacketType packetType = _parser.getPacket().fixedHeader.packetType & 0xF0; espMqttClientInternals::MQTTPacketType packetType = _parser.getPacket().fixedHeader.packetType & 0xF0;
if (_state == State::connectingMqtt && packetType != PacketType.CONNACK) { if (_state == State::connectingMqtt && packetType != PacketType.CONNACK) {
emc_log_w("Disconnecting, expected CONNACK - protocol error"); emc_log_w("Disconnecting, expected CONNACK - protocol error");
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
return; return;
} }
switch (packetType & 0xF0) { switch (packetType & 0xF0) {
@@ -426,7 +425,7 @@ void MqttClient::_checkIncoming() {
} }
} else if (result == espMqttClientInternals::ParserResult::protocolError) { } else if (result == espMqttClientInternals::ParserResult::protocolError) {
emc_log_w("Disconnecting, protocol error"); emc_log_w("Disconnecting, protocol error");
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED; _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
return; return;
} }
@@ -446,7 +445,7 @@ void MqttClient::_checkPing() {
// disconnect when server was inactive for twice the keepalive time // disconnect when server was inactive for twice the keepalive time
if (currentMillis - _lastServerActivity > 2 * _keepAlive) { if (currentMillis - _lastServerActivity > 2 * _keepAlive) {
emc_log_w("Disconnecting, server exceeded keepalive"); emc_log_w("Disconnecting, server exceeded keepalive");
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED; _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
return; return;
} }
@@ -484,7 +483,7 @@ void MqttClient::_checkTimeout() {
void MqttClient::_onConnack() { void MqttClient::_onConnack() {
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) { if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) {
_pingSent = false; // reset after keepalive timeout disconnect _pingSent = false; // reset after keepalive timeout disconnect
_state = State::connected; _setState(State::connected);
_advanceOutbox(); _advanceOutbox();
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent == 0) { if (_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent == 0) {
_clearQueue(1); _clearQueue(1);
@@ -493,14 +492,14 @@ void MqttClient::_onConnack() {
_onConnectCallback(_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent); _onConnectCallback(_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent);
} }
} else { } else {
_state = State::disconnectingTcp1; _setState(State::disconnectingTcp1);
// cast is safe because the parser already checked for a valid return code // cast is safe because the parser already checked for a valid return code
_disconnectReason = static_cast<DisconnectReason>(_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode); _disconnectReason = static_cast<DisconnectReason>(_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode);
} }
} }
void MqttClient::_onPublish() { void MqttClient::_onPublish() {
espMqttClientInternals::IncomingPacket p = _parser.getPacket(); const espMqttClientInternals::IncomingPacket& p = _parser.getPacket();
uint8_t qos = p.qos(); uint8_t qos = p.qos();
bool retain = p.retain(); bool retain = p.retain();
bool dup = p.dup(); bool dup = p.dup();
@@ -633,9 +632,6 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return // if it doesn't match the ID, return
if ((it.get()->packet.packetType()) == PacketType.PUBREL) { if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) { if (it.get()->packet.packetId() == idToMatch) {
if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
callback = true; callback = true;
_outbox.remove(it); _outbox.remove(it);
break; break;

View File

@@ -65,6 +65,7 @@ class MqttClient {
uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length); uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true! void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char* getClientId() const; const char* getClientId() const;
size_t queueSize(); // No const because of mutex
void loop(); void loop();
protected: protected:
@@ -110,6 +111,7 @@ class MqttClient {
disconnectingTcp2 = 8 disconnectingTcp2 = 8
}; };
std::atomic<State> _state; std::atomic<State> _state;
inline void _setState(State newState);
private: private:
char _generatedClientId[EMC_CLIENTID_LENGTH]; char _generatedClientId[EMC_CLIENTID_LENGTH];
@@ -130,7 +132,7 @@ class MqttClient {
uint32_t timeSent; uint32_t timeSent;
espMqttClientInternals::Packet packet; espMqttClientInternals::Packet packet;
template <typename... Args> template <typename... Args>
OutgoingPacket(uint32_t t, espMqttClientTypes::Error error, Args&&... args) : OutgoingPacket(uint32_t t, espMqttClientTypes::Error& error, Args&&... args) : // NOLINT(runtime/references)
timeSent(t), timeSent(t),
packet(error, std::forward<Args>(args) ...) {} packet(error, std::forward<Args>(args) ...) {}
}; };
@@ -148,16 +150,24 @@ class MqttClient {
bool _addPacket(Args&&... args) { bool _addPacket(Args&&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS); espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplace(0, error, std::forward<Args>(args) ...); espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplace(0, error, std::forward<Args>(args) ...);
if (it && error == espMqttClientTypes::Error::SUCCESS) return true; if (it && error == espMqttClientTypes::Error::SUCCESS) {
return false; return true;
} else {
if (it) _outbox.remove(it);
return false;
}
} }
template <typename... Args> template <typename... Args>
bool _addPacketFront(Args&&... args) { bool _addPacketFront(Args&&... args) {
espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS); espMqttClientTypes::Error error(espMqttClientTypes::Error::SUCCESS);
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplaceFront(0, error, std::forward<Args>(args) ...); espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.emplaceFront(0, error, std::forward<Args>(args) ...);
if (it && error == espMqttClientTypes::Error::SUCCESS) return true; if (it && error == espMqttClientTypes::Error::SUCCESS) {
return false; return true;
} else {
if (it) _outbox.remove(it);
return false;
}
} }
void _checkOutbox(); void _checkOutbox();

View File

@@ -11,6 +11,11 @@ the LICENSE file.
#pragma once #pragma once
#if EMC_MULTIPLE_CALLBACKS
#include <list>
#include <utility>
#endif
#include "MqttClient.h" #include "MqttClient.h"
template <typename T> template <typename T>
@@ -73,36 +78,128 @@ class MqttClientSetup : public MqttClient {
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onConnect(espMqttClientTypes::OnConnectCallback callback) { T& onConnect(espMqttClientTypes::OnConnectCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onConnectCallbacks.emplace_back(callback, id);
#else
(void) id;
_onConnectCallback = callback; _onConnectCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback) { T& onDisconnect(espMqttClientTypes::OnDisconnectCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onDisconnectCallbacks.emplace_back(callback, id);
#else
(void) id;
_onDisconnectCallback = callback; _onDisconnectCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback) { T& onSubscribe(espMqttClientTypes::OnSubscribeCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onSubscribeCallbacks.emplace_back(callback, id);
#else
(void) id;
_onSubscribeCallback = callback; _onSubscribeCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback) { T& onUnsubscribe(espMqttClientTypes::OnUnsubscribeCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onUnsubscribeCallbacks.emplace_back(callback, id);
#else
(void) id;
_onUnsubscribeCallback = callback; _onUnsubscribeCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onMessage(espMqttClientTypes::OnMessageCallback callback) { T& onMessage(espMqttClientTypes::OnMessageCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onMessageCallbacks.emplace_back(callback, id);
#else
(void) id;
_onMessageCallback = callback; _onMessageCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
T& onPublish(espMqttClientTypes::OnPublishCallback callback) { T& onPublish(espMqttClientTypes::OnPublishCallback callback, uint32_t id = 0) {
#if EMC_MULTIPLE_CALLBACKS
_onPublishCallbacks.emplace_back(callback, id);
#else
(void) id;
_onPublishCallback = callback; _onPublishCallback = callback;
#endif
return static_cast<T&>(*this); return static_cast<T&>(*this);
} }
#if EMC_MULTIPLE_CALLBACKS
T& removeOnConnect(uint32_t id) {
for (auto it = _onConnectCallbacks.begin(); it != _onConnectCallbacks.end(); ++it) {
if (it->second == id) {
_onConnectCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
T& removeOnDisconnect(uint32_t id) {
for (auto it = _onDisconnectCallbacks.begin(); it != _onDisconnectCallbacks.end(); ++it) {
if (it->second == id) {
_onDisconnectCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
T& removeOnSubscribe(uint32_t id) {
for (auto it = _onSubscribeCallbacks.begin(); it != _onSubscribeCallbacks.end(); ++it) {
if (it->second == id) {
_onSubscribeCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
T& removeOnUnsubscribe(uint32_t id) {
for (auto it = _onUnsubscribeCallbacks.begin(); it != _onUnsubscribeCallbacks.end(); ++it) {
if (it->second == id) {
_onUnsubscribeCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
T& removeOnMessage(uint32_t id) {
for (auto it = _onMessageCallbacks.begin(); it != _onMessageCallbacks.end(); ++it) {
if (it->second == id) {
_onMessageCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
T& removeOnPublish(uint32_t id) {
for (auto it = _onPublishCallbacks.begin(); it != _onPublishCallbacks.end(); ++it) {
if (it->second == id) {
_onPublishCallbacks.erase(it);
break;
}
}
return static_cast<T&>(*this);
}
#endif
/* /*
T& onError(espMqttClientTypes::OnErrorCallback callback) { T& onError(espMqttClientTypes::OnErrorCallback callback) {
_onErrorCallback = callback; _onErrorCallback = callback;
@@ -112,5 +209,37 @@ class MqttClientSetup : public MqttClient {
protected: protected:
explicit MqttClientSetup(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1) explicit MqttClientSetup(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority = 1, uint8_t core = 1)
: MqttClient(useInternalTask, priority, core) {} : MqttClient(useInternalTask, priority, core) {
#if EMC_MULTIPLE_CALLBACKS
_onConnectCallback = [this](bool sessionPresent) {
for (auto callback : _onConnectCallbacks) if (callback.first) callback.first(sessionPresent);
};
_onDisconnectCallback = [this](espMqttClientTypes::DisconnectReason reason) {
for (auto callback : _onDisconnectCallbacks) if (callback.first) callback.first(reason);
};
_onSubscribeCallback = [this](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) {
for (auto callback : _onSubscribeCallbacks) if (callback.first) callback.first(packetId, returncodes, len);
};
_onUnsubscribeCallback = [this](int16_t packetId) {
for (auto callback : _onUnsubscribeCallbacks) if (callback.first) callback.first(packetId);
};
_onMessageCallback = [this](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
for (auto callback : _onMessageCallbacks) if (callback.first) callback.first(properties, topic, payload, len, index, total);
};
_onPublishCallback = [this](uint16_t packetId) {
for (auto callback : _onPublishCallbacks) if (callback.first) callback.first(packetId);
};
#else
// empty
#endif
}
#if EMC_MULTIPLE_CALLBACKS
std::list<std::pair<espMqttClientTypes::OnConnectCallback, uint32_t>> _onConnectCallbacks;
std::list<std::pair<espMqttClientTypes::OnDisconnectCallback, uint32_t>> _onDisconnectCallbacks;
std::list<std::pair<espMqttClientTypes::OnSubscribeCallback, uint32_t>> _onSubscribeCallbacks;
std::list<std::pair<espMqttClientTypes::OnUnsubscribeCallback, uint32_t>> _onUnsubscribeCallbacks;
std::list<std::pair<espMqttClientTypes::OnMessageCallback, uint32_t>> _onMessageCallbacks;
std::list<std::pair<espMqttClientTypes::OnPublishCallback, uint32_t>> _onPublishCallbacks;
#endif
}; };

View File

@@ -163,6 +163,16 @@ class Outbox {
return false; return false;
} }
size_t size() const {
Node* n = _first;
size_t count = 0;
while (n) {
n = n->next;
++count;
}
return count;
}
private: private:
Node* _first; Node* _first;
Node* _last; Node* _last;

View File

@@ -100,7 +100,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
(password ? 2 + strlen(password) : 0); (password ? 2 + strlen(password) : 0);
// allocate memory // allocate memory
if (!_allocate(remainingLength)) { if (!_allocate(remainingLength, false)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY; error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return; return;
} }
@@ -300,8 +300,8 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
} }
bool Packet::_allocate(size_t remainingLength) { bool Packet::_allocate(size_t remainingLength, bool check) {
if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) { if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory"); emc_log_w("Packet buffer not allocated: low memory");
return false; return false;
} }

View File

@@ -12,12 +12,12 @@ the LICENSE file.
#include <stddef.h> #include <stddef.h>
#include "Constants.h" #include "Constants.h"
#include "Config.h" #include "../Config.h"
#include "../TypeDefs.h" #include "../TypeDefs.h"
#include "../Helpers.h" #include "../Helpers.h"
#include "../Logging.h" #include "../Logging.h"
#include "RemainingLength.h" #include "RemainingLength.h"
#include "String.h" #include "StringUtil.h"
namespace espMqttClientInternals { namespace espMqttClientInternals {
@@ -97,7 +97,7 @@ class Packet {
, _payloadStartIndex(0) , _payloadStartIndex(0)
, _payloadEndIndex(0) , _payloadEndIndex(0)
, _getPayload(nullptr) { , _getPayload(nullptr) {
static_assert(sizeof...(Args) % 2 == 0); static_assert(sizeof...(Args) % 2 == 0, "Subscribe should be in topic/qos pairs");
size_t numberTopics = 2 + (sizeof...(Args) / 2); size_t numberTopics = 2 + (sizeof...(Args) / 2);
SubscribeItem list[numberTopics] = {topic1, qos1, topic2, qos2, args...}; SubscribeItem list[numberTopics] = {topic1, qos1, topic2, qos2, args...};
_createSubscribe(error, list, numberTopics); _createSubscribe(error, list, numberTopics);
@@ -133,7 +133,7 @@ class Packet {
private: private:
// pass remainingLength = total size - header - remainingLengthLength! // pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength); bool _allocate(size_t remainingLength, bool check = true);
// fills header and returns index of next available byte in buffer // fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId, size_t _fillPublishHeader(uint16_t packetId,

View File

@@ -6,7 +6,7 @@ For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file. the LICENSE file.
*/ */
#include "String.h" #include "StringUtil.h"
namespace espMqttClientInternals { namespace espMqttClientInternals {

View File

@@ -40,7 +40,7 @@ bool ClientPosix::connect(IPAddress ip, uint16_t port) {
_host.sin_addr.s_addr = htonl(uint32_t(ip)); _host.sin_addr.s_addr = htonl(uint32_t(ip));
_host.sin_port = ::htons(port); _host.sin_port = ::htons(port);
int ret = ::connect(_sockfd, (struct sockaddr *)&_host, sizeof(_host)); int ret = ::connect(_sockfd, reinterpret_cast<sockaddr*>(&_host), sizeof(_host));
if (ret < 0) { if (ret < 0) {
emc_log_e("Error connecting: %d - (%d) %s", ret, errno, strerror(errno)); emc_log_e("Error connecting: %d - (%d) %s", ret, errno, strerror(errno));

View File

@@ -43,7 +43,7 @@ class ClientPosix : public Transport {
protected: protected:
int _sockfd; int _sockfd;
struct sockaddr_in _host; sockaddr_in _host;
}; };
} // namespace espMqttClientInternals } // namespace espMqttClientInternals

View File

@@ -8,7 +8,7 @@ the LICENSE file.
#if defined(__linux__) #if defined(__linux__)
#include "IPAddress.h" #include "ClientPosixIPAddress.h"
IPAddress::IPAddress() IPAddress::IPAddress()
: _address(0) { : _address(0) {

View File

@@ -10,7 +10,7 @@ the LICENSE file.
#include <stddef.h> // size_t #include <stddef.h> // size_t
#include "IPAddress.h" #include "ClientPosixIPAddress.h"
namespace espMqttClientInternals { namespace espMqttClientInternals {

View File

@@ -7,6 +7,12 @@ void setUp() {}
void tearDown() {} void tearDown() {}
espMqttClient mqttClient; espMqttClient mqttClient;
uint32_t onConnectCbId = 1;
uint32_t onDisconnectCbId = 2;
uint32_t onSubscribeCbId = 3;
uint32_t onUnsubscribeCbId = 4;
uint32_t onMessageCbId = 5;
uint32_t onPublishCbId = 6;
std::atomic_bool exitProgram(false); std::atomic_bool exitProgram(false);
std::thread t; std::thread t;
@@ -30,7 +36,7 @@ void test_connect() {
.onConnect([&](bool sessionPresent) mutable { .onConnect([&](bool sessionPresent) mutable {
sessionPresentTest = sessionPresent; sessionPresentTest = sessionPresent;
onConnectCalledTest = true; onConnectCalledTest = true;
}); }, onConnectCbId);
mqttClient.connect(); mqttClient.connect();
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 2000) { while (millis() - start < 2000) {
@@ -44,7 +50,7 @@ void test_connect() {
TEST_ASSERT_TRUE(onConnectCalledTest); TEST_ASSERT_TRUE(onConnectCalledTest);
TEST_ASSERT_FALSE(sessionPresentTest); TEST_ASSERT_FALSE(sessionPresentTest);
mqttClient.onConnect(nullptr); mqttClient.removeOnConnect(onConnectCbId);
} }
/* /*
@@ -83,7 +89,7 @@ void test_subscribe() {
if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS0) { if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS0) {
subscribeTest = true; subscribeTest = true;
} }
}); }, onSubscribeCbId);
mqttClient.subscribe("test/test", 0); mqttClient.subscribe("test/test", 0);
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 2000) { while (millis() - start < 2000) {
@@ -96,7 +102,7 @@ void test_subscribe() {
TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(subscribeTest); TEST_ASSERT_TRUE(subscribeTest);
mqttClient.onSubscribe(nullptr); mqttClient.removeOnSubscribe(onSubscribeCbId);
} }
/* /*
@@ -112,7 +118,7 @@ void test_publish() {
mqttClient.onPublish([&](uint16_t packetId) mutable { mqttClient.onPublish([&](uint16_t packetId) mutable {
(void) packetId; (void) packetId;
publishSendTest++; publishSendTest++;
}); }, onPublishCbId);
std::atomic<int> publishReceiveTest(0); std::atomic<int> publishReceiveTest(0);
mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable { mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable {
(void) properties; (void) properties;
@@ -122,7 +128,7 @@ void test_publish() {
(void) index; (void) index;
(void) total; (void) total;
publishReceiveTest++; publishReceiveTest++;
}); }, onMessageCbId);
uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0"); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0");
uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1"); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1");
uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2"); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2");
@@ -138,8 +144,8 @@ void test_publish() {
TEST_ASSERT_EQUAL_INT(2, publishSendTest); TEST_ASSERT_EQUAL_INT(2, publishSendTest);
TEST_ASSERT_EQUAL_INT(3, publishReceiveTest); TEST_ASSERT_EQUAL_INT(3, publishReceiveTest);
mqttClient.onPublish(nullptr); mqttClient.removeOnPublish(onPublishCbId);
mqttClient.onMessage(nullptr); mqttClient.removeOnMessage(onMessageCbId);
} }
void test_publish_empty() { void test_publish_empty() {
@@ -147,7 +153,7 @@ void test_publish_empty() {
mqttClient.onPublish([&](uint16_t packetId) mutable { mqttClient.onPublish([&](uint16_t packetId) mutable {
(void) packetId; (void) packetId;
publishSendEmptyTest++; publishSendEmptyTest++;
}); }, onPublishCbId);
std::atomic<int> publishReceiveEmptyTest(0); std::atomic<int> publishReceiveEmptyTest(0);
mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable { mqttClient.onMessage([&](const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) mutable {
(void) properties; (void) properties;
@@ -157,7 +163,7 @@ void test_publish_empty() {
(void) index; (void) index;
(void) total; (void) total;
publishReceiveEmptyTest++; publishReceiveEmptyTest++;
}); }, onMessageCbId);
uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, nullptr, 0); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, nullptr, 0);
uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, nullptr, 0); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, nullptr, 0);
uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, nullptr, 0); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, nullptr, 0);
@@ -173,8 +179,8 @@ void test_publish_empty() {
TEST_ASSERT_EQUAL_INT(2, publishSendEmptyTest); TEST_ASSERT_EQUAL_INT(2, publishSendEmptyTest);
TEST_ASSERT_EQUAL_INT(3, publishReceiveEmptyTest); TEST_ASSERT_EQUAL_INT(3, publishReceiveEmptyTest);
mqttClient.onPublish(nullptr); mqttClient.removeOnPublish(onPublishCbId);
mqttClient.onMessage(nullptr); mqttClient.removeOnMessage(onMessageCbId);
} }
/* /*
@@ -195,13 +201,13 @@ void test_receive1() {
(void) index; (void) index;
(void) total; (void) total;
publishReceive1Test++; publishReceive1Test++;
}); }, onMessageCbId);
mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable { mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable {
(void) packetId; (void) packetId;
if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS1) { if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS1) {
mqttClient.publish("test/test", 1, false, ""); mqttClient.publish("test/test", 1, false, "");
} }
}); }, onSubscribeCbId);
mqttClient.subscribe("test/test", 1); mqttClient.subscribe("test/test", 1);
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 6000) { while (millis() - start < 6000) {
@@ -211,8 +217,8 @@ void test_receive1() {
TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_GREATER_THAN_INT(0, publishReceive1Test); TEST_ASSERT_GREATER_THAN_INT(0, publishReceive1Test);
mqttClient.onMessage(nullptr); mqttClient.removeOnMessage(onMessageCbId);
mqttClient.onSubscribe(nullptr); mqttClient.removeOnSubscribe(onSubscribeCbId);
} }
/* /*
@@ -233,13 +239,13 @@ void test_receive2() {
(void) index; (void) index;
(void) total; (void) total;
publishReceive2Test++; publishReceive2Test++;
}); }, onMessageCbId);
mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable { mqttClient.onSubscribe([&](uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* returncodes, size_t len) mutable {
(void) packetId; (void) packetId;
if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS2) { if (len == 1 && returncodes[0] == espMqttClientTypes::SubscribeReturncode::QOS2) {
mqttClient.publish("test/test", 2, false, ""); mqttClient.publish("test/test", 2, false, "");
} }
}); }, onSubscribeCbId);
mqttClient.subscribe("test/test", 2); mqttClient.subscribe("test/test", 2);
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 6000) { while (millis() - start < 6000) {
@@ -249,8 +255,8 @@ void test_receive2() {
TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_EQUAL_INT(1, publishReceive2Test); TEST_ASSERT_EQUAL_INT(1, publishReceive2Test);
mqttClient.onMessage(nullptr); mqttClient.removeOnMessage(onMessageCbId);
mqttClient.onSubscribe(nullptr); mqttClient.removeOnSubscribe(onSubscribeCbId);
} }
@@ -265,7 +271,7 @@ void test_unsubscribe() {
mqttClient.onUnsubscribe([&](uint16_t packetId) mutable { mqttClient.onUnsubscribe([&](uint16_t packetId) mutable {
(void) packetId; (void) packetId;
unsubscribeTest = true; unsubscribeTest = true;
}); }, onUnsubscribeCbId);
mqttClient.unsubscribe("test/test"); mqttClient.unsubscribe("test/test");
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 2000) { while (millis() - start < 2000) {
@@ -278,7 +284,7 @@ void test_unsubscribe() {
TEST_ASSERT_TRUE(mqttClient.connected()); TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(unsubscribeTest); TEST_ASSERT_TRUE(unsubscribeTest);
mqttClient.onUnsubscribe(nullptr); mqttClient.removeOnUnsubscribe(onUnsubscribeCbId);
} }
/* /*
@@ -293,7 +299,7 @@ void test_disconnect() {
mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable { mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable {
reasonTest = reason; reasonTest = reason;
onDisconnectCalled = true; onDisconnectCalled = true;
}); }, onDisconnectCbId);
mqttClient.disconnect(); mqttClient.disconnect();
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 2000) { while (millis() - start < 2000) {
@@ -307,7 +313,7 @@ void test_disconnect() {
TEST_ASSERT_EQUAL_UINT8(espMqttClientTypes::DisconnectReason::USER_OK, reasonTest); TEST_ASSERT_EQUAL_UINT8(espMqttClientTypes::DisconnectReason::USER_OK, reasonTest);
TEST_ASSERT_TRUE(mqttClient.disconnected()); TEST_ASSERT_TRUE(mqttClient.disconnected());
mqttClient.onDisconnect(nullptr); mqttClient.removeOnDisconnect(onDisconnectCbId);
} }
void test_pub_before_connect() { void test_pub_before_connect() {
@@ -320,11 +326,11 @@ void test_pub_before_connect() {
.onConnect([&](bool sessionPresent) mutable { .onConnect([&](bool sessionPresent) mutable {
sessionPresentTest = sessionPresent; sessionPresentTest = sessionPresent;
onConnectCalledTest = true; onConnectCalledTest = true;
}) }, onConnectCbId)
.onPublish([&](uint16_t packetId) mutable { .onPublish([&](uint16_t packetId) mutable {
(void) packetId; (void) packetId;
publishSendTest++; publishSendTest++;
}); }, onPublishCbId);
uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0"); uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0");
uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1"); uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1");
uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2"); uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2");
@@ -349,8 +355,8 @@ void test_pub_before_connect() {
TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test); TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test);
TEST_ASSERT_EQUAL_INT(2, publishSendTest); TEST_ASSERT_EQUAL_INT(2, publishSendTest);
mqttClient.onConnect(nullptr); mqttClient.removeOnConnect(onConnectCbId);
mqttClient.onPublish(nullptr); mqttClient.removeOnPublish(onPublishCbId);
} }
void final_disconnect() { void final_disconnect() {
@@ -358,7 +364,7 @@ void final_disconnect() {
mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable { mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable {
(void) reason; (void) reason;
onDisconnectCalled = true; onDisconnectCalled = true;
}); }, onDisconnectCbId);
mqttClient.disconnect(); mqttClient.disconnect();
uint32_t start = millis(); uint32_t start = millis();
while (millis() - start < 2000) { while (millis() - start < 2000) {
@@ -370,7 +376,7 @@ void final_disconnect() {
if (mqttClient.connected()) { if (mqttClient.connected()) {
mqttClient.disconnect(true); mqttClient.disconnect(true);
} }
mqttClient.onDisconnect(nullptr); mqttClient.removeOnDisconnect(onDisconnectCbId);
} }
int main() { int main() {

View File

@@ -2,7 +2,7 @@
#include <unity.h> #include <unity.h>
#include <Packets/String.h> #include <Packets/StringUtil.h>
void setUp() {} void setUp() {}
void tearDown() {} void tearDown() {}

Binary file not shown.