Developer Blog

Google Cloud Platform and Sensor Data: a first look

Chris Crawford

Aug 11, 2017

Introduction

IoT data usage is expected to reach 400 zettabytes (one zettabyte is 10^21 bytes) by 2018. Companies looking to leverage IoT data analytics will need to rethink their data center infrastructures and strategies. In this post we explore the implications and show how, using Google Cloud Platform, we can create data pipelines that compress the data at each stage. We will:

  1. Define the data source.
  2. Extract & Transform the streaming data.
  3. Do some analytics on the data.

You can find the code for this post on GitHub.

Data source

For the data source we mock an industrial tank with sensors reporting:

  • Pipe Temperature
  • Tank Pressure
  • Point Level
[Image: Industrial tanks]

A device model that captures the above readings is created on the relayr developer dashboard.

We now deploy a simple Node.js application that polls the sensor readings and publishes the data to a Cloud Pub/Sub topic.

This example assumes that the Pub/Sub topic exists and that for this streaming dataset it is projects/relayr-pubsub/topics/sensordata.
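For illustration, here is a minimal publisher sketch using the Google Cloud Pub/Sub Java client. The blog's actual publisher is the Node.js application mentioned above; the publish call is analogous, and the JSON payload shape shown here is an assumption (only the topic name comes from this post).

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class SensorPublisher {
  public static void main(String[] args) throws Exception {
    // The topic from this post: projects/relayr-pubsub/topics/sensordata.
    Publisher publisher =
        Publisher.newBuilder(TopicName.of("relayr-pubsub", "sensordata")).build();
    try {
      // Illustrative payload; the real fields are defined by the device model.
      String json = "{\"measure\": \"tankPressure\", \"value\": 1472}";
      PubsubMessage message =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(json)).build();
      publisher.publish(message).get(); // block until the message is accepted
    } finally {
      publisher.shutdown();
    }
  }
}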

Extract & transform

The following high-level diagram outlines the Dataflow pipeline, which processes the data stream available on the Cloud Pub/Sub topic and writes it to a BigQuery table for additional analytics.

This example is based on the StreamingWordExtract example, which leverages the common example classes for setting up and tearing down GCP resources.

[Figure: Data pipeline overview]

Pipeline code

StreamingDataExtractOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(StreamingDataExtractOptions.class);
options.setStreaming(true);
options.setBigQuerySchema(FormatTelemetryFn.getSchema());
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();

Pipeline pipeline = Pipeline.create(options);

// Reference to the BigQuery table the raw readings are written to.
TableReference tableRef = new TableReference();
tableRef.setProjectId(options.getProject());
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());

// Read the raw sensor readings from the Pub/Sub subscription.
PCollection<String> p =
    pipeline.apply(
        "ReadPubSub",
        PubsubIO.readStrings().fromSubscription(options.getPubsubSubscription()));

We create the pipeline and a data PCollection by reading from the existing Pub/Sub subscription, which is passed in as an application argument and streams our devices' current readings.

Since we are dealing with streaming data, we need to break it up into chunks that can be processed individually. Here our main input is windowed using fixed-time windows of one minute.

PCollection<String> p_windowed = 
  p.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

Next we will transform our collection of sensor data into a collection of Telemetry objects.

PCollection<Telemetry> t = p_windowed.apply(ParDo.of(new ExtractDataFn()));
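ExtractDataFn and the Telemetry class are not shown in the post; here is a minimal sketch, assuming each Pub/Sub message is a small JSON document (the field names and the use of Gson are assumptions):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;

// A plain value class for one reading, e.g. {"measure": "tankPressure", "value": 1472}.
@DefaultCoder(SerializableCoder.class)
class Telemetry implements java.io.Serializable {
  private final String measure;
  private final int value;

  Telemetry(String measure, int value) {
    this.measure = measure;
    this.value = value;
  }

  String getMeasure() { return measure; }
  int getValue() { return value; }
}

// Parse each JSON message into a Telemetry object.
class ExtractDataFn extends DoFn<String, Telemetry> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    JsonObject json = JsonParser.parseString(c.element()).getAsJsonObject();
    c.output(new Telemetry(
        json.get("measure").getAsString(),
        json.get("value").getAsInt()));
  }
}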

Next we will transform our collection of Telemetry objects into a collection of TableRows that will be used to write the results to BigQuery.

PCollection<TableRow> results = t.apply(ParDo.of(new FormatTelemetryFn()));
results.apply(
    BigQueryIO.writeTableRows()
        .to(tableRef)
        .withSchema(FormatTelemetryFn.getSchema()));
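FormatTelemetryFn is likewise not shown; here is a minimal sketch, assuming a two-column table whose names match the analytics query below:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.transforms.DoFn;

// Convert each Telemetry object into a BigQuery row.
class FormatTelemetryFn extends DoFn<Telemetry, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new TableRow()
        .set("measure", c.element().getMeasure())
        .set("value", c.element().getValue()));
  }

  // Schema for the raw sensor_data table (column names are assumptions).
  static TableSchema getSchema() {
    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("measure").setType("STRING"),
        new TableFieldSchema().setName("value").setType("INTEGER")));
  }
}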

Raw data analytics

In less than an hour we have written 9519 records.

[Figure: Sensor data]

Using the following query, we can easily run analytics over the captured data. Here we are aggregating the sensor data.

SELECT DISTINCT 
  measure,
  MAX(value) OVER (PARTITION BY measure) as xMax,
  MIN(value) OVER (PARTITION BY measure) as xMin,
  AVG(value) OVER (PARTITION BY measure) as xAvg,
  COUNT(value) OVER (PARTITION BY measure) as xCount
FROM `relayr-pubsub.relayr.sensor_data`
Measure           xMax   xMin   xAvg       xCount
pointLevel          74     70   72.027       3173
pipeTemperature     69     65   66.959       3173
tankPressure      1499   1450   1474.357     3173

Further refinements

We can further enhance our pipeline by reducing (compressing) the collected data and writing the aggregated results to BigQuery.

[Figure: Data pipeline with aggregation]

Windowing – window assignment

PCollection<String> p = pipeline.apply("ReadPubSub", 
  PubsubIO.readStrings().fromSubscription(options.getPubsubSubscription()));
PCollection<String> p_windowed = 
  p.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    
PCollection<Telemetry> t = p_windowed.apply(ParDo.of(new ExtractDataFn()));

Extract & transform

PCollection<KV<String, Integer>> mv = t.apply(ParDo.of(new ExtractMeasureValueFn()));
PCollection<KV<String, Integer>> maxPerKey = mv.apply(Max.<String>integersPerKey());
PCollection<KV<String, Integer>> minPerKey = mv.apply(Min.<String>integersPerKey());
PCollection<KV<String, Double>> meanPerKey = mv.apply(Mean.<String, Integer>perKey());
PCollection<KV<String, Long>> cntPerKey = mv.apply(Count.<String, Integer>perKey());
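ExtractMeasureValueFn re-keys each Telemetry object by its measure name so the per-key combiners above can group the readings; a minimal sketch:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Turn each reading into a (measure, value) pair, e.g. ("tankPressure", 1472).
class ExtractMeasureValueFn extends DoFn<Telemetry, KV<String, Integer>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of(c.element().getMeasure(), c.element().getValue()));
  }
}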

Grouping collections – windowing applied

// Tuple tags identify each input collection in the co-grouped result.
final TupleTag<Integer> tag1 = new TupleTag<>();
final TupleTag<Integer> tag2 = new TupleTag<>();
final TupleTag<Double> tag3 = new TupleTag<>();
final TupleTag<Long> tag4 = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, maxPerKey)
                         .and(tag2, minPerKey)
                         .and(tag3, meanPerKey)
                         .and(tag4, cntPerKey)
                         .apply(CoGroupByKey.<String>create());

// Create a collection reflecting the aggregated results.
PCollection<AggTelemetry> aggResults =
    coGbkResultCollection.apply(
        ParDo.of(new DoFn<KV<String, CoGbkResult>, AggTelemetry>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            AggTelemetry at = new AggTelemetry(
                c.element().getKey(),
                c.element().getValue().getOnly(tag1),
                c.element().getValue().getOnly(tag2),
                c.element().getValue().getOnly(tag3).floatValue(),
                c.element().getValue().getOnly(tag4),
                c.timestamp().toString());
            c.output(at);
          }
        }));
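AggTelemetry is a simple value class holding the per-window, per-measure aggregates; a minimal sketch (the field names are assumptions matching the constructor call above):

import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;

// One aggregated record per measure and window.
@DefaultCoder(SerializableCoder.class)
class AggTelemetry implements java.io.Serializable {
  private final String measure;
  private final int max;
  private final int min;
  private final float avg;
  private final long cnt;
  private final String timestamp;

  AggTelemetry(String measure, int max, int min, float avg, long cnt, String timestamp) {
    this.measure = measure;
    this.max = max;
    this.min = min;
    this.avg = avg;
    this.cnt = cnt;
    this.timestamp = timestamp;
  }

  String getMeasure() { return measure; }
  int getMax() { return max; }
  int getMin() { return min; }
  float getAvg() { return avg; }
  long getCnt() { return cnt; }
  String getTimestamp() { return timestamp; }
}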
 
// aggTableRef references the aggregate output table; it is built the
// same way as tableRef above.
PCollection<TableRow> ar = aggResults.apply(ParDo.of(new FormatAggTelemetryFn()));
ar.apply(
    BigQueryIO.writeTableRows()
        .to(aggTableRef)
        .withSchema(FormatAggTelemetryFn.getSchema()));
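As with FormatTelemetryFn, FormatAggTelemetryFn is not shown in the post; here is a minimal sketch whose column names match the aggregate query below:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.transforms.DoFn;

// Convert each AggTelemetry object into a BigQuery row.
class FormatAggTelemetryFn extends DoFn<AggTelemetry, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    AggTelemetry at = c.element();
    c.output(new TableRow()
        .set("measure", at.getMeasure())
        .set("max", at.getMax())
        .set("min", at.getMin())
        .set("avg", at.getAvg())
        .set("cnt", at.getCnt())
        .set("timestamp", at.getTimestamp()));
  }

  // Schema for the agg_sensor_data table (column names are assumptions).
  static TableSchema getSchema() {
    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("measure").setType("STRING"),
        new TableFieldSchema().setName("max").setType("INTEGER"),
        new TableFieldSchema().setName("min").setType("INTEGER"),
        new TableFieldSchema().setName("avg").setType("FLOAT"),
        new TableFieldSchema().setName("cnt").setType("INTEGER"),
        new TableFieldSchema().setName("timestamp").setType("TIMESTAMP")));
  }
}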

Revisiting analytics

Over the same time period, the refined pipeline has written only 153 records versus the 9519 written without data compression.

[Figure: Aggregated sensor data]

As before, using a similar query, we can see that by adding the aggregating transformations to our pipeline we still get surprisingly similar results with far fewer records.

SELECT DISTINCT
  measure,
  MAX(max) OVER (PARTITION BY measure) as xMax,
  MIN(min) OVER (PARTITION BY measure) as xMin,
  AVG(avg) OVER (PARTITION BY measure) as xAvg,
  SUM(cnt) OVER (PARTITION BY measure) as xCount
FROM `relayr-pubsub.relayr.agg_sensor_data`
Measure                 xMax   xMin   xAvg       xCount
(Agg) pointLevel          74     70   72.022       3021
pointLevel                74     70   72.027       3173
(Agg) pipeTemperature     69     65   66.959       3021
pipeTemperature           69     65   66.959       3173
(Agg) tankPressure      1499   1450   1474.334     3021
tankPressure            1499   1450   1474.357     3173

Conclusion

While we are able to reduce the amount of data we ultimately store, we are still sending all of the raw data to the cloud and processing it there, which can increase costs.

A good IoT strategy will account for co-located edge devices capable of reducing the data locally, before it is ever transmitted to the cloud.

Furthermore, a solid solution will have both real-time (edge) and batch (cloud) analytic capabilities to meet a wide variety of needs, from security to predictive maintenance.

relayr · Creative Commons Attribution License