6. More Examples

This section contains more examples of ThingFlow in action. The code for these examples may be found in the examples subdirectory of the thingflow-python repository.

Solar Water Heater

In this scenario, we have a solar water heater that provides hot water for a residence. There is a water temperature sensor on the output pipe of the heater. There is also an actuator which controls a bypass valve: if the actuator is ON, the hot water is redirected to a spa, instead of going to the house. The spa is acting as a heat sink, taking up the extra heat, so that the water in the house never gets too hot.

We will implement a state machine which looks at the data from the temperature sensor and turns on the bypass valve when the heated water is too hot. To avoid oscillations, we use the following logic:

  1. If the running average of the temperature exceeds T_high, turn on the bypass
  2. When the running average dips below T_low (where T_low<T_high), then turn off the bypass.

Here is a diagram of the ThingFlow flow which implements this application:


We see that the water sensor’s output is run through a low pass filter to reduce noise in the reading. It is then passed to a dispatcher 1[#]_ which sends each event to one of several output ports, depending on how it compares to T_low and T_high. The control state machine determines when to turn on the actuator, based on the current state and the port of the input event.

Here is the ThingFlow code connecting everything together:

T_high = 110 # Upper threshold (degrees fahrenheit)
T_low = 90 # Lower threshold
sensor = TempSensor(gpio_port=1)

# The dispatcher converts a sensor reading into
# threshold events
dispatcher = sensor.transduce(RunningAvg(5)) \
                   .dispatch([(lambda v: v[2]>=T_high, 't_high'),
                              (lambda v: v[2]<=T_low, 't_low')])
controller = Controller()
dispatcher.connect(controller, port_mapping=('t_high','t_high'))
dispatcher.connect(controller, port_mapping=('t_low', 't_low'))
dispatcher.connect(controller, port_mapping=('default', 'between'))

actuator = Actuator()

Here is a the state machine to be implemented by the Controller class:


We start in the state Initial. If the first incoming event is T_high, we go to the Normal state. Othewise, we immediately go to Too Hot, which will turn off the actuator. After the initial event, we move from Normal to Too Hot if we receive a T_high event and then back to Normal when we receive T_low. Here is the code which implements this state machine as a ThingFlow filter:

class Controller(OutputThing):
    def __init__(self):
        self.state = INITIAL
        self.completed = False

    def _make_event(self, val):
        return SensorEvent(ts=time.time(), sensor_id='Controller', val=val)

    def on_t_high_next(self, event):
        if self.state==NORMAL or self.state==INITIAL:
            self.state = TOO_HOT
    def on_t_high_completed(self):
        if not self.completed:
            self.completed = True
    def on_t_high_error(self, e):

    def on_t_low_next(self, event):
        if self.state==TOO_HOT or self.state==INITIAL:
            self.state = NORMAL
    def on_t_low_completed(self):
        if not self.completed:
            self.completed = True
    def on_t_low_error(self, e):

    def on_between_next(self, x):
        if self.state==INITIAL:
            self.state = NORMAL
            pass # stay in current state
    def on_between_error(self, e):
    def on_between_completed(self):
        # don't want to pass this forward,
        # as it will happen after the first item

As you can see, we have on_next, on_completed, and on_error methods for each of the three input ports. A nice property of this design is that the state machine logic is isolated to a single class and does not ever deal with actual sensor readings. This makes it easy to test to test the controller logic independent of the physical sensor and actuator.

The full code for this example may be found at examples/solar_heater_scenario.py.


GE Predix Adapters

GE Digital’s Predix platform is a public cloud service optimized for building IoT data analyses and applications. The Time Series Service supports the storage and retrieval of cloud sensor event data. ThingFlow events map very naturally to this service, and adapters are provided in the thingflow.adapters.predix module. This allows us to build ThingFlow applications that run “on the edge” and upload their event data to Predix for analysis.

Initial Configuration

Before using the Predix adapters, you will need to configure on Predix a UAA (User Authentication and Authorization) service and a Timeseries service. You will also need to install some client side CLI appliations to query and update Predix configurations. Instructions and hints on this may be found in a separate Github repository: https://github.com/jfischer/ge-predix-python-timeseries-example.


The PredixWriter class is an InputThing which accepts ThingFlow events and sends them to the Predix Time Series Service via a websocket API. Here is some example code which instantiates a writer, connects it to a sensor and runs the resulting flow:

# The next three values are specific to your install
TOKEN = ...
# The data comes from a light sensor
sensor1 = SensorAsOutputThing(LuxSensor('test-sensor1'))
writer = PredixWriter(INGEST_URL, PREDIX_ZONE_ID, TOKEN,
scheduler = Scheduler(asyncio.get_event_loop())
scheduler.schedule_periodic(sensor1, 0.5)

The INGEST_URL, PREDIX_ZONE_ID, and TOKEN parameters are described in the example repository’s README file. The batch_size parameter indicates how many events to buffer in memory before sending them up within a single message.

By default, PredixWriter expects to receive instances of SensorEvent, which each include a Unix timestamp (in seconds as is the Python convention), a sensor id, and a value. If you want to sent data using a different representation in ThingFlow, you can subclass from EventExtractor, which defines how to obtain the Predix event fields from an incoming ThingFlow event. This extractor is then passed as an extractor keyword parameter to the PredixWriter constructor. Note that Predix timestamps are expressed in milliseconds rather than seconds.


The PredixReader class queries the Predix Time Series service for events from a specified sensor over a specified time range. These are then mapped to events in the ThingFlow world and passed to any connected things. Here is a short example which instantiates a PredixReader to query for events in the last minute, connects it print, schedules the reader to be run once every 60 seconds, and starts the flow.

# The next three values are specific to your install
TOKEN = ...
reader = PredixReader(QUERY_URL, PREDIX_ZONE_ID, TOKEN,
                      'test-sensor1', start_time=time.time()-3600,
scheduler.schedule_periodic(reader, 60)

As with the writer, you must pass in a Query URL, Predix Zone Id, and bearer token as determined through your Predix configuration. The start_time parameter is the starting timestamp for the query. By specifying one_shot as False, we are requesting that the reader be run until the process is killed. If one_shot was set to True, the reader will close its event stream after one query.

If you want the reader to emit a different type of event, pass in a value for the build_event_fn keyword parameter of the PredixReader constructor. The function should take as arguments the sensor id, the predix timestamp (in milliseconds), the sensor value, and a quality code (which is dropped in the default implementation). The function should return a corresponding event for use in ThingFlow.

A complete version of this example may be found at examples/predix_example.py. This script can be executed from the command line.