/
Google Cloud Platform (GCP) Application Example

Google Cloud Platform (GCP) Application Example

The SmartServer IoT IAP Input and Output nodes can be used with standard Node-RED nodes for cloud IoT connectivity. The following sections describe the SmartServer IoT and GCP application solution, which eliminates the need for extensive programming skills and many hard-coded lines of code.

Architecture Overview

The figure shown below depicts the SmartServer IoT and GCP application architecture.

An overview of the SmartServer IoT system architecture is shown below.

On the SmartServer CMS Sequencing widget, this solution can appear similar to the examples shown below. The sections that follow provide the steps needed to create the flows in these examples.


See the Integrating SmartServer IoT and GCP Nodes section

or


See the Scaling for Large Datapoint Counts section

To get started, continue with Using SmartServer IoT and GCP Nodes

Using SmartServer IoT and GCP Nodes

IAP Input and Output nodes are shipped with the SmartServer IoT and are available on the Sequencing widget. For more information about SmartServer IAP Input and Output nodes, see Creating Sequence of Operations.

GCP nodes need to be installed and added to the SmartServer IoT Sequencing widget. For information on how to install GCP nodes, see the Installing GCP Nodes section. 

Installing GCP Nodes

To install GCP nodes on the Sequencing widget, perform the following steps:

  1. Open the SmartServer IoT Sequencing widget (you may need to enable it first on the CMS dashboard).



  2. Click the Node-RED button ().



  3. Select the Manage Palette option. 



    The Nodes and Install tabs appear.



  4. Click Install.



  5. Enter node-red-contrib-google-cloud in the search field.



  6. Click Install for the Google Cloud Platform Node-RED nodes.



  7. Click Install at the confirmation prompt (click the Open node information to review node dependency information).



    Allow a few minutes for the installation to complete.

  8. Once the node installation is finished, click Close.



You can start using the following GCP nodes that appear at the bottom of the node list on the Sequencing widget. The pubsub node is used in the procedure that follows in the Integrating SmartServer IoT and GCP Nodes section. 


Integrating SmartServer IoT and GCP Nodes

To integrate SmartServer IoT and GCP nodes for cloud IoT connectivity, perform the steps defined in the example application that follows.

  1. Create a GCP account.

  2. Create a Pub/Sub Topic (projects/white-dynamo-303610/topics/iap in this example).



  3. Create a service account allowing the SmartServer IoT to send data.



  4. Add a key for the service account.



  5. Set permissions for your service account in the Pub/Sub Topic.



  6. Create a subscription for the Pub/Sub.



  7. On the SmartServer CMS Sequencing widget, add the GCP pubsub node to a flow. (See also SmartServer IoT Node-RED Tutorial for more information about how to work with flows.)



  8. On the GCP, create and download a private key file for the service account.



  9. On the SmartServer CMS Sequencing widget, double-click the pubsub node. 

    The Edit pubsub node view appears.



  10. Enter projects/white-dynamo-303610/topics/iap in the Topic field.



  11. Click the Credentials Edit button ().



    The Edit google-cloud-credentials node view appears.



  12. Copy the entire downloaded JSON service account credentials (as shown in the example below) to the Key field on the Edit google-cloud-credentials node view.





  13. Click Update.

    The Edit pubsub node view appears.



  14. Click Done to return to the Sequencing widget.

  15. Set the GCP environment variable in etc/systemd/system/node-red.service.d/override.conf by performing the following steps:

     

    1. From an SSH or console connection, create an override.conf file using the following command:

      sudo systemctl edit node-red

    2. Environment="GOOGLE_CLOUD_PROJECT=<your project id>"

      <your project id> can be found in Step 12, "project_id" line.
    3. Save the file as override.conf (not with the suggested temporary file name).

  16. On the SmartServer CMS Sequencing widget, add IAP Input, function, and debug nodes to the flow.



  17. Double-click the IAP Input nodes on the Sequencing widget and configure them as needed (shown in the example below).


    Define the IAP Input node name and properties

    Note: The monitoring interval set in Datapoint Properties widget overrides Polling Interval.



    Set filters as needed


    Select the datapoint

  18. Click Done on the Edit iap-input node view to save your changes. 

  19. Modify the function node code as needed. 

    The incoming ev/data topic has a "topic" key value with additional information that can uniquely identify the source of a given update, for example:

    "topic":"glp/0/17qampp/fb/dev/lon/11/if/LightSensor/0" that can be appended to the ev/data "message" key value in the example below.



    Scalar type example


    Structured type example

  20. Click Done on the Edit function node view to save your changes. 

  21. Connect IAP Input, function, GCP pubsub nodes as shown in the example below, and click Deploy on the Sequencing widget to deploy the flow.




  22. On the SmartServer CMS Datapoint Properties widget, for a given datapoint, enable Monitoring, adjust the Poll Interval, Publish Interval, Minimum Publish Interval, Expected Update Interval, and Publish Minimum Delta Value settings to control the datapoint transmission rate to the cloud as shown in the example below. 



  23. View messages for the subscription in the GCP client.

Scaling for Large Datapoint Counts

For system integrations where there is a large number of datapoints and it is not practical to have an IAP Input node for each datapoint, the standard Node-RED mqtt in node can be used with the SmartServer IoT’s glp/0/+/ev/data topic, which publishes updates for all monitored datapoints. Additionally, viewing datapoints in the SmartServer CMS Datapoints widget (called the Datapoint Browser widget for SmartServer 3.3 and prior releases), or triggering on-demand GET requests in custom web pages or custom applications, or using IAP input nodes is other flows, can also cause datapoint updates. 

To associate device names with device DID in the datapoint update, you can use a second mqtt in node with a glp/+/+/fb/dev/+/+/cfg topic. The mqtt in node appears on the SmartServer Sequencing widget with the network nodes:

To scale for large datapoint counts, two function nodes, with the code below, can be used to select the datapoints (MQTT DP Filter function node) and to reduce the number of duplicate value updates (DP Throttle function node) that are sent to the cloud. 

The MQTT DP Filter function node allows you to specify which datapoints are sent to the cloud by setting up a list of datapoint path names and defining a specific datapoint or using wildcards (e.g., "Tstat-01/AV/7/Room Temperature" or "~.PulseGen/*sensor/*/nvolux*"). The DP Throttle function node reduces the sending of duplicate updates to the cloud. 

For large datapoint count integrations, perform the following steps using these function nodes:

  1. Set the monitoring interval for all datapoints of interest using the Datapoint Properties widget.

  2. On the SmartServer CMS Sequencing widget, create the following flow:


    The debug nodes, appearing as the olive green nodes, are not required.

    1. Use the GCP pubsub node that is described in the Integrating SmartServer IoT and GCP Nodes section.
    2. Place an mqtt in node on the flow and configure it as shown below.
    3. Click the Server Edit button () to configure the Edit mqtt-broker node settings.



    4. Click Done on the Edit mqtt in node view.
    5. Add a second mqtt in node and change the topic to glp/+/+/fb/dev/+/+/cfg.



    6. Add a function node for the MQTT DP Filter. The MQTT DP Filter function node filters the datapoint updates for specified datapoints only.

      Use the code shown below for the On Start tab and On Message tab.

      In the On Start tab, specify the list of datapoints (datapointList) that you want to send to the cloud using the following format: 
      {deviceName}/{block XIF Name}/{blockIndex}/{datapoint XIF Name}

      or

      Specify the field name (fields with nested structures and field arrays are not supported):
      {deviceName}/{block XIF Name}/{blockIndex}/{datapoint XIF Name}/{fieldName}

      For each name in the path you can use the following filter criteria:

      "*" for any name
      "~{name}" contains "{name}"
      "{name}*" starts with "{name}"
      "*{name}" ends with "{name}"
      exact match on name
      MQTT DP Filter – On Start Tab
      // Code added here will be run once
      // whenever the node is started.
      // Block names and datapoints names are XIF names
      // Examples
      //    "Tstat-01/AV/7/Room Temperature",
      //    "Pulsegen*/AV/7/Room Temperature",
      //    "*/*/*/~Light",
      //    "*/*/*/nviLamp",
      //    "*/*/*/nvoLampFb/value",
      //    "~.PulseGen/*/*/~lux",
      //    "*/*/*/*Fb/state",   
      //    "*/*/*/nviD*"
      
      const datapointList = [
          "Tstat-01/AV/7/Room Temperature",
          "*/*/*/nviLightL*",
          "*/*/*/Occupied Heat Setpoint",
          "*/*/1/*iLamp",
          "*/*/*/nvoLampFb/value",
          "~.PulseGen/*/*/~lux",
          "*/*/*/nviD*",
          "*/*/*/nvoD*/state"
          ];
          
      var dpList = [];
      var pathsNames, pathNames1;
      var obj;
      var i;
      
      if(context.get("dpList") === undefined) {
          if(typeof datapointList !== undefined) {
              if(datapointList.length > 0) {
                  for(i=0; i < datapointList.length; i++)
                  {
                      obj = {};
                      if(datapointList[i] !== "") {
                          pathNames = datapointList[i].toLowerCase().split("/");
                          pathNames1 = [];
                      
                          for(j=0; j < pathNames.length; j++)
                          {
                              pathNames[j] = pathNames[j].trim();
                              obj1 = {};
                              obj1.name = pathNames[j];
                              obj1.type = 2; // 0= "*", "~{device name}", 2 = {device name}, 3=starts with "*", 4 = ends with "*"
                              if(j === 0)
                                  obj.needDeviceDid = true;
                              if(pathNames[j] === "*") {
                                  if(j === 0)
                                      obj.needDeviceDid = false;
                                  obj1.type = 0;
                              }
                              else {
                                  if(pathNames[j].charAt(0) === "~") {
                                      // "~{datapoint substr}"
                                      obj1.type = 1;
                                      obj1.name = pathNames[j].substr(1);
                                  }
                                  else if(pathNames[j].charAt(0) === "*") {
                                      if(pathNames[j].endsWith("*")) {
                                          // "*{datapoint substr}*" same as "~{datapoint substr}"
                                          obj1.type = 1;
                                          obj1.name = pathNames[j].substr(1, (pathNames[j].length - 2));
                                      }
                                      else {
                                          // "*{datapoint substr}"
                                          obj1.type = 3;
                                          obj1.name = pathNames[j].substr(1);
                                      }
                                      
                                  }
                                  else if(pathNames[j].endsWith("*")) {
                                          // "{datapoint substr}*"
                                          obj1.type = 4;
                                          obj1.name = pathNames[j].substr(0, (pathNames[j].length - 1));
                                  }
                                  
                              }
                              pathNames1.push(obj1);
                          }
                              
                          obj.path = datapointList[i];
                          obj.pathNames = [];
                          obj.origPathnames = pathNames;
                          obj.pathNames = pathNames1;
                          obj.deviceList = [];
                          dpList.push(obj);  
                          
                      }
                  }
                  context.set("dpList",dpList);
                  context.set("datapointList",datapointList);
      
              }
          }
      }


      MQTT DP Filter – On Message Tab
      // User variables
      //  - specify datapointList in "On Start" tab
      var bCacheInitialValues = true;
      var iCacheTimeout = 30; // seconds
      var bSendDeviceState = true; //false: payload=data, true: payload = {"data":{{DP value}},"deviceHealth":{{health}} }
      var iMaxCachedDps = 1000;
      
      
      //*************************************
      // Function code - don't touch
      var dpList = context.get('dpList');
      var deviceList = context.get('deviceList');
      var dpValueList = context.get('dpValueList'); //cached datapoint list
      var iInitialTimeMs = context.get('iInitialTimeMs');
      var i, j, k, m, d, count = 0, topic, topic1, message,  messagePath, data, obj, obj1, path, payload, health;
      var deviceName, did,blockName, blockName1, blockIndex, blockIndex1, datapoint, datapoint1, field;
      var bNotFound, bContinue;
      var pathNames;
      var bDeviceNameFound;
      var bDatapointUpdate = false;
      var bDeviceUpdate = false;
      
      if(iInitialTimeMs === undefined) {
          if(bCacheInitialValues)
              iInitialTimeMs = 0;
          else
              iInitialTimeMs = -1;
          context.set('iInitialTimeMs', iInitialTimeMs);
          dpValueList = [];
          context.set('dpValueList', dpValueList);
          deviceList = [];
          context.set('deviceList', deviceList);
          node.status({fill:"red",shape:"dot",text:"initialized"});
      }
      
      if(dpList !== undefined) {
          if(dpList === null)
              return;
          if(dpList.length > 0) {
              topic = msg.topic;
              
              if(topic.endsWith("/cfg")) {
                  payload = JSON.parse(msg.payload);
                  deviceName = payload.name;
                  bNotFound = true;
                  if(iInitialTimeMs === 0) {
                      d = new Date();
                      iInitialTimeMs = d.getTime() + (iCacheTimeout * 1000);
                      context.set('iInitialTimeMs', iInitialTimeMs);
                      node.status({fill:"yellow",shape:"dot",text:"Device cfg"})
                  }
                  for(i=0; i < deviceList.length; i++)
                  {
                      if(deviceList[i].name === deviceName) {
                          
                          bNotFound = false;
                          break;
                      }
                  }
              
                  if(bNotFound) {
                      obj = {};
                      obj.name = payload.name;
                      obj.name1 = payload.name.toLowerCase();
                      obj.topic = topic;
                      pathNames = topic.split("/");
                      obj.did = pathNames[6]; //DID
                      deviceList.push(obj);
                      context.set('deviceList',deviceList);
                  
                      // check 
                      for(i=0; i < dpList.length; i++)
                      {
                          bContinue = false;
                          // type: 0= "*", 1=">{device name", 2 = {device name}, 3=starts with "*", 4 = ends with "*"
                          if(dpList[i].pathNames[0].type === 1) {
                              if(obj.name1.indexOf(dpList[i].pathNames[0].name) !== -1) {
                                  bContinue = true;
                              }
                          }
                          else if(dpList[i].pathNames[0].type === 3) {
                              if(obj.name1.endsWith(dpList[i].pathNames[0].name)) 
                                  bContinue = true;
                          }
                          else if(dpList[i].pathNames[0].type === 4) {
                              if(obj.name1.startsWith(dpList[i].pathNames[0].name)) 
                                  bContinue = true;
                          }
                          else if(dpList[i].pathNames[0].type === 2) {
                              if(dpList[i].pathNames[0].name === obj.name1) 
                                  bContinue = true;
                          }
                      
                          if(bContinue) {
                              obj1 = {};
                              obj1.name = obj.name;
                              obj1.name1 = obj.name1;
                              obj1.did = obj.did;
                              dpList[i].deviceList.push(obj1);
                              context.set('dpList', dpList);
                              
                          }
                          if(iInitialTimeMs > 0) {
                              // cache
                              if(dpValueList.length > 0) {
                                  for(k=0; k <dpValueList.length; k++)
                                  {
                                      if(dpValueList[k].did === obj.did) {
                                          bDeviceUpdate = true;  //device used for at least one cached datapoint
                                          break;
                                      }
                                  }
                                  
                              }
                          }
                      }
                  }
              }
              else if(topic.endsWith("/ev/data")) {
                  bDatapointUpdate = true;
              }    
              if(bDatapointUpdate || bDeviceUpdate)   {
                  
                  
                  count = 1;
                  if(bDeviceUpdate)
                      count  = dpValueList.length;
                  
                  for(m=0; m < count; m++)
                  {
                      bContinue = true;
                      if(bDeviceUpdate) {
                          topic1 = dpValueList[m].topic;
                          payload = dpValueList[m].payload;
                          bContinue = dpValueList[m].bSend;
                      }
                      else {
                          payload = JSON.parse(msg.payload);
                          topic1 = payload.topic;
                      }
                      if(bContinue) { //(topic1.indexOf("/fb/dev/") !== -1) {
                          message = payload.message
                          data = payload.data
                          
                          pathNames = topic1.split("/");
                          if(pathNames.length === 10) {
                      
                              did = pathNames[6];
                              blockName = pathNames[8];
                              blockName1 = blockName.toLowerCase();
                              blockIndex = pathNames[9];
                              messagePath = message.split("/");
                              datapoint = messagePath[0];
                              datapoint1 = datapoint.toLowerCase();
                              field = "";
                              for(i=0; i < dpList.length; i++)
                              {
                                  bContinue = true;
                                  path = "";
                                  // device
                                  bDeviceNameFound = false;
                                  if(dpList[i].pathNames[0].name !== "*") {
                                      if(deviceList === undefined)
                                          deviceList = [];
              
                                      bContinue = false;
                                      for(j=0; j < dpList[i].deviceList.length; j++)
                                      {
                       
                                          if(dpList[i].deviceList[j].did === did) {
                                              bContinue = true;
                                              path = dpList[i].deviceList[j].name;
                                              bDeviceNameFound = true;
                                              break;
                                          }
                                      }
                                      if(iInitialTimeMs >= 0) {
                                          if(!bContinue) {
                                              // type: 0= "*", 1=">{device name", 2 = {device name}, 3=starts with "*", 4 = ends with "*"
                                              bContinue = true; // save to cache file
                                          }
                                      }
                          
                                  }
                                  if(bContinue) {
                                      
                                      // blockname
                                      if(dpList[i].pathNames[1].name !== "*") {
                                          bContinue = false;
                                          if(dpList[i].pathNames[1].type === 1) {
                                              if(blockName1.indexOf(dpList[i].pathNames[1].name) !== -1)
                                                  bContinue = true;
                                          }
                                          else if(dpList[i].pathNames[1].type === 3) {
                                              // "*{datapoint substr}"
                                              if(blockName1.endsWith(dpList[i].pathNames[1].name)) 
                                                  bContinue = true;
                                                  
                                          }
                                          else if(dpList[i].pathNames[1].type === 4) {
                                              // "{datapoint substr}*"
                                              if(blockName1.startsWith(dpList[i].pathNames[1].name)) 
                                                  bContinue = true;
                                          }
                                          else {
                                              if(dpList[i].pathNames[1].name === blockName1)
                                                  bContinue = true;
                                          }
                                      }
                                  }
                                  if(bContinue) {
                                      // blockIndex
                                      if(dpList[i].pathNames[2].name !== "*") {
                                          bContinue = false;
                                          if(dpList[i].pathNames[2].type === 1) {
                                              if(blockIndex.indexOf(dpList[i].pathNames[2].name) !== -1)
                                                  bContinue = true;
                                          }
                                          else if(dpList[i].pathNames[2].type === 3) {
                                              // "*{datapoint substr}"
                                              if(blockIndex.endsWith(dpList[i].pathNames[2].name)) 
                                                  bContinue = true;
                                          }
                                          else if(dpList[i].pathNames[2].type === 4) {
                                              // "{datapoint substr}*"
                                              if(blockIndex.startsWith(dpList[i].pathNames[2].name)) 
                                                  bContinue = true;
                                          }
                                          else {
                                              if(dpList[i].pathNames[2].name === blockIndex)
                                                  bContinue = true;
                                          }
                                      }
                                  }
                                  if(bContinue) {
                                      // datapoint
                                      if(dpList[i].pathNames[3].name !== "*") {
                                          bContinue = false;
                                          if(dpList[i].pathNames[3].type === 1) {
                                              // "~{datapoint substr}"
                                              
                                              if(datapoint1.indexOf(dpList[i].pathNames[3].name) !== -1) 
                                                  bContinue = true;
                                          }
                                          else if(dpList[i].pathNames[3].type === 3) {
                                              // "*{datapoint substr}"
                                              if(datapoint1.endsWith(dpList[i].pathNames[3].name)) 
                                                  bContinue = true;
                                          }
                                          else if(dpList[i].pathNames[3].type === 4) {
                                              // "{datapoint substr}*"
                                              if(datapoint1.startsWith(dpList[i].pathNames[3].name)) 
                                                  bContinue = true;
                                          }
                                          else {
                                              if(dpList[i].pathNames[3].name === datapoint1)
                                                  bContinue = true;
                                          }
                                      }
                                      if(bContinue) {
                                          // check for fields
                                          data = payload.data;
                                          health = payload.health;
                                          if(dpList[i].pathNames.length === 5) {
                                              // field exact match only - case sensitive
                                              if(data.hasOwnProperty(dpList[i].pathNames[4].name)) {
                                                  field = "/" + dpList[i].pathNames[4].name;
                                                  data = data[dpList[i].pathNames[4].name];
                                              }
                                              else 
                                                  bContinue = false;
                                          }
                                      }
                                  }
                                  if(bContinue) {
                                      // valid datapoint so send datapoint update
                                      if(dpList[i].pathNames[0].name === "*") {
                                          bContinue = false;
                                          if(deviceList === undefined)
                                              deviceList = [];
                                          for(j=0; j < deviceList.length; j++)
                                          {
                                              if(deviceList[j].did === did) {
                                                  bDeviceNameFound = true;
                                                  bContinue = true;
                                                  path = deviceList[j].name;
                                                  break;
                                              }
                                          }
                                      }
                                      if(bContinue && bDeviceNameFound) {
                                          path += "/" + blockName + "/" + blockIndex + "/" + datapoint + field;
                                          if(bDeviceUpdate) {
                                              dpValueList[m].bSend = false;
                                          }
                                      }
                                      else {
                                          bContinue = false;
                                          if(bDatapointUpdate) {
                                              if(iInitialTimeMs >= 0) {
                                                  // no device name but block/blockIndex/datapoint matches at least one
                                                  // check if datapoint already in 
                                                  node.status({fill:"blue",shape:"dot",text:"caching DP values"})
                                                   
                                                   // add to dpValueList or if already there update value/health
                                                  bNotFound = true; 
                                                  for(k=0; k <dpValueList.length; k++)
                                                  {
                                                      if(dpValueList[k].topic === topic1) {
                                                          dpValueList[k].payload = payload;
                                                          bNotFound = false;
                                                          context.set('dpValueList', dpValueList);
                                                          break;
                                                      }
                                                  }
                                                  if(bNotFound) {
                                                      if(dpValueList.length < iMaxCachedDps) {
                                                          obj = {};
                                                          obj.did = did; 
                                                          obj.topic = topic1;
                                                          obj.payload = payload;
                                                          obj.bSend = true;
                                                          dpValueList.push(obj);
                                                          context.set('dpValueList', dpValueList);
                                                      }
                                                  }
                                              }
                                          }
                                      }
                                      break;
                                  }
                              }
                              if(bContinue) {
                                  msg = {};
                                  msg.topic = path;
                                  if(typeof data === "object") {
                                      data = JSON.stringify(data);
                                  }
                                  if(bSendDeviceState) {
                                      obj = {};
                                      obj.data = data;
                                      obj.deviceHealth = health; // his may need to be customized for each site
                                      msg.payload = JSON.stringify(obj);
                                  }
                                  else
                                      msg.payload = data;
                                  node.send(msg);
                                  if(bDatapointUpdate)
                                      break;
                              }
                          }
                      }
                  } //for(m=0; m < count; m++)
              }
              if(iInitialTimeMs > 0) {
                  d = new Date();
                  if(d.getTime() > iInitialTimeMs) {
                      iInitialTimeMs = -1; //stop cacheing datapoints
                      context.set('iInitialTimeMs', iInitialTimeMs);
                      dpValueList = [];
                      context.set('dpValueList', dpValueList);
                      node.status({fill:"green",shape:"dot",text:"running"})
                  }
              }
          }
      }
      return;
    7. Add a function node for DP Throttle. The DP Throttle function node code provided below reduces the sending of duplicate updates to the cloud.

      All datapoint changes (value change or device status change) are always sent to the cloud. For example, if you view any of the datapoints in the SmartServer CMS Datapoints widget (called the Datapoint Browser widget for SmartServer 3.3 and prior releases) with an update of 5 seconds, the cloud will see all of the updates even though the data has not changed. 

      Using the DP Throttle function node, you can specify the timeInterval for duplicate updates (default 30 seconds). If the time between duplicate updates is in excess of the timeInterval, then the datapoint update will be sent to the cloud. If the time is less than the timeInterval, then the duplicate update will be ignored. 

      DP Throttle – On Message Tab
      // User variables
      var timeInterval = 30; // min time in seconds allowed for duplicate updates
                              // 0=allow all updates, -1=allow only changes
      
      //*************************************
      // Function code - don't touch
      var dpList = context.get('dpList');
      var i, j, topic, obj, payloadStr = "";
      var bNotFound = true, bContinue = false;
      var d, currentTimeMs;
      
      if(dpList === undefined)
          dpList = [];
      topic = msg.topic;
      payloadStr = msg.payload;
      d = new Date();
      currentTimeMs = d.getTime();
      for(i=0; i < dpList.length; i++)
      {
          if(dpList[i].topic === topic) {
              
              if((timeInterval === 0) || ((timeInterval !== -1) && (currentTimeMs > dpList[i].timeMs))) {
                  bContinue = true;
              }
              else {
                  if(payloadStr !== dpList[i].payload) {
                      bContinue = true;
                  }
              }
              if(bContinue) {
                  dpList[i].timeMs = currentTimeMs + (timeInterval * 1000);
                  dpList[i].payload = payloadStr;
                  context.set('dpList', dpList);
              }
              bNotFound = false;
              break;
          }
      }
      if(bNotFound) {
          obj = {};
          obj.topic = topic;
          obj.payload = payloadStr;
          obj.timeMs = currentTimeMs + (timeInterval * 1000);
          dpList.push(obj);
          context.set('dpList', dpList);
          bContinue = true;
          
      }
      if(bContinue) {
          return msg;
      }
    8. When you are finished making changes to the flow, click Deploy to save the flow.