Merge pull request #132 from technyon/move-epsMqttClient-to-network-task

Move eps mqtt client to network task
This commit is contained in:
Jan-Ole Schümann
2023-03-03 18:03:32 +01:00
committed by GitHub
32 changed files with 615 additions and 181 deletions

View File

@@ -1,6 +1,6 @@
#pragma once
#define NUKI_HUB_VERSION "8.12-pre-1"
#define NUKI_HUB_VERSION "8.12-pre-11"
#define MQTT_QOS_LEVEL 1
#define MQTT_CLEAN_SESSIONS false

View File

@@ -56,53 +56,55 @@ void Network::setupDevice()
}
else
{
if(hardwareDetect == 1)
Log->print(F("Network device: "));
switch (hardwareDetect)
{
Log->println(F("W5500 hardware is disabled, using Wifi."));
_networkDeviceType = NetworkDeviceType::WiFi;
}
else if(hardwareDetect == 2)
{
Log->print(F("Using PIN "));
Log->print(hardwareDetectGpio);
Log->println(F(" for network device selection"));
case 1:
Log->println(F("Wifi only"));
_networkDeviceType = NetworkDeviceType::WiFi;
break;
case 2:
Log->print(F("Using PIN "));
Log->print(hardwareDetectGpio);
Log->println(F(" for network device selection"));
pinMode(hardwareDetectGpio, INPUT_PULLUP);
_networkDeviceType = digitalRead(hardwareDetectGpio) == HIGH ? NetworkDeviceType::WiFi : NetworkDeviceType::W5500;
}
else if(hardwareDetect == 3)
{
Log->println(F("W5500 on M5Stack Atom POE"));
_networkDeviceType = NetworkDeviceType::W5500;
}
else if(hardwareDetect == 4)
{
Log->println(F("Olimex ESP32-POE / ESP-POE-ISO"));
_networkDeviceType = NetworkDeviceType::LAN8720;
}
else
{
Log->println(F("Unknown hardware selected, falling back to Wifi."));
_networkDeviceType = NetworkDeviceType::WiFi;
pinMode(hardwareDetectGpio, INPUT_PULLUP);
_networkDeviceType = digitalRead(hardwareDetectGpio) == HIGH ? NetworkDeviceType::WiFi : NetworkDeviceType::W5500;
break;
case 3:
Log->println(F("W5500 on M5Stack Atom POE"));
_networkDeviceType = NetworkDeviceType::W5500;
break;
case 4:
Log->println(F("Olimex ESP32-POE / ESP-POE-ISO"));
_networkDeviceType = NetworkDeviceType::Olimex_LAN8720;
break;
case 5:
Log->println(F("WT32-ETH01"));
_networkDeviceType = NetworkDeviceType::WT32_LAN8720;
break;
default:
Log->println(F("Unknown hardware selected, falling back to Wifi."));
_networkDeviceType = NetworkDeviceType::WiFi;
break;
}
}
switch(_networkDeviceType)
switch (_networkDeviceType)
{
case NetworkDeviceType::W5500:
Log->println(F("Network device: W5500"));
_device = new W5500Device(_hostname, _preferences, hardwareDetect);
break;
case NetworkDeviceType::LAN8720:
Log->println(F("Network device: LAN8720"));
_device = new EthLan8720Device(_hostname, _preferences);
case NetworkDeviceType::Olimex_LAN8720:
_device = new EthLan8720Device(_hostname, _preferences, "Olimex (LAN8720)", ETH_PHY_ADDR, 12, ETH_PHY_MDC, ETH_PHY_MDIO, ETH_PHY_TYPE, ETH_CLOCK_GPIO17_OUT);
break;
case NetworkDeviceType::WT32_LAN8720:
_device = new EthLan8720Device(_hostname, _preferences, "WT32-ETH01", 1, 16);
break;
case NetworkDeviceType::WiFi:
Log->println(F("Network device: Builtin WiFi"));
_device = new WifiDevice(_hostname, _preferences);
break;
default:
Log->println(F("Unknown network device type, defaulting to WiFi"));
_device = new WifiDevice(_hostname, _preferences);
break;
}
@@ -357,7 +359,8 @@ bool Network::reconnect()
while(!_connectReplyReceived && millis() < timeout)
{
delay(200);
delay(50);
_device->update();
if(_keepAliveCallback != nullptr)
{
_keepAliveCallback();

View File

@@ -10,7 +10,8 @@ enum class NetworkDeviceType
{
WiFi,
W5500,
LAN8720
Olimex_LAN8720,
WT32_LAN8720
};
#define JSON_BUFFER_SIZE 1024

View File

@@ -18,6 +18,7 @@ As an alternative to Wifi, the following ESP32 modules with wired ethernet are s
[M5Stack Atom POE](https://docs.m5stack.com/en/atom/atom_poe)<br>
[Olimex ESP32-POE](https://www.olimex.com/Products/IoT/ESP32/ESP32-POE/open-source-hardware)<br>
[Olimex ESP32-POE-ISO](https://www.olimex.com/Products/IoT/ESP32/ESP32-POE-ISO/open-source-hardware)<br>
[WT32-ETH01](http://www.wireless-tag.com/portfolio/wt32-eth01/)<br>
## Installation
@@ -34,7 +35,10 @@ https://github.com/technyon/nuki_hub/releases
The firmware uses the Wifi Manager to configure the WiFi network. Power up the ESP32, a new Access Point should appear. Connect to this access point and in a browser navigate to "192.168.4.1". Use the web interface configure your Wifi network.
After configuring the Wifi, the ESP should automatically connect to your network. Use the web interface to setup the MQTT broker; just navigate to the IP-Address assigned to the ESP32 via DHCP (often found in the web interface of the internet router).<br>
To configure MQTT, enter the adress of your MQTT broker and eventually a username and a password if required. The firmware supports SSL encryption for MQTT, however most people and especially home users don't use this. In that case leave all fields about "MQTT SSL" blank.
To configure MQTT, enter the adress of your MQTT broker and eventually a username and a password if required. The firmware supports SSL encryption for MQTT, however most people and especially home users don't use this. In that case leave all fields about "MQTT SSL" blank.<br>
If a PIN has been configured using the smartphone app, it's recommended to supply this PIN to NUKI Hub.
Certain functionality is not available without configuring the PIN, like changing the config or keypad coded.
To do so, navigate to "Credentials" in the web interface. This will only supply the PIN to NUK Hub, it will on no way reconfigure the PIN on the lock.
## Pairing

View File

@@ -785,6 +785,14 @@ void WebCfgServer::buildInfoHtml(String &response)
response.concat(esp_get_free_heap_size());
response.concat("\n");
response.concat("Stack watermarks: nw: ");
response.concat(uxTaskGetStackHighWaterMark(networkTaskHandle));
response.concat(", nuki: ");
response.concat(uxTaskGetStackHighWaterMark(nukiTaskHandle));
response.concat(", pd: ");
response.concat(uxTaskGetStackHighWaterMark(presenceDetectionTaskHandle));
response.concat("\n");
response.concat("Restart reason FW: ");
response.concat(getRestartReason());
response.concat( "\n");
@@ -1105,10 +1113,11 @@ const std::vector<std::pair<String, String>> WebCfgServer::getNetworkDetectionOp
{
std::vector<std::pair<String, String>> options;
options.push_back(std::make_pair("1", "Disable W5500 (Wifi only)"));
options.push_back(std::make_pair("1", "Wifi only"));
options.push_back(std::make_pair("2", "Detect W5500 (GPIO CS=5; SCK=18; MISO=19; MOSI=23; RST=33)"));
options.push_back(std::make_pair("3", "M5Stack Atom POE (W5500)"));
options.push_back(std::make_pair("4", "Olimex ESP32-POE / ESP-POE-ISO"));
options.push_back(std::make_pair("5", "WT32-ETH01"));
return options;
}

View File

@@ -7,6 +7,10 @@
#include "NukiOpenerWrapper.h"
#include "Ota.h"
extern TaskHandle_t networkTaskHandle;
extern TaskHandle_t nukiTaskHandle;
extern TaskHandle_t presenceDetectionTaskHandle;
enum class TokenType
{
None,

View File

@@ -66,7 +66,7 @@ espMqttClientAsync()
```
Instantiate a new espMqttClient or espMqttSecure object.
On ESP32, two optional parameters are available: `espMqttClient(uint8_t priority = 1, uint8_t core = 1)`. This will change the priority of the MQTT client task and the core on which it runs (higher priority = more cpu-time).
On ESP32, three optional parameters are available: `espMqttClient(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1)`. By default, espMqttclient creates its own task to manage TCP. By setting `internalTask` to false, no task will be created and you will be responsible yourself to call `espMqttClient.loop()`. `priority` changes the priority of the MQTT client task and the core on which it runs (higher priority = more cpu-time).
For the asynchronous version, use `espMqttClientAsync`.
@@ -150,6 +150,15 @@ Set the server.
- **`host`**: Host of the server, expects a null-terminated char array (c-string)
- **`port`**: Port of the server
```cpp
espMqttClient& setTimeout(uint16_t timeout)
```
Set the timeout for packets that need acknowledgement. Defaults to 10 seconds.
When no acknowledgement has been received from the broker after sending a packet, the client will retransmit **all** the packets in the queue.
* **`timeout`**: Timeout in seconds
#### Options for TLS connections
All common options from WiFiClientSecure to setup an encrypted connection are made available. These include:
@@ -312,17 +321,19 @@ Publish a packet with a callback for payload handling. Return the packet ID (or
The callback has the following signature: `size_t callback(uint8_t* data, size_t maxSize, size_t index)`. When the library needs payload data, the callback will be invoked. It is the callback's job to write data indo `data` with a maximum of `maxSize` bytes, according the `index` and return the amount of bytes written.
```cpp
void clearQueue()
void clearQueue(bool deleteSessionData = false)
```
When disconnected, clears all queued messages.
Keep in mind that this also deletes any session data and therefore is no MQTT compliant.
Clears all queued messages.
Keep in mind that this may also delete any session data and therefore is not MQTT compliant.
- **`deleteSessionData`**: When true, delete all outgoing messages. Not MQTT compliant!
```cpp
void loop()
```
This is the worker function of the MQTT client. For ESP8266 you must call this function in the Arduino loop. For ESP32 this function is only used internally and is not available in the API.
This is the worker function of the MQTT client. For ESP8266 you must call this function in the Arduino loop. For ESP32 you have to call this function yourself **only if you have disabled the internal task** (see the constructors).
```cpp
const char* getClientId() const
@@ -361,12 +372,19 @@ Set this to 1 if you use the async version on ESP8266. For the regular client th
### EMC_ALLOW_NOT_CONNECTED_PUBLISH 1
By default, you can publish when the client is not connected. If you don't want this, set this to 0.
Regardless of this setting, after you called `disconnect()`, no messages can be published until fully disconnected.
### EMC_WAIT_FOR_CONNACK 1
espMqttClient waits for the CONNACK (connection acknowledge) packet before starting to send other packets.
The MQTT specification allows to start sending before the broker acknowledges the connection but some brokers
don't allow this (AWS for example doesn't).
### EMC_CLIENTID_LENGTH 18 + 1
The (maximum) length of the client ID. (Keep in mind that this is a c-string. You need to have 1 position available for the null-termination.)
### EMC_TASK_STACK_SIZE 5000
### EMC_TASK_STACK_SIZE 5120
Only used on ESP32. Sets the stack size (in words) of the MQTT client worker task.

View File

@@ -1,5 +1,6 @@
#include <ESP8266WiFi.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -11,7 +12,8 @@
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
espMqttClient mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
size_t fetchPayload(uint8_t* dest, size_t len, size_t index) {
Serial.printf("filling buffer at index %zu\n", index);
@@ -35,7 +37,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
@@ -45,8 +53,6 @@ void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
reconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
reconnectTimer.once(5, connectToWiFi);
}
void onMqttConnect(bool sessionPresent) {
@@ -60,7 +66,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -75,6 +82,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
wifiConnectHandler = WiFi.onStationModeGotIP(onWiFiConnect);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWiFiDisconnect);
@@ -87,5 +96,10 @@ void setup() {
}
void loop() {
static uint32_t currentMillis = millis();
mqttClient.loop();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -1,6 +1,7 @@
#include <ESP8266WiFi.h>
#include <Updater.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -14,7 +15,8 @@
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
espMqttClient mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
bool disconnectFlag = false;
bool restartFlag = false;
@@ -25,7 +27,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
@@ -35,8 +43,6 @@ void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
reconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
reconnectTimer.once(5, connectToWiFi);
}
void onMqttConnect(bool sessionPresent) {
@@ -57,7 +63,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -114,6 +121,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
wifiConnectHandler = WiFi.onStationModeGotIP(onWiFiConnect);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWiFiDisconnect);
@@ -127,16 +136,22 @@ void setup() {
}
void loop() {
mqttClient.loop();
if (disconnectFlag) {
// it's safe to call this multiple times
mqttClient.disconnect();
}
if (restartFlag) {
Serial.println("Rebooting... See you next time!");
Serial.flush();
ESP.reset();
}
static uint32_t currentMillis = millis();
mqttClient.loop();
if (!disconnectFlag && reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
if (disconnectFlag) {
// it's safe to call this multiple times
mqttClient.disconnect();
}
}

View File

@@ -1,5 +1,6 @@
#include <WiFi.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -9,7 +10,8 @@
#define MQTT_PORT 1883
espMqttClient mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -18,7 +20,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void WiFiEvent(WiFiEvent_t event) {
@@ -32,7 +40,6 @@ void WiFiEvent(WiFiEvent_t event) {
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
reconnectTimer.once(5, connectToWiFi);
break;
default:
break;
@@ -60,7 +67,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -109,6 +117,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
@@ -123,5 +133,9 @@ void setup() {
}
void loop() {
// nothing to do here
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -1,5 +1,6 @@
#include <ESP8266WiFi.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -11,7 +12,8 @@
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
espMqttClient mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -20,7 +22,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
@@ -30,8 +38,6 @@ void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
reconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
reconnectTimer.once(5, connectToWiFi);
}
void onMqttConnect(bool sessionPresent) {
@@ -55,7 +61,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -104,6 +111,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
wifiConnectHandler = WiFi.onStationModeGotIP(onWiFiConnect);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWiFiDisconnect);
@@ -119,5 +128,10 @@ void setup() {
}
void loop() {
static uint32_t currentMillis = millis();
mqttClient.loop();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -1,5 +1,6 @@
#include <WiFi.h>
#include <Ticker.h>
#include <espMqttClientAsync.h>
#define WIFI_SSID "yourSSID"
@@ -9,7 +10,8 @@
#define MQTT_PORT 1883
espMqttClientAsync mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -18,7 +20,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void WiFiEvent(WiFiEvent_t event) {
@@ -32,7 +40,6 @@ void WiFiEvent(WiFiEvent_t event) {
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
reconnectTimer.once(5, connectToWiFi);
break;
default:
break;
@@ -60,7 +67,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -109,6 +117,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
@@ -123,5 +133,9 @@ void setup() {
}
void loop() {
// nothing to do here
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -1,5 +1,6 @@
#include <ESP8266WiFi.h>
#include <Ticker.h>
#include <espMqttClientAsync.h>
#define WIFI_SSID "yourSSID"
@@ -11,7 +12,8 @@
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
espMqttClientAsync mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -20,7 +22,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
@@ -30,8 +38,6 @@ void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
reconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
reconnectTimer.once(5, connectToWiFi);
}
void onMqttConnect(bool sessionPresent) {
@@ -55,7 +61,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -104,6 +111,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
wifiConnectHandler = WiFi.onStationModeGotIP(onWiFiConnect);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWiFiDisconnect);
@@ -119,5 +128,9 @@ void setup() {
}
void loop() {
// nothing to do here
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -1,5 +1,6 @@
#include <WiFi.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -16,7 +17,8 @@ const char rootCA[] = \
"-----END CERTIFICATE-----\n";
espMqttClientSecure mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -25,7 +27,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void WiFiEvent(WiFiEvent_t event) {
@@ -39,7 +47,6 @@ void WiFiEvent(WiFiEvent_t event) {
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
reconnectTimer.once(5, connectToWiFi);
break;
default:
break;
@@ -64,7 +71,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -113,6 +121,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
WiFi.onEvent(WiFiEvent);
//mqttClient.setInsecure();
@@ -131,15 +141,21 @@ void setup() {
}
void loop() {
static uint32_t currentMillis = millis();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
static uint32_t lastMillis = 0;
if (millis() - lastMillis > 5000) {
lastMillis = millis();
if (currentMillis - lastMillis > 5000) {
lastMillis = currentMillis;
Serial.printf("heap: %u\n", ESP.getFreeHeap());
}
static uint32_t millisDisconnect = 0;
if (millis() - millisDisconnect > 60000) {
millisDisconnect = millis();
if (currentMillis - millisDisconnect > 60000) {
millisDisconnect = currentMillis;
mqttClient.disconnect();
}
}

View File

@@ -1,5 +1,6 @@
#include <ESP8266WiFi.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define WIFI_SSID "yourSSID"
@@ -14,7 +15,8 @@ const uint8_t fingerprint[] = {0xee, 0xbc, 0x4b, 0xf8, 0x57, 0xe3, 0xd3, 0xe4, 0
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
espMqttClientSecure mqttClient;
Ticker reconnectTimer;
bool reconnectMqtt = false;
uint32_t lastReconnect = 0;
void connectToWiFi() {
Serial.println("Connecting to Wi-Fi...");
@@ -23,7 +25,13 @@ void connectToWiFi() {
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
if (!mqttClient.connect()) {
reconnectMqtt = true;
lastReconnect = millis();
Serial.println("Connecting failed.");
} else {
reconnectMqtt = false;
}
}
void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
@@ -33,8 +41,6 @@ void onWiFiConnect(const WiFiEventStationModeGotIP& event) {
void onWiFiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
reconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
reconnectTimer.once(5, connectToWiFi);
}
void onMqttConnect(bool sessionPresent) {
@@ -58,7 +64,8 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
Serial.printf("Disconnected from MQTT: %u.\n", static_cast<uint8_t>(reason));
if (WiFi.isConnected()) {
reconnectTimer.once(5, connectToMqtt);
reconnectMqtt = true;
lastReconnect = millis();
}
}
@@ -107,6 +114,8 @@ void setup() {
Serial.println();
Serial.println();
WiFi.setAutoConnect(false);
WiFi.setAutoReconnect(true);
wifiConnectHandler = WiFi.onStationModeGotIP(onWiFiConnect);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWiFiDisconnect);
@@ -123,5 +132,10 @@ void setup() {
}
void loop() {
static uint32_t currentMillis = millis();
mqttClient.loop();
if (reconnectMqtt && currentMillis - lastReconnect > 5000) {
connectToMqtt();
}
}

View File

@@ -40,13 +40,17 @@ the LICENSE file.
#define EMC_ALLOW_NOT_CONNECTED_PUBLISH 1
#endif
#ifndef EMC_WAIT_FOR_CONNACK
#define EMC_WAIT_FOR_CONNACK 1
#endif
#ifndef EMC_CLIENTID_LENGTH
// esp8266abc123 and esp32abcdef123456
#define EMC_CLIENTID_LENGTH 23 + 1
#endif
#ifndef EMC_TASK_STACK_SIZE
#define EMC_TASK_STACK_SIZE 5000
#define EMC_TASK_STACK_SIZE 5120
#endif
#ifndef EMC_USE_WATCHDOG

View File

@@ -9,13 +9,14 @@ the LICENSE file.
#pragma once
#if defined(ARDUINO_ARCH_ESP32)
// Logging is en/disabled by Arduino framework macros
#include <esp32-hal-log.h>
#if defined(DEBUG_ESP_MQTT_CLIENT)
// Logging is en/disabled by Arduino framework macros
#define emc_log_i(...) log_i(__VA_ARGS__)
#define emc_log_e(...) log_e(__VA_ARGS__)
#define emc_log_w(...) log_w(__VA_ARGS__)
#else
// Logging is disabled
#define emc_log_i(...)
#define emc_log_e(...)
#define emc_log_w(...)

View File

@@ -42,6 +42,7 @@ MqttClient::MqttClient()
, _willPayloadLength(0)
, _willQos(0)
, _willRetain(false)
, _timeout(10000)
, _state(State::disconnected)
, _generatedClientId{0}
, _packetId(0)
@@ -76,7 +77,7 @@ MqttClient::MqttClient()
MqttClient::~MqttClient() {
disconnect(true);
_clearQueue(true);
_clearQueue(2);
#if defined(ARDUINO_ARCH_ESP32)
vSemaphoreDelete(_xSemaphore);
if (_useTask) {
@@ -144,9 +145,11 @@ bool MqttClient::disconnect(bool force) {
uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length) {
#if !EMC_ALLOW_NOT_CONNECTED_PUBLISH
if (_state != State::connected) {
#else
if (_state > State::connected) {
#endif
return 0;
}
#endif
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, payload, length, qos, retain)) {
@@ -166,9 +169,11 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const
uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length) {
#if !EMC_ALLOW_NOT_CONNECTED_PUBLISH
if (_state != State::connected) {
#else
if (_state > State::connected) {
#endif
return 0;
}
#endif
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, callback, length, qos, retain)) {
@@ -180,8 +185,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt
return packetId;
}
void MqttClient::clearQueue(bool all) {
_clearQueue(all);
void MqttClient::clearQueue(bool deleteSessionData) {
_clearQueue(deleteSessionData ? 2 : 0);
}
const char* MqttClient::getClientId() const {
@@ -214,6 +219,31 @@ void MqttClient::loop() {
_state = State::connectingMqtt;
}
break;
case State::connectingMqtt:
#if EMC_WAIT_FOR_CONNACK
_sendPacket();
_checkIncoming();
_checkPing();
break;
#else
// receipt of CONNACK packet will set state to CONNECTED
// client however is allowed to send packets before CONNACK is received
// so we fall through to 'connected'
[[fallthrough]];
#endif
case State::connected:
[[fallthrough]];
case State::disconnectingMqtt2:
if (_transport->connected()) {
// CONNECT packet is first in the queue
_checkOutbox();
_checkIncoming();
_checkPing();
} else {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
break;
case State::disconnectingMqtt1:
EMC_SEMAPHORE_TAKE();
if (_outbox.empty()) {
@@ -226,25 +256,9 @@ void MqttClient::loop() {
}
}
EMC_SEMAPHORE_GIVE();
// fall through to 'connected' to send out DISCONN packet
[[fallthrough]];
case State::disconnectingMqtt2:
[[fallthrough]];
case State::connectingMqtt:
// receipt of CONNACK packet will set state to CONNECTED
// client however is allowed to send packets before CONNACK is received
// so we fall through to 'connected'
[[fallthrough]];
case State::connected:
if (_transport->connected()) {
// CONNECT packet is first in the queue
_checkOutgoing();
_checkIncoming();
_checkPing();
} else {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
_checkOutbox();
_checkIncoming();
_checkPing();
break;
case State::disconnectingTcp1:
_transport->stop();
@@ -252,7 +266,7 @@ void MqttClient::loop() {
break;
case State::disconnectingTcp2:
if (_transport->disconnected()) {
_clearQueue(false);
_clearQueue(0);
_state = State::disconnected;
if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason);
}
@@ -294,6 +308,15 @@ uint16_t MqttClient::_getNextPacketId() {
return packetId;
}
void MqttClient::_checkOutbox() {
while (_sendPacket() > 0) {
if (!_advanceOutbox()) {
break;
}
}
}
/*
void MqttClient::_checkOutgoing() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
@@ -330,6 +353,59 @@ void MqttClient::_checkOutgoing() {
}
EMC_SEMAPHORE_GIVE();
}
*/
int MqttClient::_sendPacket() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
int32_t wantToWrite = 0;
int32_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
return 0;
}
written = _transport->write(packet->data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
EMC_SEMAPHORE_GIVE();
return -1;
}
// handle with care! millis() returns unsigned 32 bit, token is void*
static_assert(sizeof(uint32_t) <= sizeof(void*), "the size of uint32_t must be smaller than or equal to the size of a pointer");
packet->token = reinterpret_cast<void*>(millis());
_lastClientActivity = millis();
_bytesSent += written;
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType());
}
EMC_SEMAPHORE_GIVE();
return written;
}
bool MqttClient::_advanceOutbox() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->size()) {
if ((packet->packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::USER_OK;
}
if (packet->removable()) {
_outbox.removeCurrent();
} else {
// we already set 'dup' here, in case we have to retry
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
_outbox.next();
}
packet = _outbox.getCurrent();
_bytesSent = 0;
}
EMC_SEMAPHORE_GIVE();
return packet;
}
void MqttClient::_checkIncoming() {
int32_t remainingBufferLength = _transport->read(_rxBuffer, EMC_RX_BUFFER_SIZE);
@@ -355,7 +431,7 @@ void MqttClient::_checkIncoming() {
}
break;
case PacketType.PUBLISH:
if (_state == State::disconnectingMqtt1 || _state == State::disconnectingMqtt2) break; // stop processing incoming once user has called disconnect
if (_state >= State::disconnectingMqtt1) break; // stop processing incoming once user has called disconnect
_onPublish();
break;
case PacketType.PUBACK:
@@ -423,12 +499,24 @@ void MqttClient::_checkPing() {
}
}
void MqttClient::_checkTimeout() {
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
if (it && _bytesSent == 0) { // check that we're not busy sending
if (millis() - *((uint32_t*)&(it.get()->token)) > _timeout) { // NOLINT(readability/casting)
// TODO(bertmelis): fix ugly casting hack
emc_log_w("Packet ack timeout, retrying");
_outbox.resetCurrent();
}
}
}
void MqttClient::_onConnack() {
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) {
_pingSent = false; // reset after keepalive timeout disconnect
_state = State::connected;
_advanceOutbox();
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent == 0) {
_clearQueue(true);
_clearQueue(1);
}
if (_onConnectCallback) {
_onConnectCallback(_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent);
@@ -636,15 +724,11 @@ void MqttClient::_onUnsuback() {
}
}
void MqttClient::_clearQueue(bool clearSession) {
emc_log_i("clearing queue (clear session: %s)", clearSession ? "true" : "false");
void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
if (clearSession) {
while (it) {
_outbox.remove(it);
}
} else {
if (clearData == 0) {
// keep PUB (qos > 0, aka packetID != 0), PUBREC and PUBREL
// Spec only mentions PUB and PUBREL but this lib implements method B from point 4.3.3 (Fig. 4.3)
// and stores the packet id in the PUBREC packet. So we also must keep PUBREC.
@@ -652,12 +736,25 @@ void MqttClient::_clearQueue(bool clearSession) {
espMqttClientInternals::MQTTPacketType type = it.get()->packetType();
if (type == PacketType.PUBREC ||
type == PacketType.PUBREL ||
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
++it;
} else {
_outbox.remove(it);
}
}
} else if (clearData == 1) {
// keep PUB
while (it) {
if (it.get()->packetType() == PacketType.PUBLISH) {
++it;
} else {
_outbox.remove(it);
}
}
} else { // clearData == 2
while (it) {
_outbox.remove(it);
}
}
EMC_SEMAPHORE_GIVE();
}

View File

@@ -63,19 +63,15 @@ class MqttClient {
uint16_t publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length);
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload);
uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool all = false); // Not MQTT compliant and may cause unpredictable results when `all` = true!
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char* getClientId() const;
#if defined(ARDUINO_ARCH_ESP32)
void loop();
protected:
#endif
void loop();
#if defined(ARDUINO_ARCH_ESP32)
explicit MqttClient(bool useTask, uint8_t priority = 1, uint8_t core = 1);
bool _useTask;
#else
protected:
MqttClient();
#endif
espMqttClientInternals::Transport* _transport;
@@ -102,19 +98,20 @@ class MqttClient {
uint16_t _willPayloadLength;
uint8_t _willQos;
bool _willRetain;
uint32_t _timeout;
// state is protected to allow state changes by the transport system, defined in child classes
// eg. to allow AsyncTCP
enum class State {
disconnected,
connectingTcp1,
connectingTcp2,
connectingMqtt,
connected,
disconnectingMqtt1,
disconnectingMqtt2,
disconnectingTcp1,
disconnectingTcp2
disconnected = 0,
connectingTcp1 = 1,
connectingTcp2 = 2,
connectingMqtt = 3,
connected = 4,
disconnectingMqtt1 = 5,
disconnectingMqtt2 = 6,
disconnectingTcp1 = 7,
disconnectingTcp2 = 8
};
std::atomic<State> _state;
@@ -161,9 +158,12 @@ class MqttClient {
return false;
}
void _checkOutgoing();
void _checkOutbox();
int _sendPacket();
bool _advanceOutbox();
void _checkIncoming();
void _checkPing();
void _checkTimeout();
void _onConnack();
void _onPublish();
@@ -174,7 +174,9 @@ class MqttClient {
void _onSuback();
void _onUnsuback();
void _clearQueue(bool clearSession);
void _clearQueue(int clearData); // 0: keep session,
// 1: keep only PUBLISH qos > 0
// 2: delete all
void _onError(uint16_t packetId, espMqttClientTypes::Error error);
#if defined(ARDUINO_ARCH_ESP32)

View File

@@ -68,6 +68,11 @@ class MqttClientSetup : public MqttClient {
return static_cast<T&>(*this);
}
T& setTimeout(uint16_t timeout) {
_timeout = timeout * 1000; // s to ms conversion, will also do 16 to 32 bit conversion
return static_cast<T&>(*this);
}
T& onConnect(espMqttClientTypes::OnConnectCallback callback) {
_onConnectCallback = callback;
return static_cast<T&>(*this);

View File

@@ -138,6 +138,10 @@ class Outbox {
return nullptr;
}
void resetCurrent() {
_current = _first;
}
Iterator front() const {
Iterator it;
it._node = _first;

View File

@@ -9,8 +9,8 @@ the LICENSE file.
#include "espMqttClient.h"
#if defined(ARDUINO_ARCH_ESP32)
espMqttClient::espMqttClient(uint8_t priority, uint8_t core)
: MqttClientSetup(true, priority, core)
espMqttClient::espMqttClient(bool internalTask, uint8_t priority, uint8_t core)
: MqttClientSetup(internalTask, priority, core)
, _client() {
#else
espMqttClient::espMqttClient()
@@ -21,8 +21,8 @@ espMqttClient::espMqttClient()
#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32)
#if defined(ARDUINO_ARCH_ESP32)
espMqttClientSecure::espMqttClientSecure(uint8_t priority, uint8_t core)
: MqttClientSetup(priority, core)
espMqttClientSecure::espMqttClientSecure(bool internalTask, uint8_t priority, uint8_t core)
: MqttClientSetup(internalTask, priority, core)
, _client() {
#else
espMqttClientSecure::espMqttClientSecure()

View File

@@ -23,7 +23,7 @@ the LICENSE file.
class espMqttClient : public MqttClientSetup<espMqttClient> {
public:
#if defined(ARDUINO_ARCH_ESP32)
explicit espMqttClient(uint8_t priority = 1, uint8_t core = 1);
explicit espMqttClient(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1);
#else
espMqttClient();
#endif
@@ -40,7 +40,7 @@ class espMqttClient : public MqttClientSetup<espMqttClient> {
class espMqttClientSecure : public MqttClientSetup<espMqttClientSecure> {
public:
#if defined(ARDUINO_ARCH_ESP32)
explicit espMqttClientSecure(uint8_t priority = 1, uint8_t core = 1);
explicit espMqttClientSecure(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1);
#else
espMqttClientSecure();
#endif

View File

@@ -43,6 +43,8 @@ void test_connect() {
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(onConnectCalledTest);
TEST_ASSERT_FALSE(sessionPresentTest);
mqttClient.onConnect(nullptr);
}
/*
@@ -93,6 +95,8 @@ void test_subscribe() {
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(subscribeTest);
mqttClient.onSubscribe(nullptr);
}
/*
@@ -133,6 +137,9 @@ void test_publish() {
TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test);
TEST_ASSERT_EQUAL_INT(2, publishSendTest);
TEST_ASSERT_EQUAL_INT(3, publishReceiveTest);
mqttClient.onPublish(nullptr);
mqttClient.onMessage(nullptr);
}
void test_publish_empty() {
@@ -165,6 +172,9 @@ void test_publish_empty() {
TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test);
TEST_ASSERT_EQUAL_INT(2, publishSendEmptyTest);
TEST_ASSERT_EQUAL_INT(3, publishReceiveEmptyTest);
mqttClient.onPublish(nullptr);
mqttClient.onMessage(nullptr);
}
/*
@@ -200,6 +210,9 @@ void test_receive1() {
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_GREATER_THAN_INT(0, publishReceive1Test);
mqttClient.onMessage(nullptr);
mqttClient.onSubscribe(nullptr);
}
/*
@@ -235,6 +248,9 @@ void test_receive2() {
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_EQUAL_INT(1, publishReceive2Test);
mqttClient.onMessage(nullptr);
mqttClient.onSubscribe(nullptr);
}
@@ -261,6 +277,8 @@ void test_unsubscribe() {
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(unsubscribeTest);
mqttClient.onUnsubscribe(nullptr);
}
/*
@@ -288,6 +306,71 @@ void test_disconnect() {
TEST_ASSERT_TRUE(onDisconnectCalled);
TEST_ASSERT_EQUAL_UINT8(espMqttClientTypes::DisconnectReason::USER_OK, reasonTest);
TEST_ASSERT_TRUE(mqttClient.disconnected());
mqttClient.onDisconnect(nullptr);
}
void test_pub_before_connect() {
std::atomic<bool> onConnectCalledTest(false);
std::atomic<int> publishSendTest(0);
bool sessionPresentTest = true;
mqttClient.setServer(broker, broker_port)
.setCleanSession(true)
.setKeepAlive(5)
.onConnect([&](bool sessionPresent) mutable {
sessionPresentTest = sessionPresent;
onConnectCalledTest = true;
})
.onPublish([&](uint16_t packetId) mutable {
(void) packetId;
publishSendTest++;
});
uint16_t sendQos0Test = mqttClient.publish("test/test", 0, false, "test0");
uint16_t sendQos1Test = mqttClient.publish("test/test", 1, false, "test1");
uint16_t sendQos2Test = mqttClient.publish("test/test", 2, false, "test2");
mqttClient.connect();
uint32_t start = millis();
while (millis() - start < 2000) {
if (onConnectCalledTest) {
break;
}
std::this_thread::yield();
}
TEST_ASSERT_TRUE(mqttClient.connected());
TEST_ASSERT_TRUE(onConnectCalledTest);
TEST_ASSERT_FALSE(sessionPresentTest);
start = millis();
while (millis() - start < 10000) {
std::this_thread::yield();
}
TEST_ASSERT_EQUAL_UINT16(1, sendQos0Test);
TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos1Test);
TEST_ASSERT_GREATER_THAN_UINT16(0, sendQos2Test);
TEST_ASSERT_EQUAL_INT(2, publishSendTest);
mqttClient.onConnect(nullptr);
mqttClient.onPublish(nullptr);
}
void final_disconnect() {
std::atomic<bool> onDisconnectCalled(false);
mqttClient.onDisconnect([&](espMqttClientTypes::DisconnectReason reason) mutable {
(void) reason;
onDisconnectCalled = true;
});
mqttClient.disconnect();
uint32_t start = millis();
while (millis() - start < 2000) {
if (onDisconnectCalled) {
break;
}
std::this_thread::yield();
}
if (mqttClient.connected()) {
mqttClient.disconnect(true);
}
mqttClient.onDisconnect(nullptr);
}
int main() {
@@ -307,6 +390,8 @@ int main() {
RUN_TEST(test_receive2);
RUN_TEST(test_unsubscribe);
RUN_TEST(test_disconnect);
RUN_TEST(test_pub_before_connect);
final_disconnect();
exitProgram = true;
t.join();
return UNITY_END();

View File

@@ -32,6 +32,10 @@ unsigned long restartTs = (2^32) - 5 * 60000;
RTC_NOINIT_ATTR int restartReason;
RTC_NOINIT_ATTR uint64_t restartReasonValid;
TaskHandle_t networkTaskHandle = nullptr;
TaskHandle_t nukiTaskHandle = nullptr;
TaskHandle_t presenceDetectionTaskHandle = nullptr;
void networkTask(void *pvParameters)
{
while(true)
@@ -51,9 +55,14 @@ void networkTask(void *pvParameters)
restartEsp(RestartReason::RestartTimer);
}
delay(200);
delay(100);
// Serial.println(uxTaskGetStackHighWaterMark(NULL));
// if(wmts < millis())
// {
// Serial.print("# ");
// Serial.println(uxTaskGetStackHighWaterMark(NULL));
// wmts = millis() + 60000;
// }
}
}
@@ -96,9 +105,9 @@ void setupTasks()
{
// configMAX_PRIORITIES is 25
xTaskCreatePinnedToCore(networkTask, "ntw", 8192, NULL, 3, NULL, 1);
xTaskCreatePinnedToCore(nukiTask, "nuki", 4096, NULL, 2, NULL, 1);
xTaskCreatePinnedToCore(presenceDetectionTask, "prdet", 768, NULL, 5, NULL, 1);
xTaskCreatePinnedToCore(networkTask, "ntw", 8192, NULL, 3, &networkTaskHandle, 1);
xTaskCreatePinnedToCore(nukiTask, "nuki", 3328, NULL, 2, &nukiTaskHandle, 1);
xTaskCreatePinnedToCore(presenceDetectionTask, "prdet", 896, NULL, 5, &presenceDetectionTaskHandle, 1);
}
uint32_t getRandomId()

View File

@@ -10,14 +10,22 @@
#include "espMqttClient.h"
#include "../RestartReason.h"
EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* _preferences)
: NetworkDevice(hostname)
EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* preferences, const std::string& deviceName, uint8_t phy_addr, int power, int mdc, int mdio, eth_phy_type_t ethtype, eth_clock_mode_t clock_mode, bool use_mac_from_efuse)
: NetworkDevice(hostname),
_deviceName(deviceName),
_phy_addr(phy_addr),
_power(power),
_mdc(mdc),
_mdio(mdio),
_type(ethtype),
_clock_mode(clock_mode),
_use_mac_from_efuse(use_mac_from_efuse)
{
_restartOnDisconnect = _preferences->getBool(preference_restart_on_disconnect);
_restartOnDisconnect = preferences->getBool(preference_restart_on_disconnect);
size_t caLength = _preferences->getString(preference_mqtt_ca,_ca,TLS_CA_MAX_SIZE);
size_t crtLength = _preferences->getString(preference_mqtt_crt,_cert,TLS_CERT_MAX_SIZE);
size_t keyLength = _preferences->getString(preference_mqtt_key,_key,TLS_KEY_MAX_SIZE);
size_t caLength = preferences->getString(preference_mqtt_ca, _ca, TLS_CA_MAX_SIZE);
size_t crtLength = preferences->getString(preference_mqtt_crt, _cert, TLS_CERT_MAX_SIZE);
size_t keyLength = preferences->getString(preference_mqtt_key, _key, TLS_KEY_MAX_SIZE);
_useEncryption = caLength > 1; // length is 1 when empty
@@ -25,7 +33,7 @@ EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* _prefere
{
Log->println(F("MQTT over TLS."));
Log->println(_ca);
_mqttClientSecure = new espMqttClientSecure();
_mqttClientSecure = new espMqttClientSecure(false);
_mqttClientSecure->setCACert(_ca);
if(crtLength > 1 && keyLength > 1) // length is 1 when empty
{
@@ -38,15 +46,15 @@ EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* _prefere
} else
{
Log->println(F("MQTT without TLS."));
_mqttClient = new espMqttClient();
_mqttClient = new espMqttClient(false);
}
if(_preferences->getBool(preference_mqtt_log_enabled))
if(preferences->getBool(preference_mqtt_log_enabled))
{
_path = new char[200];
memset(_path, 0, sizeof(_path));
String pathStr = _preferences->getString(preference_mqtt_lock_path);
String pathStr = preferences->getString(preference_mqtt_lock_path);
pathStr.concat(mqtt_topic_log);
strcpy(_path, pathStr.c_str());
Log = new MqttLogger(this, _path, MqttLoggerMode::MqttAndSerial);
@@ -55,14 +63,16 @@ EthLan8720Device::EthLan8720Device(const String& hostname, Preferences* _prefere
const String EthLan8720Device::deviceName() const
{
return "Olimex LAN8720";
return _deviceName.c_str();
}
void EthLan8720Device::initialize()
{
delay(250);
_hardwareInitialized = ETH.begin(ETH_PHY_ADDR, 12, ETH_PHY_MDC, ETH_PHY_MDIO, ETH_PHY_TYPE, ETH_CLOCK_GPIO17_OUT);
WiFi.setHostname(_hostname.c_str());
_hardwareInitialized = ETH.begin(_phy_addr, _power, _mdc, _mdio, _type, _clock_mode, _use_mac_from_efuse);
ETH.setHostname(_hostname.c_str());
if(_restartOnDisconnect)
{
@@ -95,7 +105,17 @@ bool EthLan8720Device::supportsEncryption()
bool EthLan8720Device::isConnected()
{
return ETH.linkUp();
bool connected = ETH.linkUp();
if(_lastConnected == false && connected == true)
{
Serial.print(F("Ethernet connected. IP address: "));
Serial.println(ETH.localIP().toString());
}
_lastConnected = connected;
return connected;
}
ReconnectStatus EthLan8720Device::reconnect()
@@ -104,13 +124,20 @@ ReconnectStatus EthLan8720Device::reconnect()
{
return ReconnectStatus::CriticalFailure;
}
delay(3000);
delay(200);
return isConnected() ? ReconnectStatus::Success : ReconnectStatus::Failure;
}
void EthLan8720Device::update()
{
if(_useEncryption)
{
_mqttClientSecure->loop();
}
else
{
_mqttClient->loop();
}
}
void EthLan8720Device::onDisconnected()

View File

@@ -5,11 +5,22 @@
#include <Preferences.h>
#include "NetworkDevice.h"
#include "espMqttClient.h"
#include <ETH.h>
class EthLan8720Device : public NetworkDevice
{
public:
EthLan8720Device(const String& hostname, Preferences* _preferences);
EthLan8720Device(const String& hostname,
Preferences* preferences,
const std::string& deviceName,
uint8_t phy_addr = ETH_PHY_ADDR,
int power = ETH_PHY_POWER,
int mdc = ETH_PHY_MDC,
int mdio = ETH_PHY_MDIO,
eth_phy_type_t ethtype = ETH_PHY_TYPE,
eth_clock_mode_t clock_mode = ETH_CLK_MODE,
bool use_mac_from_efuse = false);
const String deviceName() const override;
@@ -62,6 +73,16 @@ private:
char* _path;
bool _useEncryption = false;
bool _hardwareInitialized = false;
bool _lastConnected = false;
const std::string _deviceName;
uint8_t _phy_addr;
int _power;
int _mdc;
int _mdio;
eth_phy_type_t _type;
eth_clock_mode_t _clock_mode;
bool _use_mac_from_efuse;
char _ca[TLS_CA_MAX_SIZE] = {0};
char _cert[TLS_CERT_MAX_SIZE] = {0};

View File

@@ -164,7 +164,17 @@ bool W5500Device::supportsEncryption()
bool W5500Device::isConnected()
{
return Ethernet.linkStatus() == EthernetLinkStatus::LinkON && _maintainResult == 0 && _hasDHCPAddress;
bool connected = (Ethernet.linkStatus() == EthernetLinkStatus::LinkON && _maintainResult == 0 && _hasDHCPAddress);
if(_lastConnected == false && connected == true)
{
Serial.print(F("Ethernet connected. IP address: "));
Serial.println(Ethernet.localIP().toString());
}
_lastConnected = connected;
return connected;
}
void W5500Device::initializeMacAddress(byte *mac)
@@ -197,6 +207,7 @@ void W5500Device::initializeMacAddress(byte *mac)
void W5500Device::update()
{
_maintainResult = Ethernet.maintain();
_mqttClient.loop();
}
int8_t W5500Device::signalStrength()

View File

@@ -71,6 +71,7 @@ private:
bool _hasDHCPAddress = false;
char* _path;
W5500Variant _variant;
bool _lastConnected = false;
byte _mac[6];
};

View File

@@ -25,7 +25,7 @@ WifiDevice::WifiDevice(const String& hostname, Preferences* _preferences)
{
Log->println(F("MQTT over TLS."));
Log->println(_ca);
_mqttClientSecure = new espMqttClientSecure();
_mqttClientSecure = new espMqttClientSecure(false);
_mqttClientSecure->setCACert(_ca);
if(crtLength > 1 && keyLength > 1) // length is 1 when empty
{
@@ -38,7 +38,7 @@ WifiDevice::WifiDevice(const String& hostname, Preferences* _preferences)
} else
{
Log->println(F("MQTT without TLS."));
_mqttClient = new espMqttClient();
_mqttClient = new espMqttClient(false);
}
if(_preferences->getBool(preference_mqtt_log_enabled))
@@ -142,7 +142,14 @@ ReconnectStatus WifiDevice::reconnect()
void WifiDevice::update()
{
if(_useEncryption)
{
_mqttClientSecure->loop();
}
else
{
_mqttClient->loop();
}
}
void WifiDevice::onDisconnected()

View File

@@ -1,8 +1,13 @@
#include "espMqttClientW5500.h"
espMqttClientW5500::espMqttClientW5500(uint8_t priority, uint8_t core)
: MqttClientSetup(true, priority, core),
: MqttClientSetup(false, priority, core),
_client()
{
_transport = &_client;
}
void espMqttClientW5500::update()
{
loop();
}

View File

@@ -11,6 +11,8 @@ public:
espMqttClient();
#endif
void update();
protected:
#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32)
espMqttClientInternals::ClientSyncW5500 _client;