Publishing and subscribing using ESP32


#1

Hi to all!
I´m starting my first MQTT project and i face a dead end.
I want to collect various sensor data and also control relays by the UBIDOTS Dashboard.
Separatly i can achieve that, i can only publish various topics and also subcribe to various topics.
The problem is when i try to do it all together, my guess is a timming issue on the ESP but i found no where to turn.
Can you help me?


#2
/****************************************
 * Include Libraries
 ****************************************/
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
//#define WIFISSID "" // Put your WifiSSID here
//#define PASSWORD "" // Put your wifi password here
#define TOKEN "" // Put your Ubidots' TOKEN
#define MQTT_CLIENT_NAME "Random" // MQTT client Name, please enter your own 8-12 alphanumeric character ASCII string; 
                                           //it should be a random and unique ascii string and different from all other devices

/****************************************
 * Define Constants
 ****************************************/
 //IN
#define PHASE1_LABEL "current1" // Assing the variable label
#define PHASE2_LABEL "current2" // Assing the variable label
#define PHASE3_LABEL "current3" // Assing the variable label
#define TEMP_LABEL "temperature" // Assing the variable label
#define HUMID_LABEL "humidity" // Assing the variable label
//OUT
#define VARIABLE_LABEL_SUB_0 "relay_big" // Assing the variable label
#define VARIABLE_LABEL_SUB_1 "relay_small" // Assing the variable 
#define VARIABLE_LABEL_SUB_2 "relay_fan" // Assing the variable 

#define DHTTYPE DHT22   // DHT 22  (AM2302), AM2321
uint8_t DHTPin = 4; 


#define DEVICE_LABEL "esp32" // Assig the device label
/****************************************
 * Define Pins
 ****************************************/
#define SENSOR0 32 // Set the GPIO12 as SENSOR
#define SENSOR1 35 // Set the GPIO12 as SENSOR
#define SENSOR2 34 // Set the GPIO12 as SENSOR
#define R1 33 // Set the GPIO as RELAY
#define R2 25
#define R3 26


char mqttBroker[]  = "things.ubidots.com";
char payload[100];
char topic[150];
// Space to store values to send
char str_sensor1[10];
char str_sensor2[10];
char str_sensor3[10];
char str_temp[10];
char str_humid[10];

const uint8_t NUMBER_OF_VARIABLES = 16; // Number of variable to subscribe to
char * variable_labels[NUMBER_OF_VARIABLES] = {"relay_big", "relay_small", "relay_fan"}; // labels of the variable to subscribe to

float CONTROL1; // Name of the variable to be used within the code.
float CONTROL2; // Name of the variable to be used within the code.
float CONTROL3; // Name of the variable to be used within the code.

float value; // To store incoming value.
uint8_t variable; // To keep track of the state machine and be able to use the switch case.
const int ERROR_VALUE = 65535; // Set here an error value
/****************************************
 * Interrupts
 ****************************************/
//--------------- initialize timer1 16bits ( 2Hz )--------------- 
 
/****************************************
 * Auxiliar Functions
 ****************************************/
WiFiClient ubidots;
PubSubClient client(ubidots);
DHT dht(DHTPin, DHTTYPE);                

float Temperature;
float Humidity;

void get_variable_label_topic(char * topic, char * variable_label) {
  Serial.print("topic:");
  Serial.println(topic);
  sprintf(variable_label, "");
  for (int i = 0; i < NUMBER_OF_VARIABLES; i++) {
    char * result_lv = strstr(topic, variable_labels[i]);
    if (result_lv != NULL) {
      uint8_t len = strlen(result_lv);
      char result[100];
      uint8_t i = 0;
      for (i = 0; i < len - 3; i++) {
        result[i] = result_lv[i];
      }
      result[i] = '\0';
      Serial.print("Label is: ");
      Serial.println(result);
      sprintf(variable_label, "%s", result);
      break;
    }
  }
}

float btof(byte * payload, unsigned int length) {
  char * demo_ = (char *) malloc(sizeof(char) * 10);
  for (int i = 0; i < length; i++) {
    demo_[i] = payload[i];
  }
  return atof(demo_);
}


void set_state(char* variable_label) {
  variable = 0;
  for (uint8_t i = 0; i < NUMBER_OF_VARIABLES; i++) {
    if (strcmp(variable_label, variable_labels[i]) == 0) {
      break;
    }
    variable++;
  }
  if (variable >= NUMBER_OF_VARIABLES) variable = ERROR_VALUE; // Not valid
}

void execute_cases() {
  switch (variable) {
    case 0:
      CONTROL1 = value;
      digitalWrite(R1,value);
      Serial.print("CONTROL1: ");
      Serial.println(CONTROL1);
      Serial.println();
      break;
    case 1:
      CONTROL2 = value;
        digitalWrite(R2,value);
      Serial.print("CONTROL2: ");
      Serial.println(CONTROL2);
      Serial.println();
      break;
    case 2:
      CONTROL3 = value;
        digitalWrite(R3,value);
      Serial.print("CONTROL3: ");
      Serial.println(CONTROL3);
      Serial.println();
      break;
    case ERROR_VALUE:
      Serial.println("error");
      Serial.println();
      break;
    default:
      Serial.println("default");
      Serial.println();
  }

}

void callback(char* topic, byte* payload, unsigned int length) {
  char p[length + 1];
  memcpy(p, payload, length);
  p[length] = NULL;
  String message(p);
  Serial.write(payload, length);
  Serial.println(topic);
}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.println("Attempting MQTT connection...");
    
    // Attemp to connect
    if (client.connect(MQTT_CLIENT_NAME, TOKEN, "")) {
      Serial.println("Connected");
    } else {
      Serial.print("Failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 2 seconds");
      // Wait 2 seconds before retrying
      delay(2000);
    }
  }
}

/****************************************
 * Main Functions
 ****************************************/
void setup() {
  Serial.begin(115200);
  WiFi.begin(WIFISSID, PASSWORD);
  // Assign the pin as INPUT 
  pinMode(SENSOR0, INPUT);
  pinMode(SENSOR1, INPUT);
  pinMode(SENSOR2, INPUT);
  pinMode(DHTPin, INPUT);

  pinMode(R1, OUTPUT);
  pinMode(R2, OUTPUT);
  pinMode(R3, OUTPUT);


  dht.begin();  //Starting DHTT
   
  Serial.println();
  Serial.print("Wait for WiFi...");
  
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print(".");
    delay(500);
  }
  
  Serial.println("");
  Serial.println("WiFi Connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
  client.setServer(mqttBroker, 1883);
  client.setCallback(callback);  
}

void loop() {
  if (!client.connected()) {
    reconnect();
    }
       // Subscribes for getting the value of the control variable in the temperature-box device
    char topicToSubscribe_variable_1[200];
    sprintf(topicToSubscribe_variable_1, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_1, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_1, "%s/%s/lv", topicToSubscribe_variable_1, VARIABLE_LABEL_SUB_0);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_1);
    client.subscribe(topicToSubscribe_variable_1);

    char topicToSubscribe_variable_2[200];
    sprintf(topicToSubscribe_variable_2, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_2, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_2, "%s/%s/lv", topicToSubscribe_variable_2, VARIABLE_LABEL_SUB_1);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_2);
    client.subscribe(topicToSubscribe_variable_2);

    char topicToSubscribe_variable_3[200];
    sprintf(topicToSubscribe_variable_3, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_3, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_3, "%s/%s/lv", topicToSubscribe_variable_3, VARIABLE_LABEL_SUB_2);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_3);
    client.subscribe(topicToSubscribe_variable_3);
    
  //------Current Sensor value--------
  float sensor1 = (3.3*analogRead(SENSOR0))/4096;
  float sensor2 = (3.3*analogRead(SENSOR1))/4096; 
  float sensor3 = (3.3*analogRead(SENSOR2))/4096;
  //--------------DHT Sensor------------------
  Temperature = dht.readTemperature(); // Gets the values of the temperature
  Humidity = dht.readHumidity(); // Gets the values of the humidity   
  //--------------Uploading Values-------------
  
  sprintf(topic, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
  sprintf(payload, "%s", ""); // Cleans the payload
  sprintf(payload, "{\"%s\":", PHASE1_LABEL); // Adds the variable label
  
  /* 4 is mininum width, 2 is precision; float value is copied onto str_sensor*/
  dtostrf(sensor1, 4, 2, str_sensor1);
  dtostrf(sensor2, 4, 2, str_sensor2);
  dtostrf(sensor3, 4, 2, str_sensor3);
  dtostrf(Temperature, 4, 2, str_temp);
  dtostrf(Humidity, 4, 2, str_humid);
  
  sprintf(payload, "{\"");
  sprintf(payload, "%s%s\":%s", payload, PHASE1_LABEL, str_sensor1);
  sprintf(payload, "%s,\"%s\":%s", payload, PHASE2_LABEL, str_sensor2);
  sprintf(payload, "%s,\"%s\":%s", payload, PHASE3_LABEL, str_sensor3);
  sprintf(payload, "%s,\"%s\":%s", payload, TEMP_LABEL, str_temp);
  sprintf(payload, "%s,\"%s\":%s", payload, HUMID_LABEL, str_humid);
  
  sprintf(payload, "%s}", payload);
  
  Serial.println(payload);
  client.publish(topic, payload);

  client.loop();
  delay(500);
}

#3

Hi there, the example from this link may serve you as reference to publish and subscribe using the same tcp client. Feel free to adapt it to your needs.

All the best


#4

Thank you it helped and i am now at the end of my project with another hasle, connection problems…
Made a new topic to it :sweat_smile:

If your curious: ESP32 after 1h disconnects from ubidots and never reconnescts