use connect and disconnect callback for reconnect logic

This commit is contained in:
technyon
2023-01-31 21:02:46 +01:00
parent 3a65e2ad5a
commit df9b971eed
9 changed files with 152 additions and 7 deletions

View File

@@ -76,6 +76,15 @@ void Network::setupDevice()
_device = new WifiDevice(_hostname, _preferences);
break;
}
_device->mqttOnConnect([&](bool sessionPresent)
{
onMqttConnect(sessionPresent);
});
_device->mqttOnDisconnect([&](espMqttClientTypes::DisconnectReason reason)
{
onMqttDisconnect(reason);
});
}
void Network::initialize()
@@ -233,6 +242,49 @@ bool Network::update()
return true;
}
void Network::onMqttConnect(const bool &sessionPresent)
{
_connectReplyReceived = true;
}
void Network::onMqttDisconnect(const espMqttClientTypes::DisconnectReason &reason)
{
_connectReplyReceived = true;
Log->print("MQTT disconnected. Reason: ");
switch(reason)
{
case espMqttClientTypes::DisconnectReason::USER_OK:
Log->println(F("USER_OK"));
break;
case espMqttClientTypes::DisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
Log->println(F("MQTT_UNACCEPTABLE_PROTOCOL_VERSION"));
break;
case espMqttClientTypes::DisconnectReason::MQTT_IDENTIFIER_REJECTED:
Log->println(F("MQTT_IDENTIFIER_REJECTED"));
break;
case espMqttClientTypes::DisconnectReason::MQTT_SERVER_UNAVAILABLE:
Log->println(F("MQTT_SERVER_UNAVAILABLE"));
break;
case espMqttClientTypes::DisconnectReason::MQTT_MALFORMED_CREDENTIALS:
Log->println(F("MQTT_MALFORMED_CREDENTIALS"));
break;
case espMqttClientTypes::DisconnectReason::MQTT_NOT_AUTHORIZED:
Log->println(F("MQTT_NOT_AUTHORIZED"));
break;
case espMqttClientTypes::DisconnectReason::TLS_BAD_FINGERPRINT:
Log->println(F("TLS_BAD_FINGERPRINT"));
break;
case espMqttClientTypes::DisconnectReason::TCP_DISCONNECTED:
Log->println(F("TCP_DISCONNECTED"));
break;
default:
Log->println(F("Unknown"));
break;
}
}
bool Network::reconnect()
{
_mqttConnectionState = 0;
@@ -249,6 +301,7 @@ bool Network::reconnect()
Log->println(F("Attempting MQTT connection"));
_connectReplyReceived = false;
if(strlen(_mqttUser) == 0)
{
Log->println(F("MQTT: Connecting without credentials"));
@@ -263,22 +316,23 @@ bool Network::reconnect()
_device->mqttConnect();
}
bool connected = _device->mqttConnected();
unsigned long timeout = millis() + 3000;
unsigned long timeout = millis() + 60000;
while(!connected && millis() < timeout)
while(!_connectReplyReceived && millis() < timeout)
{
connected = _device->mqttConnected();
delay(200);
if(_keepAliveCallback != nullptr)
{
_keepAliveCallback();
}
}
if (connected)
if (_device->mqttConnected())
{
Log->println(F("MQTT connected"));
_mqttConnectionState = 1;
delay(100);
// TODO
_device->mqttOnMessage(Network::onMqttDataReceivedCallback);
for(const String& topic : _subscribedTopics)
{
@@ -298,10 +352,10 @@ bool Network::reconnect()
else
{
Log->print(F("MQTT connect failed, rc="));
// Log->println(_device->mqttClient()->connectError());
_device->printError();
_mqttConnectionState = 0;
_nextReconnect = millis() + 5000;
_device->mqttDisonnect(true);
}
}
return _mqttConnectionState > 0;
@@ -847,3 +901,8 @@ uint16_t Network::subscribe(const char *topic, uint8_t qos)
{
return _device->mqttSubscribe(topic, qos);
}
void Network::setKeepAliveCallback(std::function<void()> reconnectTick)
{
_keepAliveCallback = reconnectTick;
}

View File

@@ -50,12 +50,17 @@ public:
uint16_t subscribe(const char* topic, uint8_t qos);
void setKeepAliveCallback(std::function<void()> reconnectTick);
private:
static void onMqttDataReceivedCallback(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
void onMqttDataReceived(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
void setupDevice();
bool reconnect();
void onMqttConnect(const bool& sessionPresent);
void onMqttDisconnect(const espMqttClientTypes::DisconnectReason& reason);
void buildMqttPath(const char* prefix, const char* path, char* outPath);
static Network* _inst;
@@ -64,6 +69,7 @@ private:
char _hostnameArr[101] = {0};
NetworkDevice* _device = nullptr;
int _mqttConnectionState = 0;
bool _connectReplyReceived = false;
unsigned long _nextReconnect = 0;
char _mqttBrokerAddr[101] = {0};
@@ -83,6 +89,7 @@ private:
unsigned long _lastMaintenanceTs = 0;
unsigned long _lastRssiTs = 0;
long _rssiPublishInterval = 0;
std::function<void()> _keepAliveCallback;
NetworkDeviceType _networkDeviceType = (NetworkDeviceType)-1;

View File

@@ -161,6 +161,11 @@ void WebCfgServer::initialize()
});
_server.begin();
_network->setKeepAliveCallback([&]()
{
update();
});
}
bool WebCfgServer::processArgs(String& message)

View File

@@ -67,6 +67,11 @@ void nukiTask(void *pvParameters)
}
}
void webServerTask(void *pvParameters)
{
}
void presenceDetectionTask(void *pvParameters)
{
while(true)

View File

@@ -35,8 +35,12 @@ public:
virtual bool mqttConnected() const = 0;
virtual void mqttSetServer(const char* host, uint16_t port) = 0;
virtual bool mqttConnect() = 0;
virtual bool mqttDisonnect(bool force) = 0;
virtual void mqttSetCredentials(const char* username, const char* password) = 0;
virtual void mqttOnMessage(espMqttClientTypes::OnMessageCallback callback) = 0;
virtual void mqttOnConnect(espMqttClientTypes::OnConnectCallback callback) = 0;
virtual void mqttOnDisconnect(espMqttClientTypes::OnDisconnectCallback callback) = 0;
virtual uint16_t mqttSubscribe(const char* topic, uint8_t qos) = 0;
protected:

View File

@@ -217,6 +217,11 @@ bool W5500Device::mqttConnect()
return _mqttClient.connect();
}
bool W5500Device::mqttDisonnect(bool force)
{
return _mqttClient.disconnect(force);
}
void W5500Device::mqttSetCredentials(const char *username, const char *password)
{
_mqttClient.setCredentials(username, password);
@@ -227,6 +232,16 @@ void W5500Device::mqttOnMessage(espMqttClientTypes::OnMessageCallback callback)
_mqttClient.onMessage(callback);
}
void W5500Device::mqttOnConnect(espMqttClientTypes::OnConnectCallback callback)
{
_mqttClient.onConnect(callback);
}
void W5500Device::mqttOnDisconnect(espMqttClientTypes::OnDisconnectCallback callback)
{
_mqttClient.onDisconnect(callback);
}
uint16_t W5500Device::mqttSubscribe(const char *topic, uint8_t qos)
{
return _mqttClient.subscribe(topic, qos);

View File

@@ -39,10 +39,16 @@ public:
bool mqttConnect() override;
bool mqttDisonnect(bool force) override;
void mqttSetCredentials(const char *username, const char *password) override;
void mqttOnMessage(espMqttClientTypes::OnMessageCallback callback) override;
void mqttOnConnect(espMqttClientTypes::OnConnectCallback callback) override;
void mqttOnDisconnect(espMqttClientTypes::OnDisconnectCallback callback) override;
uint16_t mqttSubscribe(const char *topic, uint8_t qos) override;
private:

View File

@@ -241,6 +241,18 @@ bool WifiDevice::mqttConnect()
}
}
bool WifiDevice::mqttDisonnect(bool force)
{
if(_useEncryption)
{
return _mqttClientSecure->disconnect(force);
}
else
{
return _mqttClient->disconnect(force);
}
}
void WifiDevice::mqttSetCredentials(const char *username, const char *password)
{
if(_useEncryption)
@@ -265,6 +277,32 @@ void WifiDevice::mqttOnMessage(espMqttClientTypes::OnMessageCallback callback)
}
}
void WifiDevice::mqttOnConnect(espMqttClientTypes::OnConnectCallback callback)
{
if(_useEncryption)
{
_mqttClientSecure->onConnect(callback);
}
else
{
_mqttClient->onConnect(callback);
}
}
void WifiDevice::mqttOnDisconnect(espMqttClientTypes::OnDisconnectCallback callback)
{
if(_useEncryption)
{
_mqttClientSecure->onDisconnect(callback);
}
else
{
_mqttClient->onDisconnect(callback);
}
}
uint16_t WifiDevice::mqttSubscribe(const char *topic, uint8_t qos)
{
if(_useEncryption)

View File

@@ -38,10 +38,16 @@ public:
bool mqttConnect() override;
bool mqttDisonnect(bool force) override;
void mqttSetCredentials(const char *username, const char *password) override;
void mqttOnMessage(espMqttClientTypes::OnMessageCallback callback) override;
void mqttOnConnect(espMqttClientTypes::OnConnectCallback callback) override;
void mqttOnDisconnect(espMqttClientTypes::OnDisconnectCallback callback) override;
uint16_t mqttSubscribe(const char *topic, uint8_t qos) override;
private: