Arduino MKR1000 MQTT Test Sketch
Please find the discussed test vehicle below:
// MQTT Client Benchmark for ARDUINO
// This example uses an Arduino/Genuino MKR1000
// Should also work with Arduino/Genuino Zero together with a WiFi101 Shield
//
// Size in bytes reserved for a dot's context string (not referenced in the code visible here).
#define UbidotsContextSize 128
// Size in bytes of the per-dot payload buffer (dot_t::buffer).
#define UbidotsBufferSize 256
// Size in bytes of the scratch buffer that receives an incoming topic (_PahoTopic).
#define UbidotsTopicSize 81
// Broker address handed to client.begin() / client.setServer() in setup().
// NOTE(review): both calls presumably expect a bare host name - confirm that the
// "mqtt://" scheme prefix is really accepted by the library in use.
#define MQTT_SERVER "mqtt://things.ubidots.com"
// #define _PAHO
#ifdef _PAHO
// if you use #define _PAHO, then MQTT by Joel Gaehwiler is used
// the MQTT library for Arduino based on the Eclipse Paho projects
// tested version: 1.10.1
// see http://www.eclipse.org/paho/clients/c/embedded/ for details
// Don't forget to add in your Library Manager: MQTT
#else
// if you do NOT #define _PAHO, then PubSubClient by Nick O'Leary is used
// it is subtitled "A client library for MQTT messaging"
// tested version: 2.6.0
// see http://pubsubclient.knolleary.net/ for details
// Don't forget to add in your Library Manager: PubSubClient
#endif
// Used in MQTTClient.h and in this Sketch:
#define MQTT_BUFFER_SIZE 256
#include <SPI.h>
#include <WiFi101.h>
#ifdef _PAHO
#include <MQTTClient.h>
char _PahoTopic[UbidotsTopicSize];
#else
// see PubSubClient.h::39ff
// Arduino WiFi Shield - if you want to send packets > 90 bytes with this shield, enable the MQTT_MAX_TRANSFER_SIZE define in PubSubClient.h.
#define MQTT_MAX_TRANSFER_SIZE 256
#define MQTT_MAX_PACKET_SIZE 254
#include <PubSubClient.h>
#endif
// This extra include provides dtostrf(), so loop() can format floats without using String()
#include <avr/dtostrf.h>
// NOTE(review): VERBOSE is not tested by any #ifdef in the code visible here.
#define VERBOSE
// Device-level topic that loop() publishes every payload to.
char LABEL_DATA_SOURCE[] = "/v1.6/devices/MQTT_Test";
// Indices into the global dot[] table.
#define DOT_STATE 0
#define DOT_TEST 1
#define DOT_LED 2
// Number of entries in dot[]; bounds the label lookup in parseMessage().
#define DOTARRAY_SIZE 3
// One Ubidots variable ("dot") of the device: its static description (label,
// topic), the state most recently parsed from an incoming message, and a
// scratch buffer used to assemble outgoing payloads.
typedef struct dot_t {
  const char* label;        // variable label; initialised from a string literal and only ever read
  char id[25];              // 24-character variable id plus the terminating NUL
  float value;              // last value parsed from (or prepared for) the broker
  unsigned long count;      // "count" field of the last message, 0 when absent
  char* context;            // heap copy of the last non-empty "context" field, NULL until one arrives
  unsigned long timestamp;  // seconds. This should be changed to long or to long long and Milliseconds
  bool valid;               // false when a field of the last message failed to parse
  const char* topic;        // full subscription topic; initialised from a string literal and only ever read
  char buffer[UbidotsBufferSize]; // scratch space for the outgoing JSON payload
} dot_t;
// Table of the dots handled by this sketch, in DOT_STATE, DOT_TEST, DOT_LED order.
// Field order: label, id, value, count, context, timestamp, valid, topic, buffer.
dot_t dot[] = {
  { "state", "xxxxxxxxxxxxxxxxxxxxxxxx", 0.0, 0L, NULL, 0L, false,
    "/v1.6/devices/MQTT_Test/state", "" },
  { "test", "xxxxxxxxxxxxxxxxxxxxxxxx", 0.0, 0L, NULL, 0L, false,
    "/v1.6/devices/MQTT_Test/test", "" },
  { "led", "xxxxxxxxxxxxxxxxxxxxxxxx", 1.0, 0L, NULL, 0L, false,
    "/v1.6/devices/MQTT_Test/led", "" }
};
// WiFi credentials handed to WiFi.begin() in setup().
char ssid[] = "xxxxxxxx"; // your network SSID (name)
char pass[] = "xxxxxxxxx"; // your network password (use for WPA, or use as key for WEP)
// Network client that both MQTT client variants are wired to.
WiFiClient net;
#ifdef _PAHO
// Paho variant: the network client is attached later by client.begin() in setup().
MQTTClient client;
MQTTMessage _msg; // Required to set retained-Flag, since there is no direct method available
#else
// PubSubClient variant: the network client is passed to the constructor.
PubSubClient client(net);
#endif
void PrintDot(int index){
Serial.println("Dot["+String(index,DEC)+"] = {");
Serial.print("char* label = \"");
if(dot[index].label) Serial.print(dot[index].label);
else Serial.print(" (char *)NULL");
Serial.println("\"");
Serial.print("char id[25] = \"");
if(dot[index].id) Serial.print(dot[index].id);
Serial.println("\"");
Serial.print("float value = ");
Serial.println(dot[index].value);
Serial.print("ulong count = ");
Serial.println(dot[index].count);
Serial.print("char* context =");
if(dot[index].context) Serial.println(dot[index].context);
else Serial.println(" (char *)NULL");
Serial.print("ulong timestamp = ");
Serial.println(dot[index].timestamp);
Serial.print("bool valid = ");
if(dot[index].valid) Serial.println("true");
else Serial.println("false");
Serial.print("char* topic = \"");
if(dot[index].topic) Serial.print(dot[index].topic);
else Serial.print(" (char *)NULL");
Serial.println("\"");
Serial.print("char buffer["+String(UbidotsBufferSize,DEC)+"]= \"");
if(dot[index].buffer) Serial.print(dot[index].buffer);
Serial.println("\"");
Serial.println("}");
}
// Arduino entry point, run once: sets the on-board LED from the "led" dot,
// opens the serial console, starts WiFi, configures whichever MQTT client
// library was selected at compile time and finally calls reconnect().
void setup() {
  pinMode(LED_BUILTIN, OUTPUT);
  digitalWrite(LED_BUILTIN, dot[DOT_LED].value*HIGH);

  Serial.begin(9600);
  // wait for serial port to connect. Needed for native USB port only.
  // Gives up after 100 polls of 100 ms each and carries on regardless.
  for (int attempt = 0; attempt < 100 && !Serial; ++attempt) {
    delay(100);
  }
  Serial.print("MQTT_BUFFER_SIZE := ");
  Serial.println(MQTT_BUFFER_SIZE);

  WiFi.begin(ssid, pass);
#ifdef _PAHO
  client.begin(MQTT_SERVER, net); // Port is 1883 by default
  Serial.println("Using PaHo-Library: <MQTTClient.h>");
#else
  client.setServer(MQTT_SERVER, 1883);
  client.setClient(net);
  client.setCallback(callback);
  Serial.println("Using PubSub-Library: <PubSubClient.h>");
#endif
  reconnect();
}
// Blocks until WiFi is up and the MQTT session is established, then subscribes
// to the "test", "state" and "led" topics (in that order), reporting each step
// on Serial. Every wait and every subscribe is followed by a one-second delay.
void reconnect() {
  Serial.print("Checking wifi...");
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print(".");
    delay(1000);
  }
  Serial.println("OK");

  Serial.print("Connecting MQTT ...");
  // Both libraries are called the same way here:
  // boolean connect(const char* clientId, const char* username, const char* password);
  while (!client.connect("mqtt://things.ubidots.com", "PUT YOUR UBIDOT TOKEN HERE", "")) {
    Serial.print(".");
    delay(1000);
  }
  Serial.println("connected!");

  // Subscription order: "/v1.6/devices/MQTT_Test/test", ".../state", ".../led"
  static const int subscribeOrder[] = { DOT_TEST, DOT_STATE, DOT_LED };
  const unsigned int subscribeCount = sizeof(subscribeOrder) / sizeof(subscribeOrder[0]);
  for (unsigned int k = 0; k < subscribeCount; ++k) {
    const char *topicName = dot[subscribeOrder[k]].topic;
    Serial.print("Subscribe ");
    Serial.print(topicName);
    Serial.print("...");
    const int result = client.subscribe(topicName);
    if (result == 1) {
      Serial.println(" OK");
    } else {
      Serial.print(" ERROR [");
      Serial.print(result);
      Serial.println("]");
    }
    delay(1000);
  }
}
// Running number prefixed to every publish log line printed by loop().
int lines = 0;
// Most recent "value" parsed from an incoming payload by parseMessage().
float value;
// Not referenced in the code visible here.
unsigned long lastMillis = 0;
// millis() readings taken at the most recent "test" and "state" publishes.
unsigned long lastMillis_Test = 0;
unsigned long lastMillis_State = 0;
// Publish periods in milliseconds for the "test" and "state" messages.
unsigned long Interval_Test = 10000;
unsigned long Interval_State = 1000;
// Arduino main loop: services the MQTT client, re-establishes the connection
// when it has dropped, and publishes two JSON payloads to LABEL_DATA_SOURCE:
//   {"test": <millis>}   whenever more than Interval_Test ms have elapsed
//   {"state": <value>}   (retained) whenever more than Interval_State ms have elapsed
// Each publish is also logged on Serial with a running line number.
void loop() {
  char *pch;
  client.loop();
  if (!client.connected()) {
    reconnect();
  }
  // MQTT publish a message roughly every 10 seconds
  if (millis() - lastMillis_Test > Interval_Test) {
    lastMillis_Test = millis();
    // lastMillis_Test is an unsigned long. itoa() takes an int and would print
    // a negative number once millis() exceeds 2^31-1 (about 24.8 days), so the
    // payload is formatted with %lu, bounded by the size of the dot's buffer.
    snprintf(dot[DOT_TEST].buffer, sizeof(dot[DOT_TEST].buffer),
             "{\"%s\": %lu}", dot[DOT_TEST].label, lastMillis_Test);
    Serial.println(String(lines++, DEC) + " [" + String(lastMillis_Test, DEC) + "] : >" + LABEL_DATA_SOURCE + "<\t>>" + String(dot[DOT_TEST].buffer) + "<<" );
    client.publish(LABEL_DATA_SOURCE, dot[DOT_TEST].buffer);
  }
  // MQTT publish a message roughly every 1 seconds
  if (millis() - lastMillis_State > Interval_State) {
    lastMillis_State = millis();
    // The state value counts up by one per publish and wraps to 0 after 1000.
    dot[DOT_STATE].value += 1.0;
    if (dot[DOT_STATE].value > 1000.0) dot[DOT_STATE].value = 0.0;
    pch=dot[DOT_STATE].buffer;
    strcpy(pch, "{\"");
    strcat(pch, dot[DOT_STATE].label);
    strcat(pch, "\": ");
    // dtostrf() writes the float (min. width 8, 5 decimals) directly behind the key.
    dtostrf(dot[DOT_STATE].value, 8, 5, &pch[strlen(pch)]);
    strcat(pch, "}");
    Serial.println(String(lines++, DEC) + " [" + String(lastMillis_State, DEC) + "] : >" + LABEL_DATA_SOURCE + "<\t>>" + String(dot[DOT_STATE].buffer) + "<<" );
#ifdef _PAHO
    // No direct publish() overload sets the retained flag, so a message struct is filled in.
    _msg.topic = LABEL_DATA_SOURCE;
    _msg.retained = true;
    _msg.payload = dot[DOT_STATE].buffer;
    _msg.length = strlen(dot[DOT_STATE].buffer);
    client.publish(&_msg);
#else
    // boolean publish(const char* topic, const char* payload, boolean retained);
    client.publish(LABEL_DATA_SOURCE, dot[DOT_STATE].buffer, true);
#endif
  }
  // PrintDot(0);
  // PrintDot(1);
  // PrintDot(2);
}
#ifdef _PAHO
// Callback of the Paho-based MQTT client for incoming messages.
// Logs topic, length and payload on Serial, copies the topic into the
// _PahoTopic buffer and hands topic, raw bytes and length to parseMessage().
void messageReceived(String topic, String payload, char * bytes, unsigned int length) {
  Serial.print("incoming: ");
  Serial.print(topic);
  Serial.print(" [");
  Serial.print(length);
  Serial.print("] - ");
  Serial.print(payload);
  Serial.print(" - ");
  Serial.println(bytes);

  topic.toCharArray(_PahoTopic, UbidotsTopicSize);
  parseMessage(_PahoTopic, bytes, length);
}
#else
// PubSubClient callback for incoming messages.
// topic:  topic the message arrived on
// bytes:  raw payload; it is NOT NUL-terminated, only `length` bytes are valid
// length: number of payload bytes
// Logs the message on Serial and hands it to parseMessage().
void callback(char* topic, byte* bytes, unsigned int length) {
  Serial.print("incoming: ");
  Serial.print(topic);
  Serial.print(" [");
  Serial.print(length);
  Serial.print("] - ");
  // Write exactly `length` bytes: printing the payload as a C string would
  // keep reading past its end until a stray zero byte turns up.
  Serial.write(bytes, length);
  Serial.println();
  parseMessage(topic, (char*)bytes, length);
}
#endif
bool parseMessage(char * topic, char * bytes, unsigned int length) {
// Please note, that I prefer to NOT use String-type here due to potential heap fragmentation
// Compromises were made in setup() resp. loop() to increase readability, but not here
// Ref for background: https://hackingmajenkoblog.wordpress.com/2016/02/04/the-evils-of-arduino-strings/
// Thanks to Majenko for his nice article
//
int index=-1;
int context_length;
char *pch;
char *rest;
char *end;
bool valid=true;
pch = strrchr(topic,'/');
if (pch==NULL){
Serial.print("ERROR: Topic invalid: >");
Serial.print(topic);
Serial.println("<");
goto CLEANUP_ON_ERROR;
}
strcpy(topic, pch+1);
Serial.print("Topic: >");
Serial.print(topic);
Serial.print("<\t");
for (int i=0;i<DOTARRAY_SIZE;){
if (strcasecmp(dot[i].label, topic)==0){
index=i;
break;
}
i++;
}
if (index<0){
Serial.println("ERROR: Could NOT associate topic");
goto CLEANUP_ON_ERROR;
}
Serial.print(" Index: ");
Serial.print(index);
Serial.print("\t");
// Parse payload
Serial.print("Payload: >");
Serial.print(bytes);
Serial.print("<\t");
end = bytes+length;
pch = strstr(bytes, "value\":");
if (pch+8>end) goto CLEANUP_ON_ERROR;
value = strtof (pch+8,&rest);
if (*rest!=',' && *rest!='}'){
value=0.0;
valid=false;
}
dot[index].value = value;
Serial.print("value: >");
Serial.print(value);
Serial.print("<\t");
// Parse for count
pch = strstr(bytes,"\"count\":");
if (pch+8>end || pch==NULL) { // keyword "count" NOT found or truncated contend
dot[index].count = 0L;
} else {
dot[index].count = strtoul(pch+8,&rest,10);
if (*rest!=',' && *rest!='}') valid=false; // delimiter not correct invalidated reading
}
// Parse for timestamp
pch = strstr(bytes,"\"timestamp\":");
if (pch+12>end || pch==NULL){ // No keyword "timestamp" found
dot[index].timestamp = 0L;
} else {
dot[index].timestamp = (ulong) (strtoll(pch+12,&rest,10)/1000); // Timestamp in seconds NOT milliseconds, sorry
if (*rest!=',' && *rest!='}') valid=false;
}
// Parse for context
pch = strstr(bytes,"\"context\":");
if (pch+11>end || pch==NULL){ // No keyword "context" found
goto ContextSkipped;
}
rest = strstr(pch,",");
context_length = rest-(pch+11);
if (context_length > 0) {
if (strncmp(pch+11,"{}",2)==0){
goto ContextSkipped;
}
char* tmp = (char *)realloc(dot->context,sizeof(char)*(context_length+1)); // Try to re-use the memory ...
if (tmp==NULL) {
Serial.print(F("ERROR: No memory allocated for dot->context. "));
Serial.print(context_length+1);
Serial.println(F(" bytes required"));
valid=false;
goto ContextSkipped;
}
dot[index].context=tmp;
strncpy(dot[index].context, pch+11, context_length);
dot[index].context[context_length]='\0';
}
ContextSkipped:
// Parse for id
pch = strstr(bytes,"\"id\":");
if (pch+6>end || pch==NULL) goto IdSkipped;
pch = strstr(pch+6, "\"")+1; // Just after the left-\"
rest = strstr(pch,"\""); // Just before the right-\"
context_length = rest-pch;
if (context_length == 24) { // There have to be exactly 24 chars
strncpy(dot[index].id, pch, context_length);
dot[index].id[context_length]='\0';
}
IdSkipped:
dot[index].valid = valid;
switch (index){
case DOT_LED:
digitalWrite(LED_BUILTIN, dot[DOT_LED].value*HIGH);
break;
default:
break;
}
Serial.println(" ... OK");
return true;
CLEANUP_ON_ERROR:
Serial.println("ERROR: Skipped parsing message");
return false;
}
with the Ubidots dashboard looking something like this: