upadte esp mqtt client

This commit is contained in:
technyon
2023-02-26 14:34:08 +01:00
parent 8fd17206dd
commit b14ea6326e
27 changed files with 454 additions and 203 deletions

View File

@@ -40,13 +40,17 @@ the LICENSE file.
#define EMC_ALLOW_NOT_CONNECTED_PUBLISH 1
#endif
#ifndef EMC_WAIT_FOR_CONNACK
#define EMC_WAIT_FOR_CONNACK 1
#endif
#ifndef EMC_CLIENTID_LENGTH
// esp8266abc123 and esp32abcdef123456
#define EMC_CLIENTID_LENGTH 23 + 1
#endif
#ifndef EMC_TASK_STACK_SIZE
#define EMC_TASK_STACK_SIZE 5000
#define EMC_TASK_STACK_SIZE 5120
#endif
#ifndef EMC_USE_WATCHDOG

View File

@@ -9,13 +9,14 @@ the LICENSE file.
#pragma once
#if defined(ARDUINO_ARCH_ESP32)
// Logging is en/disabled by Arduino framework macros
#include <esp32-hal-log.h>
#if defined(DEBUG_ESP_MQTT_CLIENT)
// Logging is en/disabled by Arduino framework macros
#define emc_log_i(...) log_i(__VA_ARGS__)
#define emc_log_e(...) log_e(__VA_ARGS__)
#define emc_log_w(...) log_w(__VA_ARGS__)
#else
// Logging is disabled
#define emc_log_i(...)
#define emc_log_e(...)
#define emc_log_w(...)

View File

@@ -76,7 +76,7 @@ MqttClient::MqttClient()
MqttClient::~MqttClient() {
disconnect(true);
_clearQueue(true);
_clearQueue(2);
#if defined(ARDUINO_ARCH_ESP32)
vSemaphoreDelete(_xSemaphore);
if (_useTask) {
@@ -144,9 +144,11 @@ bool MqttClient::disconnect(bool force) {
uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length) {
#if !EMC_ALLOW_NOT_CONNECTED_PUBLISH
if (_state != State::connected) {
#else
if (_state > State::connected) {
#endif
return 0;
}
#endif
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, payload, length, qos, retain)) {
@@ -166,9 +168,11 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const
uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length) {
#if !EMC_ALLOW_NOT_CONNECTED_PUBLISH
if (_state != State::connected) {
#else
if (_state > State::connected) {
#endif
return 0;
}
#endif
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, callback, length, qos, retain)) {
@@ -180,8 +184,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt
return packetId;
}
void MqttClient::clearQueue(bool all) {
_clearQueue(all);
void MqttClient::clearQueue(bool deleteSessionData) {
_clearQueue(deleteSessionData ? 2 : 0);
}
const char* MqttClient::getClientId() const {
@@ -214,6 +218,31 @@ void MqttClient::loop() {
_state = State::connectingMqtt;
}
break;
case State::connectingMqtt:
#if EMC_WAIT_FOR_CONNACK
_sendPacket();
_checkIncoming();
_checkPing();
break;
#else
// receipt of CONNACK packet will set state to CONNECTED
// client however is allowed to send packets before CONNACK is received
// so we fall through to 'connected'
[[fallthrough]];
#endif
case State::connected:
[[fallthrough]];
case State::disconnectingMqtt2:
if (_transport->connected()) {
// CONNECT packet is first in the queue
_checkOutbox();
_checkIncoming();
_checkPing();
} else {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
break;
case State::disconnectingMqtt1:
EMC_SEMAPHORE_TAKE();
if (_outbox.empty()) {
@@ -226,25 +255,9 @@ void MqttClient::loop() {
}
}
EMC_SEMAPHORE_GIVE();
// fall through to 'connected' to send out DISCONN packet
[[fallthrough]];
case State::disconnectingMqtt2:
[[fallthrough]];
case State::connectingMqtt:
// receipt of CONNACK packet will set state to CONNECTED
// client however is allowed to send packets before CONNACK is received
// so we fall through to 'connected'
[[fallthrough]];
case State::connected:
if (_transport->connected()) {
// CONNECT packet is first in the queue
_checkOutgoing();
_checkIncoming();
_checkPing();
} else {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
_checkOutbox();
_checkIncoming();
_checkPing();
break;
case State::disconnectingTcp1:
_transport->stop();
@@ -252,7 +265,7 @@ void MqttClient::loop() {
break;
case State::disconnectingTcp2:
if (_transport->disconnected()) {
_clearQueue(false);
_clearQueue(0);
_state = State::disconnected;
if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason);
}
@@ -294,6 +307,15 @@ uint16_t MqttClient::_getNextPacketId() {
return packetId;
}
void MqttClient::_checkOutbox() {
while (_sendPacket() > 0) {
if (!_advanceOutbox()) {
break;
}
}
}
/*
void MqttClient::_checkOutgoing() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
@@ -330,6 +352,57 @@ void MqttClient::_checkOutgoing() {
}
EMC_SEMAPHORE_GIVE();
}
*/
int MqttClient::_sendPacket() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
int32_t wantToWrite = 0;
int32_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
return 0;
}
written = _transport->write(packet->data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
EMC_SEMAPHORE_GIVE();
return -1;
}
_lastClientActivity = millis();
_bytesSent += written;
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType());
}
EMC_SEMAPHORE_GIVE();
return written;
}
bool MqttClient::_advanceOutbox() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->size()) {
if ((packet->packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::USER_OK;
}
if (packet->removable()) {
_outbox.removeCurrent();
} else {
// handle with care! millis() returns unsigned 32 bit, token is void*
packet->token = reinterpret_cast<void*>(millis());
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
_outbox.next();
}
packet = _outbox.getCurrent();
_bytesSent = 0;
}
EMC_SEMAPHORE_GIVE();
return packet;
}
void MqttClient::_checkIncoming() {
int32_t remainingBufferLength = _transport->read(_rxBuffer, EMC_RX_BUFFER_SIZE);
@@ -355,7 +428,7 @@ void MqttClient::_checkIncoming() {
}
break;
case PacketType.PUBLISH:
if (_state == State::disconnectingMqtt1 || _state == State::disconnectingMqtt2) break; // stop processing incoming once user has called disconnect
if (_state >= State::disconnectingMqtt1) break; // stop processing incoming once user has called disconnect
_onPublish();
break;
case PacketType.PUBACK:
@@ -427,8 +500,9 @@ void MqttClient::_onConnack() {
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) {
_pingSent = false; // reset after keepalive timeout disconnect
_state = State::connected;
_advanceOutbox();
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent == 0) {
_clearQueue(true);
_clearQueue(1);
}
if (_onConnectCallback) {
_onConnectCallback(_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent);
@@ -636,15 +710,11 @@ void MqttClient::_onUnsuback() {
}
}
void MqttClient::_clearQueue(bool clearSession) {
emc_log_i("clearing queue (clear session: %s)", clearSession ? "true" : "false");
void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
if (clearSession) {
while (it) {
_outbox.remove(it);
}
} else {
if (clearData == 0) {
// keep PUB (qos > 0, aka packetID != 0), PUBREC and PUBREL
// Spec only mentions PUB and PUBREL but this lib implements method B from point 4.3.3 (Fig. 4.3)
// and stores the packet id in the PUBREC packet. So we also must keep PUBREC.
@@ -652,12 +722,25 @@ void MqttClient::_clearQueue(bool clearSession) {
espMqttClientInternals::MQTTPacketType type = it.get()->packetType();
if (type == PacketType.PUBREC ||
type == PacketType.PUBREL ||
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
++it;
} else {
_outbox.remove(it);
}
}
} else if (clearData == 1) {
// keep PUB
while (it) {
if (it.get()->packetType() == PacketType.PUBLISH) {
++it;
} else {
_outbox.remove(it);
}
}
} else { // clearData == 2
while (it) {
_outbox.remove(it);
}
}
EMC_SEMAPHORE_GIVE();
}

View File

@@ -63,13 +63,13 @@ class MqttClient {
uint16_t publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length);
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload);
uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool all = false); // Not MQTT compliant and may cause unpredictable results when `all` = true!
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char* getClientId() const;
#if defined(ARDUINO_ARCH_ESP32)
void loop();
protected:
#endif
void loop();
#if defined(ARDUINO_ARCH_ESP32)
explicit MqttClient(bool useTask, uint8_t priority = 1, uint8_t core = 1);
bool _useTask;
@@ -106,15 +106,15 @@ class MqttClient {
// state is protected to allow state changes by the transport system, defined in child classes
// eg. to allow AsyncTCP
enum class State {
disconnected,
connectingTcp1,
connectingTcp2,
connectingMqtt,
connected,
disconnectingMqtt1,
disconnectingMqtt2,
disconnectingTcp1,
disconnectingTcp2
disconnected = 0,
connectingTcp1 = 1,
connectingTcp2 = 2,
connectingMqtt = 3,
connected = 4,
disconnectingMqtt1 = 5,
disconnectingMqtt2 = 6,
disconnectingTcp1 = 7,
disconnectingTcp2 = 8
};
std::atomic<State> _state;
@@ -161,7 +161,9 @@ class MqttClient {
return false;
}
void _checkOutgoing();
void _checkOutbox();
int _sendPacket();
bool _advanceOutbox();
void _checkIncoming();
void _checkPing();
@@ -174,7 +176,9 @@ class MqttClient {
void _onSuback();
void _onUnsuback();
void _clearQueue(bool clearSession);
void _clearQueue(int clearData); // 0: keep session,
// 1: keep only PUBLISH qos > 0
// 2: delete all
void _onError(uint16_t packetId, espMqttClientTypes::Error error);
#if defined(ARDUINO_ARCH_ESP32)

View File

@@ -66,69 +66,3 @@ void espMqttClientAsync::onPollCb(void* a, AsyncClient* c) {
}
#endif
#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32)
#if defined(ARDUINO_ARCH_ESP32)
espMqttClientSecureAsync::espMqttClientSecureAsync(uint8_t priority, uint8_t core)
: MqttClientSetup(false, priority, core)
, _client() {
#else
espMqttClientSecure::espMqttClientSecure()
: _client() {
#endif
_transport = &_client;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setInsecure() {
_client.client.setInsecure();
return *this;
}
#if defined(ARDUINO_ARCH_ESP32)
espMqttClientSecureAsync& espMqttClientSecureAsync::setCACert(const char* rootCA) {
_client.client.setCACert(rootCA);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setCertificate(const char* clientCa) {
_client.client.setCertificate(clientCa);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setPrivateKey(const char* privateKey) {
_client.client.setPrivateKey(privateKey);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setPreSharedKey(const char* pskIdent, const char* psKey) {
_client.client.setPreSharedKey(pskIdent, psKey);
return *this;
}
#elif defined(ARDUINO_ARCH_ESP8266)
espMqttClientSecureAsync& espMqttClientSecureAsync::setFingerprint(const uint8_t fingerprint[20]) {
_client.client.setFingerprint(fingerprint);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setTrustAnchors(const X509List *ta) {
_client.client.setTrustAnchors(ta);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setClientRSACert(const X509List *cert, const PrivateKey *sk) {
_client.client.setClientRSACert(cert, sk);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setClientECCert(const X509List *cert, const PrivateKey *sk, unsigned allowed_usages, unsigned cert_issuer_key_type) {
_client.client.setClientECCert(cert, sk, allowed_usages, cert_issuer_key_type);
return *this;
}
espMqttClientSecureAsync& espMqttClientSecureAsync::setCertStore(CertStoreBase *certStore) {
_client.client.setCertStore(certStore);
return *this;
}
#endif
#endif

View File

@@ -16,7 +16,6 @@ the LICENSE file.
#include "Transport/ClientAsync.h"
#include "MqttClientSetup.h"
#include "Transport/ClientSecureSync.h"
class espMqttClientAsync : public MqttClientSetup<espMqttClientAsync> {
public:
@@ -39,32 +38,3 @@ class espMqttClientAsync : public MqttClientSetup<espMqttClientAsync> {
};
#endif
#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32)
class espMqttClientSecureAsync : public MqttClientSetup<espMqttClientSecureAsync> {
public:
#if defined(ARDUINO_ARCH_ESP32)
explicit espMqttClientSecureAsync(uint8_t priority = 1, uint8_t core = 1);
#else
espMqttClientSecure();
#endif
espMqttClientSecureAsync& setInsecure();
#if defined(ARDUINO_ARCH_ESP32)
espMqttClientSecureAsync& setCACert(const char* rootCA);
espMqttClientSecureAsync& setCertificate(const char* clientCa);
espMqttClientSecureAsync& setPrivateKey(const char* privateKey);
espMqttClientSecureAsync& setPreSharedKey(const char* pskIdent, const char* psKey);
#else
espMqttClientSecure& setFingerprint(const uint8_t fingerprint[20]);
espMqttClientSecure& setTrustAnchors(const X509List *ta);
espMqttClientSecure& setClientRSACert(const X509List *cert, const PrivateKey *sk);
espMqttClientSecure& setClientECCert(const X509List *cert, const PrivateKey *sk, unsigned allowed_usages, unsigned cert_issuer_key_type);
espMqttClientSecure& setCertStore(CertStoreBase *certStore);
#endif
protected:
espMqttClientInternals::ClientSecureSync _client;
};
#endif