update esp mqtt library

This commit is contained in:
technyon
2023-03-10 19:27:36 +01:00
parent 0229280023
commit d134dc9aab
29 changed files with 272 additions and 215 deletions

View File

@@ -13,12 +13,11 @@ using espMqttClientInternals::PacketType;
using espMqttClientTypes::DisconnectReason;
using espMqttClientTypes::Error;
MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority, uint8_t core)
#if defined(ARDUINO_ARCH_ESP32)
MqttClient::MqttClient(bool useTask, uint8_t priority, uint8_t core)
: _useTask(useTask)
: _useInternalTask(useInternalTask)
, _transport(nullptr)
#else
MqttClient::MqttClient()
: _transport(nullptr)
#endif
, _onConnectCallback(nullptr)
@@ -31,7 +30,7 @@ MqttClient::MqttClient()
, _clientId(nullptr)
, _ip()
, _host(nullptr)
, _port(1183)
, _port(1883)
, _useIp(false)
, _keepAlive(15000)
, _cleanSession(true)
@@ -58,19 +57,21 @@ MqttClient::MqttClient()
, _lastServerActivity(0)
, _pingSent(false)
, _disconnectReason(DisconnectReason::TCP_DISCONNECTED)
#if defined(ARDUINO_ARCH_ESP32)
#if ARDUHAL_LOG_LEVEL >= ARDUHAL_LOG_LEVEL_INFO
#if defined(ARDUINO_ARCH_ESP32) && ARDUHAL_LOG_LEVEL >= ARDUHAL_LOG_LEVEL_INFO
, _highWaterMark(4294967295)
#endif
#endif
{
EMC_GENERATE_CLIENTID(_generatedClientId);
#if defined(ARDUINO_ARCH_ESP32)
_xSemaphore = xSemaphoreCreateMutex();
EMC_SEMAPHORE_GIVE(); // release before first use
if (useTask) {
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
xTaskCreatePinnedToCore((TaskFunction_t)_loop, "mqttclient", EMC_TASK_STACK_SIZE, this, priority, &_taskHandle, core);
}
#else
(void) useInternalTask;
(void) priority;
(void) core;
#endif
_clientId = _generatedClientId;
}
@@ -80,7 +81,7 @@ MqttClient::~MqttClient() {
_clearQueue(2);
#if defined(ARDUINO_ARCH_ESP32)
vSemaphoreDelete(_xSemaphore);
if (_useTask) {
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
#if EMC_USE_WATCHDOG
esp_task_wdt_delete(_taskHandle); // not sure if this is really needed
#endif
@@ -114,7 +115,7 @@ bool MqttClient::connect() {
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) {
#if defined(ARDUINO_ARCH_ESP32)
if (_useTask) {
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle);
}
#endif
@@ -197,7 +198,7 @@ void MqttClient::loop() {
switch (_state) {
case State::disconnected:
#if defined(ARDUINO_ARCH_ESP32)
if (_useTask) {
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskSuspend(_taskHandle);
}
#endif
@@ -239,6 +240,7 @@ void MqttClient::loop() {
_checkOutbox();
_checkIncoming();
_checkPing();
_checkTimeout();
} else {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
@@ -259,6 +261,7 @@ void MqttClient::loop() {
_checkOutbox();
_checkIncoming();
_checkPing();
_checkTimeout();
break;
case State::disconnectingTcp1:
_transport->stop();
@@ -316,70 +319,29 @@ void MqttClient::_checkOutbox() {
}
}
/*
void MqttClient::_checkOutgoing() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
int32_t wantToWrite = 0;
int32_t written = 0;
while (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->available(_bytesSent);
written = _transport->write(packet->data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
break;
}
_lastClientActivity = millis();
_bytesSent += written;
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType());
if (_bytesSent == packet->size()) {
if ((packet->packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::USER_OK;
}
if (packet->removable()) {
_outbox.removeCurrent();
} else {
// handle with care! millis() returns unsigned 32 bit, token is void*
packet->token = reinterpret_cast<void*>(millis());
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
_outbox.next();
}
packet = _outbox.getCurrent();
_bytesSent = 0;
}
}
EMC_SEMAPHORE_GIVE();
}
*/
int MqttClient::_sendPacket() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
OutgoingPacket* packet = _outbox.getCurrent();
int32_t wantToWrite = 0;
int32_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->available(_bytesSent);
wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
return 0;
}
written = _transport->write(packet->data(_bytesSent), wantToWrite);
written = _transport->write(packet->packet.data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
EMC_SEMAPHORE_GIVE();
return -1;
}
// handle with care! millis() returns unsigned 32 bit, token is void*
static_assert(sizeof(uint32_t) <= sizeof(void*), "the size of uint32_t must be smaller than or equal to the size of a pointer");
packet->token = reinterpret_cast<void*>(millis());
packet->timeSent = millis();
_lastClientActivity = millis();
_bytesSent += written;
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->size(), packet->packetType());
emc_log_i("tx %zu/%zu (%02x)", _bytesSent, packet->packet.size(), packet->packet.packetType());
}
EMC_SEMAPHORE_GIVE();
return written;
@@ -387,17 +349,17 @@ int MqttClient::_sendPacket() {
bool MqttClient::_advanceOutbox() {
EMC_SEMAPHORE_TAKE();
Packet* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->size()) {
if ((packet->packetType()) == PacketType.DISCONNECT) {
OutgoingPacket* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->packet.size()) {
if ((packet->packet.packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1;
_disconnectReason = DisconnectReason::USER_OK;
}
if (packet->removable()) {
if (packet->packet.removable()) {
_outbox.removeCurrent();
} else {
// we already set 'dup' here, in case we have to retry
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
if ((packet->packet.packetType()) == PacketType.PUBLISH) packet->packet.setDup();
_outbox.next();
}
packet = _outbox.getCurrent();
@@ -500,14 +462,17 @@ void MqttClient::_checkPing() {
}
void MqttClient::_checkTimeout() {
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
if (it && _bytesSent == 0) { // check that we're not busy sending
if (millis() - *((uint32_t*)&(it.get()->token)) > _timeout) { // NOLINT(readability/casting)
// TODO(bertmelis): fix ugly casting hack
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
// check that we're not busy sending
// don't check when first item hasn't been sent yet
if (it && _bytesSent == 0 && it.get() != _outbox.getCurrent()) {
if (millis() - it.get()->timeSent > _timeout) {
emc_log_w("Packet ack timeout, retrying");
_outbox.resetCurrent();
}
}
EMC_SEMAPHORE_GIVE();
}
void MqttClient::_onConnack() {
@@ -545,9 +510,9 @@ void MqttClient::_onPublish() {
}
} else if (qos == 2) {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
while (it) {
if ((it.get()->packetType()) == PacketType.PUBREC && it.get()->packetId() == packetId) {
if ((it.get()->packet.packetType()) == PacketType.PUBREC && it.get()->packet.packetId() == packetId) {
callback = false;
emc_log_e("QoS2 packet previously delivered");
break;
@@ -573,12 +538,12 @@ void MqttClient::_onPuback() {
bool callback = false;
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
while (it) {
// PUBACKs come in the order PUBs are sent. So we only check the first PUB packet in outbox
// if it doesn't match the ID, return
if ((it.get()->packetType()) == PacketType.PUBLISH) {
if (it.get()->packetId() == idToMatch) {
if ((it.get()->packet.packetType()) == PacketType.PUBLISH) {
if (it.get()->packet.packetId() == idToMatch) {
callback = true;
_outbox.remove(it);
break;
@@ -600,12 +565,12 @@ void MqttClient::_onPubrec() {
bool success = false;
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
while (it) {
// PUBRECs come in the order PUBs are sent. So we only check the first PUB packet in outbox
// if it doesn't match the ID, return
if ((it.get()->packetType()) == PacketType.PUBLISH) {
if (it.get()->packetId() == idToMatch) {
if ((it.get()->packet.packetType()) == PacketType.PUBLISH) {
if (it.get()->packet.packetId() == idToMatch) {
if (!_addPacket(PacketType.PUBREL, idToMatch)) {
emc_log_e("Could not create PUBREL packet");
}
@@ -628,12 +593,12 @@ void MqttClient::_onPubrel() {
bool success = false;
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
while (it) {
// PUBRELs come in the order PUBRECs are sent. So we only check the first PUBREC packet in outbox
// if it doesn't match the ID, return
if ((it.get()->packetType()) == PacketType.PUBREC) {
if (it.get()->packetId() == idToMatch) {
if ((it.get()->packet.packetType()) == PacketType.PUBREC) {
if (it.get()->packet.packetId() == idToMatch) {
if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
@@ -655,13 +620,13 @@ void MqttClient::_onPubrel() {
void MqttClient::_onPubcomp() {
bool callback = false;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
while (it) {
// PUBCOMPs come in the order PUBRELs are sent. So we only check the first PUBREL packet in outbox
// if it doesn't match the ID, return
if ((it.get()->packetType()) == PacketType.PUBREL) {
if (it.get()->packetId() == idToMatch) {
if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) {
if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
@@ -686,9 +651,9 @@ void MqttClient::_onSuback() {
bool callback = false;
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
while (it) {
if (((it.get()->packetType()) == PacketType.SUBSCRIBE) && it.get()->packetId() == idToMatch) {
if (((it.get()->packet.packetType()) == PacketType.SUBSCRIBE) && it.get()->packet.packetId() == idToMatch) {
callback = true;
_outbox.remove(it);
break;
@@ -706,10 +671,10 @@ void MqttClient::_onSuback() {
void MqttClient::_onUnsuback() {
bool callback = false;
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
uint16_t idToMatch = _parser.getPacket().variableHeader.fixed.packetId;
while (it) {
if (it.get()->packetId() == idToMatch) {
if (it.get()->packet.packetId() == idToMatch) {
callback = true;
_outbox.remove(it);
break;
@@ -727,16 +692,16 @@ void MqttClient::_onUnsuback() {
void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
if (clearData == 0) {
// keep PUB (qos > 0, aka packetID != 0), PUBREC and PUBREL
// Spec only mentions PUB and PUBREL but this lib implements method B from point 4.3.3 (Fig. 4.3)
// and stores the packet id in the PUBREC packet. So we also must keep PUBREC.
while (it) {
espMqttClientInternals::MQTTPacketType type = it.get()->packetType();
espMqttClientInternals::MQTTPacketType type = it.get()->packet.packetType();
if (type == PacketType.PUBREC ||
type == PacketType.PUBREL ||
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
(type == PacketType.PUBLISH && it.get()->packet.packetId() != 0)) {
++it;
} else {
_outbox.remove(it);
@@ -745,7 +710,7 @@ void MqttClient::_clearQueue(int clearData) {
} else if (clearData == 1) {
// keep PUB
while (it) {
if (it.get()->packetType() == PacketType.PUBLISH) {
if (it.get()->packet.packetType() == PacketType.PUBLISH) {
++it;
} else {
_outbox.remove(it);