update espMqttclient

This commit is contained in:
technyon
2024-03-02 04:30:46 +01:00
parent e92d0e9ac9
commit 7219ddd556
36 changed files with 667 additions and 210 deletions

View File

@@ -14,12 +14,8 @@ using espMqttClientTypes::DisconnectReason;
using espMqttClientTypes::Error;
MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint8_t priority, uint8_t core)
#if defined(ARDUINO_ARCH_ESP32)
: _useInternalTask(useInternalTask)
, _transport(nullptr)
#else
: _transport(nullptr)
#endif
, _onConnectCallback(nullptr)
, _onDisconnectCallback(nullptr)
, _onSubscribeCallback(nullptr)
@@ -41,7 +37,7 @@ MqttClient::MqttClient(espMqttClientTypes::UseInternalTask useInternalTask, uint
, _willPayloadLength(0)
, _willQos(0)
, _willRetain(false)
, _timeout(10000)
, _timeout(EMC_TX_TIMEOUT)
, _state(State::disconnected)
, _generatedClientId{0}
, _packetId(0)
@@ -101,7 +97,7 @@ bool MqttClient::disconnected() const {
}
bool MqttClient::connect() {
bool result = true;
bool result = false;
if (_state == State::disconnected) {
EMC_SEMAPHORE_TAKE();
if (_addPacketFront(_cleanSession,
@@ -114,17 +110,17 @@ bool MqttClient::connect() {
_willPayloadLength,
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) {
result = true;
_setState(State::connectingTcp1);
#if defined(ARDUINO_ARCH_ESP32)
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle);
}
#endif
_state = State::connectingTcp1;
} else {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
result = false;
}
EMC_SEMAPHORE_GIVE();
}
@@ -133,11 +129,11 @@ bool MqttClient::connect() {
bool MqttClient::disconnect(bool force) {
if (force && _state != State::disconnected && _state != State::disconnectingTcp1 && _state != State::disconnectingTcp2) {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
return true;
}
if (!force && _state == State::connected) {
_state = State::disconnectingMqtt1;
_setState(State::disconnectingMqtt1);
return true;
}
return false;
@@ -151,8 +147,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const
#endif
return 0;
}
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
if (!_addPacket(packetId, topic, payload, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY);
@@ -175,8 +171,8 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt
#endif
return 0;
}
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
EMC_SEMAPHORE_TAKE();
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
if (!_addPacket(packetId, topic, callback, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY);
@@ -194,6 +190,14 @@ const char* MqttClient::getClientId() const {
return _clientId;
}
size_t MqttClient::queueSize() {
size_t ret = 0;
EMC_SEMAPHORE_TAKE();
ret = _outbox.size();
EMC_SEMAPHORE_GIVE();
return ret;
}
void MqttClient::loop() {
switch (_state) {
case State::disconnected:
@@ -205,9 +209,9 @@ void MqttClient::loop() {
break;
case State::connectingTcp1:
if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) {
_state = State::connectingTcp2;
_setState(State::connectingTcp2);
} else {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
break;
}
@@ -217,7 +221,7 @@ void MqttClient::loop() {
if (_transport->connected()) {
_parser.reset();
_lastClientActivity = _lastServerActivity = millis();
_state = State::connectingMqtt;
_setState(State::connectingMqtt);
}
break;
case State::connectingMqtt:
@@ -227,7 +231,7 @@ void MqttClient::loop() {
_checkIncoming();
_checkPing();
} else {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
break;
@@ -247,7 +251,7 @@ void MqttClient::loop() {
_checkPing();
_checkTimeout();
} else {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
break;
@@ -259,7 +263,7 @@ void MqttClient::loop() {
emc_log_e("Could not create DISCONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
} else {
_state = State::disconnectingMqtt2;
_setState(State::disconnectingMqtt2);
}
}
EMC_SEMAPHORE_GIVE();
@@ -270,13 +274,13 @@ void MqttClient::loop() {
break;
case State::disconnectingTcp1:
_transport->stop();
_state = State::disconnectingTcp2;
_setState(State::disconnectingTcp2);
break; // keep break to accomodate async clients
case State::disconnectingTcp2:
if (_transport->disconnected()) {
_clearQueue(0);
_bytesSent = 0;
_state = State::disconnected;
_setState(State::disconnected);
if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason);
}
break;
@@ -308,13 +312,15 @@ void MqttClient::_loop(MqttClient* c) {
}
#endif
inline void MqttClient::_setState(State newState) {
emc_log_i("state %i --> %i", static_cast<std::underlying_type<State>::type>(_state.load()), static_cast<std::underlying_type<State>::type>(newState));
_state = newState;
}
uint16_t MqttClient::_getNextPacketId() {
uint16_t packetId = 0;
EMC_SEMAPHORE_TAKE();
// cppcheck-suppress knownConditionTrueFalse
packetId = (++_packetId == 0) ? ++_packetId : _packetId;
EMC_SEMAPHORE_GIVE();
return packetId;
++_packetId;
if (_packetId == 0) ++_packetId;
return _packetId;
}
void MqttClient::_checkOutbox() {
@@ -329,21 +335,14 @@ int MqttClient::_sendPacket() {
EMC_SEMAPHORE_TAKE();
OutgoingPacket* packet = _outbox.getCurrent();
int32_t wantToWrite = 0;
int32_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->packet.available(_bytesSent);
size_t written = 0;
if (packet) {
size_t wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
return 0;
}
written = _transport->write(packet->packet.data(_bytesSent), wantToWrite);
if (written < 0) {
emc_log_w("Write error, check connection");
EMC_SEMAPHORE_GIVE();
return -1;
}
packet->timeSent = millis();
_lastClientActivity = millis();
_bytesSent += written;
@@ -358,7 +357,7 @@ bool MqttClient::_advanceOutbox() {
OutgoingPacket* packet = _outbox.getCurrent();
if (packet && _bytesSent == packet->packet.size()) {
if ((packet->packet.packetType()) == PacketType.DISCONNECT) {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::USER_OK;
}
if (packet->packet.removable()) {
@@ -388,7 +387,7 @@ void MqttClient::_checkIncoming() {
espMqttClientInternals::MQTTPacketType packetType = _parser.getPacket().fixedHeader.packetType & 0xF0;
if (_state == State::connectingMqtt && packetType != PacketType.CONNACK) {
emc_log_w("Disconnecting, expected CONNACK - protocol error");
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
return;
}
switch (packetType & 0xF0) {
@@ -426,7 +425,7 @@ void MqttClient::_checkIncoming() {
}
} else if (result == espMqttClientInternals::ParserResult::protocolError) {
emc_log_w("Disconnecting, protocol error");
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
return;
}
@@ -446,7 +445,7 @@ void MqttClient::_checkPing() {
// disconnect when server was inactive for twice the keepalive time
if (currentMillis - _lastServerActivity > 2 * _keepAlive) {
emc_log_w("Disconnecting, server exceeded keepalive");
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
return;
}
@@ -484,7 +483,7 @@ void MqttClient::_checkTimeout() {
void MqttClient::_onConnack() {
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode == 0x00) {
_pingSent = false; // reset after keepalive timeout disconnect
_state = State::connected;
_setState(State::connected);
_advanceOutbox();
if (_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent == 0) {
_clearQueue(1);
@@ -493,14 +492,14 @@ void MqttClient::_onConnack() {
_onConnectCallback(_parser.getPacket().variableHeader.fixed.connackVarHeader.sessionPresent);
}
} else {
_state = State::disconnectingTcp1;
_setState(State::disconnectingTcp1);
// cast is safe because the parser already checked for a valid return code
_disconnectReason = static_cast<DisconnectReason>(_parser.getPacket().variableHeader.fixed.connackVarHeader.returnCode);
}
}
void MqttClient::_onPublish() {
espMqttClientInternals::IncomingPacket p = _parser.getPacket();
const espMqttClientInternals::IncomingPacket& p = _parser.getPacket();
uint8_t qos = p.qos();
bool retain = p.retain();
bool dup = p.dup();
@@ -633,9 +632,6 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return
if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) {
if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
callback = true;
_outbox.remove(it);
break;