[SOLVED] Publishing and subscribing using ESP32

Hi to all!
I'm starting my first MQTT project and I've hit a dead end.
I want to collect various sensor data and also control relays from the Ubidots dashboard.
Separately I can achieve that: I can publish to various topics on their own, and I can also subscribe to various topics on their own.
The problem is when I try to do it all together; my guess is a timing issue on the ESP32, but I have found nowhere to turn.
Can you help me?

/****************************************
 * Include Libraries
 ****************************************/
#include <math.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
// Network and Ubidots credentials. WIFISSID and PASSWORD must be defined
// (not commented out) because setup() passes them to WiFi.begin().
#define WIFISSID "" // Put your WiFi SSID here
#define PASSWORD "" // Put your WiFi password here
#define TOKEN "" // Put your Ubidots TOKEN here
// MQTT client name: enter your own 8-12 character alphanumeric ASCII string.
// It should be random, unique, and different from all other devices.
#define MQTT_CLIENT_NAME "Random"

/****************************************
 * Define Constants
 ****************************************/
 //IN
#define PHASE1_LABEL "current1" // Variable label published with the SENSOR0 reading
#define PHASE2_LABEL "current2" // Variable label published with the SENSOR1 reading
#define PHASE3_LABEL "current3" // Variable label published with the SENSOR2 reading
#define TEMP_LABEL "temperature" // Variable label published with the DHT temperature
#define HUMID_LABEL "humidity" // Variable label published with the DHT humidity
//OUT
#define VARIABLE_LABEL_SUB_0 "relay_big" // Variable label subscribed to in loop()
#define VARIABLE_LABEL_SUB_1 "relay_small" // Variable label subscribed to in loop()
#define VARIABLE_LABEL_SUB_2 "relay_fan" // Variable label subscribed to in loop()

#define DHTTYPE DHT22   // DHT 22  (AM2302), AM2321
uint8_t DHTPin = 4; // GPIO number passed to the DHT constructor and set as INPUT in setup()


#define DEVICE_LABEL "esp32" // Device label used when building the Ubidots topic paths
/****************************************
 * Define Pins
 ****************************************/
#define SENSOR0 32 // GPIO32: analog input read as sensor1 in loop()
#define SENSOR1 35 // GPIO35: analog input read as sensor2 in loop()
#define SENSOR2 34 // GPIO34: analog input read as sensor3 in loop()
#define R1 33 // GPIO33: relay output written by execute_cases() case 0
#define R2 25 // GPIO25: relay output written by execute_cases() case 1
#define R3 26 // GPIO26: relay output written by execute_cases() case 2


// Host name of the MQTT broker handed to client.setServer().
char mqttBroker[]  = "things.ubidots.com";
// Scratch buffers for the outgoing JSON payload and the topic it is sent to.
char payload[100];
char topic[150];
// Space to store the sensor readings formatted as text before sending.
char str_sensor1[10];
char str_sensor2[10];
char str_sensor3[10];
char str_temp[10];
char str_humid[10];

// Labels of the variables to subscribe to. The array is sized by its
// initializer so that every slot holds a valid string; a fixed size larger
// than the initializer list would leave NULL entries that strstr()/strcmp()
// must never receive.
const char * variable_labels[] = {"relay_big", "relay_small", "relay_fan"};
// Number of variables to subscribe to, derived from the table above.
const uint8_t NUMBER_OF_VARIABLES = sizeof(variable_labels) / sizeof(variable_labels[0]);

float CONTROL1; // Last value received for variable_labels[0].
float CONTROL2; // Last value received for variable_labels[1].
float CONTROL3; // Last value received for variable_labels[2].

float value; // To store incoming value.
// Index of the matched label, used by the switch in execute_cases().
// It is 16 bits wide so that it can actually hold ERROR_VALUE; an 8-bit
// variable would truncate 65535 to 255 and the error case would never match.
uint16_t variable;
const int ERROR_VALUE = 65535; // Marks "no matching label"
/****************************************
 * Interrupts
 ****************************************/
//--------------- initialize timer1 16bits ( 2Hz )--------------- 
 
/****************************************
 * Auxiliar Functions
 ****************************************/
WiFiClient ubidots; // Network client handed to the MQTT client below
PubSubClient client(ubidots); // MQTT client used to connect, subscribe and publish
DHT dht(DHTPin, DHTTYPE); // DHT sensor object built from DHTPin and DHTTYPE

float Temperature; // Last value returned by dht.readTemperature() in loop()
float Humidity; // Last value returned by dht.readHumidity() in loop()

/*
 * Extracts the variable label from an incoming topic.
 *
 * Looks for the first entry of variable_labels that occurs in `topic` and
 * copies everything from that occurrence up to, but not including, the last
 * three characters of the topic (the trailing "/lv") into `variable_label`.
 *
 * topic          - null-terminated topic string of the received message.
 * variable_label - output buffer; must have room for at least 100 bytes.
 *                  It is set to an empty string when no label matches.
 */
void get_variable_label_topic(char * topic, char * variable_label) {
  const size_t MAX_LABEL_CHARS = 99; // Longest label copied, excluding the terminator
  const size_t SUFFIX_CHARS = 3;     // Length of the trailing "/lv"

  Serial.print("topic:");
  Serial.println(topic);
  variable_label[0] = '\0';
  for (int i = 0; i < NUMBER_OF_VARIABLES; i++) {
    // Unused table slots may be NULL; strstr() must not be given a NULL needle.
    if (variable_labels[i] == NULL) {
      continue;
    }
    char * result_lv = strstr(topic, variable_labels[i]);
    if (result_lv == NULL) {
      continue;
    }
    // size_t avoids truncating long topics; the copy is clamped so that it
    // can never run past the documented output size.
    size_t len = strlen(result_lv);
    size_t keep = (len > SUFFIX_CHARS) ? (len - SUFFIX_CHARS) : 0;
    if (keep > MAX_LABEL_CHARS) {
      keep = MAX_LABEL_CHARS;
    }
    memcpy(variable_label, result_lv, keep);
    variable_label[keep] = '\0';
    Serial.print("Label is: ");
    Serial.println(variable_label);
    break;
  }
}

/*
 * Converts a raw (not null-terminated) MQTT payload holding a decimal number
 * into a float.
 *
 * payload - bytes of the message.
 * length  - number of valid bytes in `payload`.
 *
 * Returns the value parsed by atof(); bytes beyond the local buffer capacity
 * are ignored.
 */
float btof(byte * payload, unsigned int length) {
  // A stack buffer replaces the former malloc(): nothing to leak, and the
  // copy is clamped so a long payload cannot write past the end.
  char text[32];
  unsigned int count = 0;
  if (payload != NULL) {
    count = (length < sizeof(text) - 1) ? length : (unsigned int)(sizeof(text) - 1);
    memcpy(text, payload, count);
  }
  text[count] = '\0'; // atof() needs a terminated string
  return atof(text);
}


/*
 * Selects the state used by execute_cases().
 *
 * Sets the global `variable` to the index of the entry in variable_labels
 * that equals `variable_label`, or to ERROR_VALUE when there is no such
 * entry (or when `variable_label` is NULL).
 */
void set_state(char* variable_label) {
  variable = ERROR_VALUE; // Not valid until a label matches
  if (variable_label == NULL) {
    return;
  }
  for (uint8_t i = 0; i < NUMBER_OF_VARIABLES; i++) {
    // Unused table slots may be NULL and must not reach strcmp().
    if (variable_labels[i] != NULL && strcmp(variable_label, variable_labels[i]) == 0) {
      variable = i;
      return;
    }
  }
}

void execute_cases() {
  switch (variable) {
    case 0:
      CONTROL1 = value;
      digitalWrite(R1,value);
      Serial.print("CONTROL1: ");
      Serial.println(CONTROL1);
      Serial.println();
      break;
    case 1:
      CONTROL2 = value;
        digitalWrite(R2,value);
      Serial.print("CONTROL2: ");
      Serial.println(CONTROL2);
      Serial.println();
      break;
    case 2:
      CONTROL3 = value;
        digitalWrite(R3,value);
      Serial.print("CONTROL3: ");
      Serial.println(CONTROL3);
      Serial.println();
      break;
    case ERROR_VALUE:
      Serial.println("error");
      Serial.println();
      break;
    default:
      Serial.println("default");
      Serial.println();
  }

}

void callback(char* topic, byte* payload, unsigned int length) {
  char p[length + 1];
  memcpy(p, payload, length);
  p[length] = NULL;
  String message(p);
  Serial.write(payload, length);
  Serial.println(topic);
}

/*
 * Blocks until the MQTT client reports a live connection, attempting to
 * connect with MQTT_CLIENT_NAME / TOKEN and pausing two seconds after each
 * failed attempt.
 */
void reconnect() {
  for (;;) {
    if (client.connected()) {
      return;
    }
    Serial.println("Attempting MQTT connection...");

    const bool accepted = client.connect(MQTT_CLIENT_NAME, TOKEN, "");
    if (accepted) {
      Serial.println("Connected");
      continue; // Re-check the connection state at the top
    }

    Serial.print("Failed, rc=");
    Serial.print(client.state());
    Serial.println(" try again in 2 seconds");
    delay(2000); // Back off before the next attempt
  }
}

/****************************************
 * Main Functions
 ****************************************/
/*
 * One-time initialisation: opens the serial port, starts the WiFi
 * connection, configures the sensor and relay pins, starts the DHT sensor,
 * waits for WiFi to come up and finally points the MQTT client at the
 * broker and registers the message callback.
 */
void setup() {
  Serial.begin(115200);
  WiFi.begin(WIFISSID, PASSWORD);

  // Sensor pins (analog inputs and the DHT data pin) are inputs.
  const uint8_t inputPins[] = {SENSOR0, SENSOR1, SENSOR2, DHTPin};
  for (uint8_t idx = 0; idx < sizeof(inputPins) / sizeof(inputPins[0]); idx++) {
    pinMode(inputPins[idx], INPUT);
  }

  // Relay pins are outputs.
  const uint8_t relayPins[] = {R1, R2, R3};
  for (uint8_t idx = 0; idx < sizeof(relayPins) / sizeof(relayPins[0]); idx++) {
    pinMode(relayPins[idx], OUTPUT);
  }

  dht.begin();

  Serial.println();
  Serial.print("Wait for WiFi...");

  // Print a dot every half second until the station is connected.
  for (; WiFi.status() != WL_CONNECTED; delay(500)) {
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi Connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());

  client.setServer(mqttBroker, 1883);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
    }
       // Subscribes for getting the value of the control variable in the temperature-box device
    char topicToSubscribe_variable_1[200];
    sprintf(topicToSubscribe_variable_1, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_1, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_1, "%s/%s/lv", topicToSubscribe_variable_1, VARIABLE_LABEL_SUB_0);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_1);
    client.subscribe(topicToSubscribe_variable_1);

    char topicToSubscribe_variable_2[200];
    sprintf(topicToSubscribe_variable_2, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_2, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_2, "%s/%s/lv", topicToSubscribe_variable_2, VARIABLE_LABEL_SUB_1);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_2);
    client.subscribe(topicToSubscribe_variable_2);

    char topicToSubscribe_variable_3[200];
    sprintf(topicToSubscribe_variable_3, "%s", ""); // Cleans the content of the char
    sprintf(topicToSubscribe_variable_3, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
    sprintf(topicToSubscribe_variable_3, "%s/%s/lv", topicToSubscribe_variable_3, VARIABLE_LABEL_SUB_2);
    Serial.println("subscribing to topic:");
    Serial.println(topicToSubscribe_variable_3);
    client.subscribe(topicToSubscribe_variable_3);
    
  //------Current Sensor value--------
  float sensor1 = (3.3*analogRead(SENSOR0))/4096;
  float sensor2 = (3.3*analogRead(SENSOR1))/4096; 
  float sensor3 = (3.3*analogRead(SENSOR2))/4096;
  //--------------DHT Sensor------------------
  Temperature = dht.readTemperature(); // Gets the values of the temperature
  Humidity = dht.readHumidity(); // Gets the values of the humidity   
  //--------------Uploading Values-------------
  
  sprintf(topic, "%s%s", "/v1.6/devices/", DEVICE_LABEL);
  sprintf(payload, "%s", ""); // Cleans the payload
  sprintf(payload, "{\"%s\":", PHASE1_LABEL); // Adds the variable label
  
  /* 4 is mininum width, 2 is precision; float value is copied onto str_sensor*/
  dtostrf(sensor1, 4, 2, str_sensor1);
  dtostrf(sensor2, 4, 2, str_sensor2);
  dtostrf(sensor3, 4, 2, str_sensor3);
  dtostrf(Temperature, 4, 2, str_temp);
  dtostrf(Humidity, 4, 2, str_humid);
  
  sprintf(payload, "{\"");
  sprintf(payload, "%s%s\":%s", payload, PHASE1_LABEL, str_sensor1);
  sprintf(payload, "%s,\"%s\":%s", payload, PHASE2_LABEL, str_sensor2);
  sprintf(payload, "%s,\"%s\":%s", payload, PHASE3_LABEL, str_sensor3);
  sprintf(payload, "%s,\"%s\":%s", payload, TEMP_LABEL, str_temp);
  sprintf(payload, "%s,\"%s\":%s", payload, HUMID_LABEL, str_humid);
  
  sprintf(payload, "%s}", payload);
  
  Serial.println(payload);
  client.publish(topic, payload);

  client.loop();
  delay(500);
}

Hi there, the example from this link may serve as a reference for publishing and subscribing using the same TCP client. Feel free to adapt it to your needs.

All the best

Thank you, it helped, and I am now at the end of my project with another hassle: connection problems…
I made a new topic for it :sweat_smile:

If you're curious: ESP32 after 1h disconnects from ubidots and never reconnects

I have the same problem. Did you solve it? Can you help me?

I couldn't really find a “problem” itself, only workarounds to keep it active.
Most likely the problem is my router being weird, so I set a static IP and added a condition that restarts the ESP if it stays in the “connect…” state for too long.
That made it stay online and reconnect by itself.