upgrad esp mqtt lib
This commit is contained in:
@@ -42,6 +42,7 @@ MqttClient::MqttClient()
|
||||
, _willPayloadLength(0)
|
||||
, _willQos(0)
|
||||
, _willRetain(false)
|
||||
, _timeout(10000)
|
||||
, _state(State::disconnected)
|
||||
, _generatedClientId{0}
|
||||
, _packetId(0)
|
||||
@@ -373,6 +374,9 @@ int MqttClient::_sendPacket() {
|
||||
EMC_SEMAPHORE_GIVE();
|
||||
return -1;
|
||||
}
|
||||
// handle with care! millis() returns unsigned 32 bit, token is void*
|
||||
static_assert(sizeof(uint32_t) <= sizeof(void*), "the size of uint32_t must be smaller than or equal to the size of a pointer");
|
||||
packet->token = reinterpret_cast<void*>(millis());
|
||||
_lastClientActivity = millis();
|
||||
_bytesSent += written;
|
||||
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType());
|
||||
@@ -392,8 +396,7 @@ bool MqttClient::_advanceOutbox() {
|
||||
if (packet->removable()) {
|
||||
_outbox.removeCurrent();
|
||||
} else {
|
||||
// handle with care! millis() returns unsigned 32 bit, token is void*
|
||||
packet->token = reinterpret_cast<void*>(millis());
|
||||
// we already set 'dup' here, in case we have to retry
|
||||
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
|
||||
_outbox.next();
|
||||
}
|
||||
@@ -496,6 +499,17 @@ void MqttClient::_checkPing() {
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::_checkTimeout() {
|
||||
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
|
||||
if (it && _bytesSent == 0) { // check that we're not busy sending
|
||||
if (millis() - *((uint32_t*)&(it.get()->token)) > _timeout) { // NOLINT(readability/casting)
|
||||
// TODO(bertmelis): fix ugly casting hack
|
||||
emc_log_w("Packet ack timeout, retrying");
|
||||
_outbox.resetCurrent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::_onConnack() {
|
||||
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) {
|
||||
_pingSent = false; // reset after keepalive timeout disconnect
|
||||
|
||||
@@ -65,17 +65,13 @@ class MqttClient {
|
||||
uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
|
||||
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
|
||||
const char* getClientId() const;
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
void loop();
|
||||
|
||||
protected:
|
||||
#endif
|
||||
void loop();
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
explicit MqttClient(bool useTask, uint8_t priority = 1, uint8_t core = 1);
|
||||
bool _useTask;
|
||||
#else
|
||||
|
||||
protected:
|
||||
MqttClient();
|
||||
#endif
|
||||
espMqttClientInternals::Transport* _transport;
|
||||
@@ -102,6 +98,7 @@ class MqttClient {
|
||||
uint16_t _willPayloadLength;
|
||||
uint8_t _willQos;
|
||||
bool _willRetain;
|
||||
uint32_t _timeout;
|
||||
|
||||
// state is protected to allow state changes by the transport system, defined in child classes
|
||||
// eg. to allow AsyncTCP
|
||||
@@ -166,6 +163,7 @@ class MqttClient {
|
||||
bool _advanceOutbox();
|
||||
void _checkIncoming();
|
||||
void _checkPing();
|
||||
void _checkTimeout();
|
||||
|
||||
void _onConnack();
|
||||
void _onPublish();
|
||||
|
||||
@@ -68,6 +68,11 @@ class MqttClientSetup : public MqttClient {
|
||||
return static_cast<T&>(*this);
|
||||
}
|
||||
|
||||
T& setTimeout(uint16_t timeout) {
|
||||
_timeout = timeout * 1000; // s to ms conversion, will also do 16 to 32 bit conversion
|
||||
return static_cast<T&>(*this);
|
||||
}
|
||||
|
||||
T& onConnect(espMqttClientTypes::OnConnectCallback callback) {
|
||||
_onConnectCallback = callback;
|
||||
return static_cast<T&>(*this);
|
||||
|
||||
@@ -138,6 +138,10 @@ class Outbox {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void resetCurrent() {
|
||||
_current = _first;
|
||||
}
|
||||
|
||||
Iterator front() const {
|
||||
Iterator it;
|
||||
it._node = _first;
|
||||
|
||||
@@ -9,8 +9,8 @@ the LICENSE file.
|
||||
#include "espMqttClient.h"
|
||||
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
espMqttClient::espMqttClient(uint8_t priority, uint8_t core)
|
||||
: MqttClientSetup(true, priority, core)
|
||||
espMqttClient::espMqttClient(bool internalTask, uint8_t priority, uint8_t core)
|
||||
: MqttClientSetup(internalTask, priority, core)
|
||||
, _client() {
|
||||
#else
|
||||
espMqttClient::espMqttClient()
|
||||
@@ -21,8 +21,8 @@ espMqttClient::espMqttClient()
|
||||
|
||||
#if defined(ARDUINO_ARCH_ESP8266) || defined(ARDUINO_ARCH_ESP32)
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
espMqttClientSecure::espMqttClientSecure(uint8_t priority, uint8_t core)
|
||||
: MqttClientSetup(priority, core)
|
||||
espMqttClientSecure::espMqttClientSecure(bool internalTask, uint8_t priority, uint8_t core)
|
||||
: MqttClientSetup(internalTask, priority, core)
|
||||
, _client() {
|
||||
#else
|
||||
espMqttClientSecure::espMqttClientSecure()
|
||||
|
||||
@@ -23,7 +23,7 @@ the LICENSE file.
|
||||
class espMqttClient : public MqttClientSetup<espMqttClient> {
|
||||
public:
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
explicit espMqttClient(uint8_t priority = 1, uint8_t core = 1);
|
||||
explicit espMqttClient(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1);
|
||||
#else
|
||||
espMqttClient();
|
||||
#endif
|
||||
@@ -40,7 +40,7 @@ class espMqttClient : public MqttClientSetup<espMqttClient> {
|
||||
class espMqttClientSecure : public MqttClientSetup<espMqttClientSecure> {
|
||||
public:
|
||||
#if defined(ARDUINO_ARCH_ESP32)
|
||||
explicit espMqttClientSecure(uint8_t priority = 1, uint8_t core = 1);
|
||||
explicit espMqttClientSecure(bool internalTask = true, uint8_t priority = 1, uint8_t core = 1);
|
||||
#else
|
||||
espMqttClientSecure();
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user