Ubidots Community

[SOLUTION] Aggregation method for filtered variables by variable label and time range

Hello Ubidots Community!

This week we want to share the following UbiFunction which aggregates data in a fixed time range from Variables with a specific Variable label from all of your devices

To achieve this, the UbiFunction:

  1. Determines the timestamps of the specified time range.
  2. Retrieves the Variable ID (and the information of the Device it belongs to) from all variables that match the specified variable label.
  3. Aggregates the data with the specified method. One value per variable.
  4. Creates a variable per device to post the result with the aggregated data.

Please see and try the code below and feel free to leave us your comments and questions on this forum about it.

NOTE: You‘ll need to replace de following constants with your information (the specific data of your account):

  • TOKEN : Temporary and revocable keys to be used in the API requests.
  • TIME_WINDOW : The time range to filter the data (in hours, 24h format)
  • SPEC_LABEL : The specific variable label to aggregate the data from.
  • FILTER : The filter type you want to implement (check this link to see all the filter options)
  • AGG_METHOD : The aggregation method you want to implement
  • AGG_VAR_LABEL : The variable label you want to use for the aggregated value
#------------- REPLACE CONSTS WITH YOUR INFO --------
TOKEN = "YOUR-UBIDOTS-TOKEN"
TIME_WINDOW = ["INITIAL_TIME, FINAL_TIME"] # Example: [15, 22] (24h format)
SPEC_LABEL = "YOUR-SPECIFIC-VARIABLE-LABEL-TO-FILTER"
FILTER = "iexact" # exact (=), iexact, contains, icontains, startswith, istartswith, endswith, iendswith, in, isnull
AGG_METHOD = "AGGREGATION-METHOD-TO-APPLY" # mean, min, max, count, sum, 
AGG_VAR_LABEL = "AGGREGATED-VARIABLE-LABEL"
#-----------------------------------------------------
#------------- IMPORT LIBRARIES ----------------------
import requests
import time
from datetime import datetime as dt
import pytz
import math
import json
#----------------------------------------------------

#------------- REPLACE CONSTS WITH YOUR INFO --------
TOKEN = "YOUR-UBIDOTS-TOKEN"
TIME_WINDOW = [INITIAL_TIME, FINAL_TIME] # Example: [15, 22] (24h format)
SPEC_LABEL = "YOUR-SPECIFIC-VARIABLE-LABEL-TO-FILTER"
FILTER = "iexact" # exact (=), iexact, contains, icontains, startswith, istartswith, endswith, iendswith, in, isnull
AGG_METHOD = "AGGREGATION-METHOD-TO-APPLY" # mean, min, max, count, sum, 
AGG_VAR_LABEL = "AGGREGATED-VARIABLE-LABEL"
#-----------------------------------------------------

BASE_URL = "https://industrial.api.ubidots.com"
now = dt.now(tz=pytz.UTC)

#------------- MAIN FUNCTION ------------------------  
def main(args):
    
    if not TIME_WINDOW[0] <= now.hour <= (TIME_WINDOW[1]):
        return {"status": "Not within the range hours"}
    
    
    spec_variables = get_specific_variables(BASE_URL, TOKEN, SPEC_LABEL, FILTER)
    timestamps = calculate_request_times(TIME_WINDOW)
    
    agg_payload = {
        "variables": [var["id"] for var in spec_variables["results"]],
        "aggregation": AGG_METHOD,
        "join_dataframes": "false",
        "start": timestamps[0],
        "end": timestamps[1]
    }
    
    agg_values = get_aggregated_values(TOKEN, agg_payload)
    payload = build_post_payload(agg_values, spec_variables, AGG_VAR_LABEL)  
    final_res = post_agg_values(BASE_URL, payload, TOKEN)

    return {"responses": final_res}
#----------------------------------------------------  

#---- GET THE VARIABLES WITH THE SPECIFIC LABEL ------
def get_specific_variables(url, token, spec_label, filt):
    url = f'{url}/api/v2.0/variables/?fields=device,id&label__{filt}={spec_label}&page_size=400'
    print(url)
    headers = {"X-Auth-Token": token}
    response = create_request(url, headers, 5, "get")
    spec_variables = response.json()
    print("Spec_variables:", spec_variables)
    return spec_variables
#---------------------------------------------------- 

#--------------- CALCULATE TIMESTAMPS----------------
def calculate_request_times(time_window):
    init = now.replace(hour=time_window[0], minute=0, second=0, microsecond=0)
    final = now.replace(hour=time_window[1], minute=0, second=0, microsecond=0)
    init_ts = int(init.timestamp()*1000)
    final_ts = int(final.timestamp()*1000)
    return [init_ts, final_ts]
#---------------------------------------------------- 

#--- GET AGREGGATED VALUES WITHIN SPECIFIC RANGE-----
def get_aggregated_values(token, payload):
    url = f"{BASE_URL}/api/v1.6/data/stats/aggregation/?page_size=400"
    headers = {"X-Auth-Token":token,"Content-Type":"application/json"}
    response = create_request(url, headers, 5, "post", data=payload)
    return response
#---------------------------------------------------- 

#---------------- BUILD POST PAYLOAD ----------------
def build_post_payload(agg_values, spec_variables, agg_var_label):
    agg_values_json = agg_values.json()["results"]
    payload = []
    post_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)
    post_ts = int(post_dt.timestamp()*1000)
    for value, variable in zip(agg_values_json, spec_variables["results"]):
        if value["value"] is None:
            continue
        new_element = {
            "device": variable["device"]["label"],
            "timestamp": post_ts,
            "values": [{
                agg_var_label: value["value"]
            }]
        }
        payload.append(new_element)
    return payload
#----------------------------------------------------  

#-------- POST AGGREGATED VALUES TO UBIDOTS ---------
def post_agg_values(url, payload, token): 
    def chunks(lst, n):
        return [lst[i::n] for i in range(n)]

    url = f"{url}/api/v1.6/devices/_/bulk/values/"
    headers = {
        "X-Auth-Token": token,
        "X-Bulk-Operation": "True",
        "Content-Type": "application/json"
    }

    payload_length = len(json.dumps(payload))
    n = math.ceil(payload_length/10000)
    payload_chunks = chunks(payload, n)
    responses = []
    for payload_chunk in payload_chunks:
        response = create_request(url, headers, 5, "post", data=payload_chunk)
        responses.append(response.status_code)
        time.sleep(0.3)
    return responses
#----------------------------------------------------

#-------- CREATE A REQUEST TO THE SERVER ------------
def create_request(url, headers, attempts, request_type, data=None):
    request_func = getattr(requests, request_type)
    kwargs = {"url": url, "headers": headers}
    if request_type == "post" or request_type == "patch":
        kwargs["json"] = data
    try:
        req = request_func(**kwargs)
        status_code = req.status_code
        time.sleep(1)
        while status_code >= 400 and attempts < 5:
            req = request_func(**kwargs)
            status_code = req.status_code
            attempts += 1
            time.sleep(1)
        return req
    except Exception as e:
        print("[ERROR] There was an error with the request, details:")
        print(e)
        return None
#----------------------------------------------------