9. ThingFlow-Python API Reference

This is the main package for ThingFlow. Directly within this package you will find the following module:

  • base - the core abstractions and classes of the system.

The rest of the functionality is in sub-packages:

  • adapters - components to read/write events outside the system
  • internal - some internal definitions
  • filters - filters that allow linq-style query pipelines over event streams
  • sensors - interfaces to sensors go here

thingflow.base

Base functionality for ThingFlow. All the core abstractions are defined here. Everything else is just subclassing or using these abstractions.

The key abstractions are:

  • Thing - a unit of computation in the data flow graph. Things can be
    Filters (with inputs and outputs) or Adapters (with only inputs or only outputs).
  • OutputThing - Base class and interface for things that emit event streams on
    output ports.
  • Sensor - an object that is (indirectly) connected to the physical world.
    It can provide its current value through a sample() method. Sensors can be turned into Things by wrapping them with the SensorAsInputThing class.
  • InputThing - interface for things that receive a stream of events on one or
    more input ports.
  • Filter - a thing that is both an InputThing and an OutputThing, with one
    input and one output. Filters transform data streams.
  • Scheduler - The scheduler wraps an event loop. It provides periodic and
    one-time scheduling of OutputThings that originate events.
  • event - ThingFlow largely does not care about the particulars of the
    events it processes. However, we define a generic SensorEvent datatype that can be used when the details of the event matter to a thing.

See the README.rst file for more details.

class thingflow.base.BlockingInputThing(scheduler, ports=None)

This implements an InputThing which may potentially block when sending an event outside the system. The InputThing is run on a separate thread. We create proxy methods for each port that can be called directly; these methods just queue up the call to run in the worker thread.

The actual implementation of the InputThing goes in the _on_next, _on_completed, and _on_error methods. Note that we don’t dispatch to separate methods for each port. This is because the port is likely to end up as just a message field rather than as a separate destination in the lower layers.

request_stop()

This can be called to stop the thread before it is automatically stopped when all ports are closed. The close() method will be called and the InputThing cannot be restarted later.
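
The queue-and-worker pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the thingflow implementation: QueueingInputSketch and its received attribute are hypothetical names, and only a single default port is modelled.

```python
import queue
import threading


class QueueingInputSketch:
    """Hypothetical stand-in (not a thingflow class): the public methods only
    enqueue work, and a worker thread performs the possibly blocking calls."""

    def __init__(self):
        self._calls = queue.Queue()
        self.received = []  # filled in by the worker thread
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    # Proxy methods: they return as soon as the call has been queued.
    def on_next(self, x):
        self._calls.put(("next", x))

    def on_completed(self):
        self._calls.put(("completed", None))

    def _run(self):
        while True:
            kind, payload = self._calls.get()
            if kind == "completed":
                break  # the port was closed, so the worker thread exits
            self._on_next("default", payload)

    def _on_next(self, port, x):
        # A real adapter would do its blocking write here; the port arrives
        # as an ordinary argument rather than selecting a separate method.
        self.received.append((port, x))

    def join(self, timeout=None):
        self._worker.join(timeout)


sink = QueueingInputSketch()
sink.on_next(1)
sink.on_next(2)
sink.on_completed()
sink.join(timeout=5)
print(sink.received)
```

The caller never waits on the blocking work; it only pays for a queue insertion.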

class thingflow.base.CallableAsInputThing(on_next=None, on_error=None, on_completed=None, port=None)

Wrap any callable with the InputThing interface. We only pass it the on_next() calls. on_error and on_completed can be passed in or default to no-ops.

class thingflow.base.DirectOutputThingMixin

This is the interface for OutputThings that should be directly scheduled by the scheduler (e.g. through schedule_recurring(), schedule_periodic(), or schedule_periodic_on_separate_thread()).

class thingflow.base.EventLoopOutputThingMixin

OutputThing that gets messages from an event loop, either the same loop as the scheduler or a separate one.

exception thingflow.base.ExcInDispatch

Dispatching an event should not raise an error, other than a fatal error.

exception thingflow.base.FatalError

This is the base class for exceptions that should terminate the event loop. This should be for out-of-band errors, not for normal errors in the data stream. Examples of out-of-band errors include an exception in the infrastructure or an error in configuring or dispatching an event stream (e.g. publishing to a nonexistent port).

class thingflow.base.Filter(previous_in_chain)

A filter has a default input port and a default output port. It is used for data transformations. The default implementations of on_next(), on_completed(), and on_error() just pass the event on to the downstream connection.

class thingflow.base.FunctionFilter(previous_in_chain, on_next=None, on_completed=None, on_error=None, name=None)

Implement a filter by providing functions that implement the on_next, on_completed, and on_error logic. This is useful when the logic is really simple or when a more functional programming style is more convenient.

Each function takes a “self” parameter, so it works almost like it was defined as a bound method. The signatures are then:

on_next(self, x)
on_completed(self)
on_error(self, e)

If a function is not provided to __init__, we just dispatch the call downstream.
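
A stand-alone sketch of this calling convention follows. It does not import thingflow: FunctionFilterSketch, its downstream list, and the dispatch_next helper are hypothetical stand-ins, and only on_next is modelled.

```python
class FunctionFilterSketch:
    """Hypothetical stand-in for illustration; not thingflow.base.FunctionFilter."""

    def __init__(self, on_next=None):
        self.downstream = []  # events that would be sent to the connections
        self._on_next_fn = on_next

    def dispatch_next(self, x):
        self.downstream.append(x)

    def on_next(self, x):
        if self._on_next_fn is not None:
            # The filter passes itself in explicitly as the "self" argument.
            self._on_next_fn(self, x)
        else:
            # No function supplied: pass the event downstream unchanged.
            self.dispatch_next(x)


def double(self, x):
    # Written like a method body, but defined as a plain function.
    self.dispatch_next(x * 2)


doubling = FunctionFilterSketch(on_next=double)
passthrough = FunctionFilterSketch()
for value in (1, 2, 3):
    doubling.on_next(value)
    passthrough.on_next(value)
print(doubling.downstream, passthrough.downstream)
```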

class thingflow.base.FunctionIteratorAsOutputThing(initial_state, condition, iterate, result_selector)

Generates an OutputThing sequence by running a state-driven loop producing the sequence’s elements. Example:

   res = FunctionIteratorAsOutputThing(0,
                                       lambda x: x < 10,
                                       lambda x: x + 1,
                                       lambda x: x)

initial_state: Initial state.
condition: Condition to terminate generation (upon returning False).
iterate: Iteration step function.
result_selector: Selector function for results produced in the sequence.

Returns the generated sequence.

class thingflow.base.InputThing

This is the interface for the default input port of a Thing. Other (named) input ports define similar methods named on_PORT_next(), on_PORT_error(), and on_PORT_completed().

class thingflow.base.IterableAsOutputThing(iterable, name=None)

Convert any iterable to an OutputThing. This can be used with the schedule_recurring() and schedule_periodic() methods of the scheduler.

class thingflow.base.OutputThing(ports=None)

Base class for event generators (output things). The non-underscore methods are the public end-user interface. The methods starting with underscores are for interactions with the scheduler.

connect(input_thing, port_mapping=None)

Connect the InputThing to events on a specific port. The port mapping is a tuple of the OutputThing’s port name and InputThing’s port name. It defaults to (default, default).

This returns a function that can be called to remove the connection.
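
The connect-then-disconnect contract can be sketched as follows. This is a simplified stand-in, not the thingflow class: OutputSketch and dispatch_next are hypothetical, and a plain callback takes the place of an InputThing.

```python
class OutputSketch:
    """Hypothetical stand-in for an output thing with named ports."""

    def __init__(self, ports=None):
        self._connections = {port: [] for port in (ports or ["default"])}

    def connect(self, callback, port_mapping=None):
        # The mapping is (output port name, input port name).
        out_port, in_port = port_mapping or ("default", "default")
        entry = (in_port, callback)
        self._connections[out_port].append(entry)

        def disconnect():
            self._connections[out_port].remove(entry)

        return disconnect

    def dispatch_next(self, x, port="default"):
        # Iterate over a copy so a callback may disconnect while we dispatch.
        for in_port, callback in list(self._connections[port]):
            callback(in_port, x)


seen = []
source = OutputSketch()
disconnect = source.connect(lambda port, x: seen.append((port, x)))
source.dispatch_next(1)
disconnect()             # remove the connection
source.dispatch_next(2)  # no longer delivered
print(seen)
```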

pp_connections()

Pretty-print the set of connections.

print_downstream()

Recursively print all the downstream paths. This is for debugging.

trace_downstream()

Install wrappers that print a trace message for each event on this thing and all downstream things.

class thingflow.base.Scheduler(event_loop)

Wrap an asyncio event loop and provide methods for various kinds of periodic scheduling.

run_forever()

Call the event loop’s run_forever(). We don’t really run forever: the event loop is exited if we run out of scheduled events or if stop() is called.

schedule_on_main_event_loop(output_thing)

Schedule an OutputThing that runs on the main event loop. The OutputThing is assumed to implement EventLoopOutputThingMixin. Returns a callable that can be used to unschedule the OutputThing.

schedule_on_private_event_loop(output_thing)

Schedule an OutputThing that has its own event loop on another thread. The OutputThing is assumed to implement EventLoopOutputThingMixin. Returns a callable that can be used to unschedule the OutputThing, by requesting that the event loop stop.

schedule_periodic(output_thing, interval)

Returns a callable that can be used to remove the OutputThing from the scheduler.
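
The general shape of periodic scheduling on an asyncio event loop, with a callable returned for removal, might look like the sketch below. It does not use thingflow: schedule_periodic_sketch and sample are hypothetical names, and the real Scheduler may track its schedules differently.

```python
import asyncio


def schedule_periodic_sketch(loop, sample, interval):
    """Hypothetical helper: call sample() every `interval` seconds on `loop`.

    Returns a callable that cancels any further calls.
    """
    state = {"handle": None, "active": True}

    def tick():
        sample()
        if state["active"]:
            # Re-arm the timer only if the schedule was not cancelled.
            state["handle"] = loop.call_later(interval, tick)

    def cancel():
        state["active"] = False
        if state["handle"] is not None:
            state["handle"].cancel()

    state["handle"] = loop.call_soon(tick)
    return cancel


loop = asyncio.new_event_loop()
samples = []


def sample():
    samples.append(len(samples))
    if len(samples) == 3:
        cancel()     # remove the schedule after three samples
        loop.stop()  # nothing is left to run, so leave run_forever()


cancel = schedule_periodic_sketch(loop, sample, 0.01)
loop.run_forever()
loop.close()
print(samples)
```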

schedule_periodic_on_separate_thread(output_thing, interval)

Schedule an OutputThing to run in a separate thread. It should implement the DirectOutputThingMixin. Returns a callable that can be used to unschedule the OutputThing, by requesting that the child thread stop.

schedule_recurring(output_thing)

Takes a DirectOutputThingMixin and calls _observe() to get events. If, after the call, there are no downstream connections, the scheduler will deschedule the output thing.

This variant is useful for something like an iterable. If the call to get the next event would block, don’t use this! Instead, use one of the calls that runs in a separate thread (e.g. schedule_periodic_on_separate_thread()).

Returns a callable that can be used to remove the OutputThing from the scheduler.

schedule_sensor(sensor, interval, *input_thing_sequence, make_event_fn=<function make_sensor_event>, print_downstream=False)

Create an OutputThing wrapper for the sensor and schedule it at the specified interval. Compose the specified connections (and/or thunks) into a sequence and connect the sequence to the sensor’s OutputThing. Returns a thunk that can be used to remove the OutputThing from the scheduler.

schedule_sensor_on_separate_thread(sensor, interval, *input_thing_sequence, make_event_fn=<function make_sensor_event>)

Create an OutputThing wrapper for the sensor and schedule it on a separate thread at the specified interval. Compose the specified connections (and/or thunks) into a sequence and connect the sequence to the sensor’s OutputThing. Returns a thunk that can be used to remove the OutputThing from the scheduler.

stop()

Stop any active schedules for output things and then call stop() on the event loop.

class thingflow.base.SensorAsOutputThing(sensor, make_event_fn=<function make_sensor_event>)

OutputThing that samples a sensor upon its observe call, creates an event from the sample, and dispatches it forward. A sensor is just an object that has a sensor_id property and a sample() method. If the sensor wants to complete the stream, it should raise a StopIteration exception.

By default, it generates SensorEvent instances. This behavior can be changed by passing in a different function for make_event_fn.
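
The sensor protocol described above is small enough to sketch without thingflow. In this illustration CountdownSensor, make_event, and drain are hypothetical names, and SensorEvent is redefined locally with the documented field names so the block runs on its own.

```python
import time
from collections import namedtuple

# Local copy with the documented field names, so the sketch is self-contained.
SensorEvent = namedtuple("SensorEvent", ["sensor_id", "ts", "val"])


class CountdownSensor:
    """Hypothetical sensor: a sensor_id attribute plus a sample() method."""

    def __init__(self, sensor_id, start):
        self.sensor_id = sensor_id
        self._remaining = start

    def sample(self):
        if self._remaining == 0:
            raise StopIteration  # signals that the stream is complete
        self._remaining -= 1
        return self._remaining


def make_event(sensor, sample):
    # Plays the role of make_event_fn: wrap a raw sample in an event tuple.
    return SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)


def drain(sensor, make_event_fn=make_event):
    """Sample until the sensor raises StopIteration, collecting the events."""
    events = []
    while True:
        try:
            value = sensor.sample()
        except StopIteration:
            break
        events.append(make_event_fn(sensor, value))
    return events


events = drain(CountdownSensor("sensor-1", 3))
print([event.val for event in events])
```

Passing a different make_event_fn changes the event type without touching the sensor.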

class thingflow.base.SensorEvent(sensor_id, ts, val)

sensor_id

Alias for field number 0

ts

Alias for field number 1

val

Alias for field number 2

class thingflow.base.XformOrDropFilter(previous_in_chain)

Implements a slightly more complex filter protocol where events may be transformed or dropped. Subclasses just need to implement the _filter() and _complete() methods.

on_completed()

Passes on any final event and then passes the notification to the next Thing. If you need to clean up any state, do it in _complete().

on_error(e)

Passes on any final event and then passes the notification to the next Thing. If you need to clean up any state, do it in _complete().

on_next(x)

Calls _filter(x) to process the event. If _filter() returns None, nothing further is done. Otherwise, the return value is passed to the downstream connection. This allows you to both transform as well as send only selected events.

Errors other than FatalError are handled gracefully by calling self.on_error() and then disconnecting from the upstream OutputThing.
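
A minimal sketch of the transform-or-drop protocol, assuming nothing beyond the description above. XformOrDropSketch and EvenSquares are hypothetical stand-ins, and error handling is left out.

```python
class XformOrDropSketch:
    """Hypothetical stand-in; subclasses supply _filter()."""

    def __init__(self):
        self.downstream = []  # events that would be passed on

    def _filter(self, x):
        raise NotImplementedError

    def on_next(self, x):
        result = self._filter(x)
        if result is not None:  # None means "drop this event"
            self.downstream.append(result)


class EvenSquares(XformOrDropSketch):
    def _filter(self, x):
        # Transform even values, drop odd ones.
        return x * x if x % 2 == 0 else None


squares = EvenSquares()
for value in range(6):
    squares.on_next(value)
print(squares.downstream)
```

Because the test is against None rather than truthiness, a transformed value of 0 is still passed on.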

thingflow.base.filtermethod(base, alias=None)

Function decorator that creates a linq-style filter out of the specified function. As described in the thingflow.filters documentation, it should take an OutputThing as its first argument (the source of events) and return an OutputThing (representing the end of the filter sequence once the filter is included). The returned OutputThing is typically an instance of thingflow.base.Filter.

The specified function is used in two places:

  1. A method with the specified name is added to the specified class (usually the OutputThing base class). This is for the fluent (method chaining) API.
  2. A function is created in the local namespace for use in the functional API. This function does not take the OutputThing as an argument. Instead, it takes the remaining arguments and then returns a function which, when passed an OutputThing, connects to it and returns a filter.

Decorator arguments:

  • base (T) - base class to extend with the method (usually thingflow.base.OutputThing)
  • alias (string) - an alias for this function or list of aliases
    (e.g. map for select, etc.)
  • returns - a function that takes the class to be decorated
  • rtype - func -> func

This was adapted from the RxPy extensionmethod decorator.
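
The two uses of the decorated function can be sketched with a simplified decorator. This is an illustration only, not the thingflow source: filtermethod_sketch and Stream are hypothetical, aliases are installed as methods only, and a list of events stands in for a live stream.

```python
import functools


class Stream:
    """Hypothetical stand-in for an OutputThing: it just holds a list of events."""

    def __init__(self, events):
        self.events = list(events)


def filtermethod_sketch(base, alias=None):
    def decorator(func):
        # 1. Fluent API: install the function as a method on `base`,
        #    under its own name and under any aliases.
        names = [func.__name__]
        if alias:
            names += [alias] if isinstance(alias, str) else list(alias)
        for name in names:
            setattr(base, name, func)

        # 2. Functional API: accept the remaining arguments now and return
        #    a thunk that is applied to the upstream object later.
        @functools.wraps(func)
        def functional(*args, **kwargs):
            return lambda this: func(this, *args, **kwargs)

        return functional

    return decorator


@filtermethod_sketch(Stream, alias="map")
def select(this, mapfun):
    return Stream(mapfun(x) for x in this.events)


fluent = Stream([1, 2, 3]).map(lambda x: x * 2)  # method-chaining form
thunk = select(lambda x: x + 1)                  # functional form
functional = thunk(Stream([1, 2, 3]))
print(fluent.events, functional.events)
```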

thingflow.base.make_sensor_event(sensor, sample)

Given a sensor object and a sample taken from that sensor, return a SensorEvent tuple.

thingflow.sensors

The sensors are not included in the auto-generated documentation, as importing the code requires external libraries (not possible for automated documentation generation). Here is a list of available sensor modules in the ThingFlow-Python distribution:

  • rpi.adxl345_py3 - interface to the adxl345 accelerometer
  • rpi.arduino - interface an Arduino to the Raspberry Pi
  • rpi.gpio - read from the Raspberry Pi GPIO pins
  • lux_sensor - read from a TSL2591 lux sensor

Please see the source code for more details on these sensors.

thingflow.filters

This sub-module provides a collection of filters for providing linq-style programming (inspired by RxPy).

Each function appears as a method on the OutputThing base class, allowing for easy chaining of calls. For example:

sensor.where(lambda x: x > 100).select(lambda x: x*2)

If the @filtermethod decorator is used, then a standalone function is also defined that takes all the arguments except the publisher and returns a function which, when called, takes a publisher and subscribes to the publisher. We call this returned function a “thunk”. Thunks can be used with combinators (like compose(), parallel(), and passthrough(), all defined in combinators.py) as well as directly with the scheduler. For example:

scheduler.schedule_sensor(sensor, interval, where(lambda x: x > 100),
                                            select(lambda x: x*2))

The implementation code for a linq-style filter typically looks like the following:

@filtermethod(OutputThing)
def example(this, ...):
    def _filter(self, x):
        ....
    return FunctionFilter(this, _filter, name="example")

Note that, by convention, we use this as the first argument of the function, rather than self. The this parameter corresponds to the previous element in the chain, while the self parameter used in the _filter() function represents the current element in the chain. If you get these mixed up, you can get an infinite loop!

In general, a linq-style filter takes the previous OutputThing/filter in a chain as its first input, parameters to the filter as subsequent inputs, and returns an OutputThing/filter that should be used as the input to the next step in the filter chain.

thingflow.filters.buffer

class thingflow.filters.buffer.BufferEventUntilTimeoutOrCount(previous_in_chain, event_watcher, scheduler, interval=None, count=None)

A class that passes on the events on the default channel to a buffer (maintained by a BufferEventWatcher). When a timeout fires, the BufferEventWatcher returns the buffer of all events so far.

on_timeout_next(x)

We got the buffered events from the timeout, so send them to the subscribers and reset the timer.

thingflow.filters.combinators

This module defines combinators for linq-style functions: compose, parallel, and passthrough. A linq-style function takes the previous OutputThing/filter in a chain as its first input (“this”), parameters to the filter as subsequent inputs, and returns an OutputThing/filter that should be used as the input to the next step in the filter chain.

We use the term “thunk” for the special case where the linq-style function takes only a single input - the previous OutputThing/filter in the chain. The Scheduler.schedule_sensor() method and the functions below can accept thunks in place of filters. If a linq-style filter F was defined using the @filtermethod decorator, then calling the function directly (not as a method of an OutputThing) returns a thunk.

thingflow.filters.combinators.compose(*thunks)

Given a list of thunks and/or filters, compose them in a sequence and return a thunk.
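
Composition of thunks reduces to applying each one to the result of the previous one. The sketch below assumes only thunks (not filter instances) are passed; compose_sketch, where, and select are hypothetical local helpers, and plain lists stand in for event streams.

```python
def compose_sketch(*thunks):
    """Hypothetical helper: chain thunks left to right and return one thunk."""

    def composed(this):
        for thunk in thunks:
            this = thunk(this)  # each thunk receives the previous step's output
        return this

    return composed


def where(predicate):
    # Returns a thunk that keeps only the events matching the predicate.
    return lambda this: [x for x in this if predicate(x)]


def select(mapfun):
    # Returns a thunk that transforms every event.
    return lambda this: [mapfun(x) for x in this]


pipeline = compose_sketch(where(lambda x: x > 100), select(lambda x: x * 2))
result = pipeline([50, 150, 200])
print(result)
```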

thingflow.filters.combinators.parallel(*connectees)

Take one or more InputThings/thunks and create a thunk that will connect all of them to “this” when evaluated. Note that the entire set of InputThings acts as spurs - the original OutputThing is returned as the next OutputThing in the chain.

thingflow.filters.dispatch

class thingflow.filters.dispatch.Dispatcher(previous_in_chain, dispatch_rules)

Dispatch rules are a list of (predicate, port) pairs. See the documentation on the dispatch() extension method for details.
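
The rule format can be illustrated with a small routing function. This sketch makes two assumptions not stated above: the first matching rule wins, and events matching no rule go to a port named "default". dispatch_sketch is a hypothetical helper, not part of thingflow.

```python
def dispatch_sketch(rules, events):
    """Group events by the port of the first matching (predicate, port) rule.

    Assumption: unmatched events are sent to the "default" port.
    """
    by_port = {}
    for event in events:
        port = "default"
        for predicate, candidate_port in rules:
            if predicate(event):
                port = candidate_port
                break  # assumption: the first matching rule wins
        by_port.setdefault(port, []).append(event)
    return by_port


rules = [
    (lambda x: x > 30, "hot"),
    (lambda x: x < 10, "cold"),
]
routed = dispatch_sketch(rules, [5, 20, 35])
print(routed)
```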

thingflow.filters.first

thingflow.filters.json

thingflow.filters.map

Transform each event in the stream. thingflow.filters.select and thingflow.filters.map have the same functionality. Just import one - the @filtermethod decorator will create the other as an alias.

thingflow.filters.never

class thingflow.filters.never.Never

An OutputThing that never calls its connections: creates an empty stream that never goes away

thingflow.filters.output

thingflow.filters.scan

thingflow.filters.select

Transform each event in the stream. thingflow.filters.select and thingflow.filters.map have the same functionality. Just import one - the @filtermethod decorator will create the other as an alias.

thingflow.filters.skip

thingflow.filters.some

thingflow.filters.take

thingflow.filters.timeout

Timeout-related output things and filters.

class thingflow.filters.timeout.EventWatcher

Watch the event stream and then produce an event for a timeout when asked. This can be subclassed to implement different policies.

class thingflow.filters.timeout.SupplyEventWhenTimeout(previous_in_chain, event_watcher, scheduler, interval)

This filter sits in a chain and passes incoming events through to its output. It also passes all events to the on_next() method of the event watcher. If no event arrives on the input after the interval has passed since the last event, event_watcher.produce_event_for_timeout() is called to get a dummy event, which is passed downstream.

on_timeout_completed()

This won’t get called, as the timeout thing does not propagate any completions. We just use the primary event stream to figure out when things are done and clear any pending timeouts at that time.

on_timeout_error(e)

This won’t get called, as the Timeout thing does not republish any errors it receives.

on_timeout_next(x)

This method is connected to the Timeout thing’s output. If it gets called, the timeout has fired. We need to reschedule the timeout as well, so that we continue to produce events in the case of multiple consecutive timeouts.

class thingflow.filters.timeout.Timeout(scheduler, timeout_thunk)

An output thing that can schedule timeouts for itself. When a timeout occurs, an event is sent on the default port. The timeout_thunk is called to get the actual event.

thingflow.filters.transducer

Transducers for streams. A transducer maintains internal state which is updated every time on_next is called. It implements a function f: Input X State -> Output X State

For those who speak automata, this is a Mealy machine.
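
As a concrete instance of f: Input X State -> Output X State, here is a running maximum written as a pure step function. The names running_max_step and run_transducer are hypothetical and the sketch is independent of the thingflow classes.

```python
def running_max_step(x, state):
    """One transducer step: (input, state) -> (output, new state)."""
    new_state = x if state is None else max(state, x)
    return new_state, new_state


def run_transducer(step, inputs, state=None):
    """Feed inputs through the step function, threading the state along."""
    outputs = []
    for x in inputs:
        output, state = step(x, state)
        outputs.append(output)
    return outputs


maxima = run_transducer(running_max_step, [3, 1, 4, 1, 5])
print(maxima)
```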

class thingflow.filters.transducer.PeriodicMedianTransducer(period=5)

Emit an event once every period input events. The value is the median of the inputs received since the last emission.
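
The emission rule can be sketched as follows. PeriodicMedianSketch and its step() method are hypothetical; returning None for "no emission yet" is a convention of this sketch, not necessarily of the library.

```python
from statistics import median


class PeriodicMedianSketch:
    """Hypothetical stand-in: emit the median of every `period` inputs."""

    def __init__(self, period=5):
        self.period = period
        self._samples = []

    def step(self, x):
        """Return the median once `period` inputs have arrived, else None."""
        self._samples.append(x)
        if len(self._samples) < self.period:
            return None
        result = median(self._samples)
        self._samples = []  # start collecting the next batch
        return result


transducer = PeriodicMedianSketch(period=3)
outputs = [transducer.step(x) for x in [1, 9, 2, 7, 3, 5]]
emitted = [value for value in outputs if value is not None]
print(emitted)
```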

class thingflow.filters.transducer.SensorSlidingMean(history_samples)

Given a stream of SensorEvents, output a new event representing the mean of the event values in the window. The state we keep is the sum of the .val fields within the window. We assume that all events are from the same sensor.

class thingflow.filters.transducer.SlidingWindowTransducer(history_samples)

Transducer that processes a sliding window of events. The most recent history_samples events are kept internally in a deque. When an event arrives, it is pushed onto the deque and an old event is popped off. There are three cases: the very first event, events before the buffer is full, and events after the buffer is full. For each case, the new event, old event (if one is being popped off), and an accumulated state value are passed to a template method. The method returns the transduced event and a new value for the accumulated state. This makes it easy to efficiently implement algorithms like a running average or min/max, etc.

Note that the window here is based on the number of samples, not a time period.
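
A sliding mean shows why the accumulated state makes the window cheap to maintain: each step adds the new value and subtracts the evicted one instead of re-summing. In this sketch SlidingMeanSketch is a hypothetical class that folds the three cases into one method rather than using separate template methods.

```python
from collections import deque


class SlidingMeanSketch:
    """Hypothetical stand-in: mean over the last `history_samples` values."""

    def __init__(self, history_samples):
        self.history_samples = history_samples
        self._window = deque()
        self._total = 0.0  # accumulated state: sum of the values in the window

    def step(self, x):
        self._window.append(x)
        self._total += x
        if len(self._window) > self.history_samples:
            # The buffer was already full: drop the oldest value from the sum.
            self._total -= self._window.popleft()
        return self._total / len(self._window)


mean = SlidingMeanSketch(history_samples=2)
means = [mean.step(x) for x in [2, 4, 6, 8]]
print(means)
```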

thingflow.filters.where

thingflow.adapters

Adapters are components that connect ThingFlows to the external world. Readers are event output things which source an event stream into a ThingFlow process. Writers are input things that translate an event stream to a form used outside of the ThingFlow process. For example, CsvReader is an output thing that reads events from a CSV-formatted spreadsheet file and CsvWriter is an input thing that writes events to a CSV file.

Why don’t we just call adapters OutputThings and InputThings? We want to avoid confusion due to the fact that an OutputThing is used to connect to external inputs while external outputs interface via InputThings.

thingflow.adapters.csv

Adapters for reading/writing event streams to CSV (spreadsheet) files.

class thingflow.adapters.csv.EventSpreadsheetMapping

Define the mapping between an event record and a spreadsheet.

get_header_row()

Return a list of header row column names.

class thingflow.adapters.csv.RollingCsvWriter(previous_in_chain, directory, base_name, mapper=<thingflow.adapters.csv.SensorEventMapping object>, get_date=<function default_get_date_from_event>, sub_port=None)

Write an event stream to csv files, rolling to a new file daily. The filename is base_name-yyyy-mm-dd.csv. Typically, base_name is the sensor id. If sub_port is specified, the writer will subscribe to the specified port in the previous filter, rather than the default port. This is helpful when connecting to a dispatcher.
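
The daily file naming can be sketched as a small function of the event timestamp. rolling_csv_path is a hypothetical helper, and the use of UTC for the day boundary and the .csv extension are assumptions of this sketch; the library's get_date function may behave differently.

```python
import os
from datetime import datetime, timezone


def rolling_csv_path(directory, base_name, ts):
    """Hypothetical helper: one file per calendar day (UTC in this sketch)."""
    day = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
    return os.path.join(directory, "%s-%s.csv" % (base_name, day))


first = rolling_csv_path("data", "lux-1", 0)       # Unix epoch
second = rolling_csv_path("data", "lux-1", 86400)  # exactly one day later
print(os.path.basename(first), os.path.basename(second))
```

A writer built on this would open a new file whenever the computed path changes between consecutive events.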

class thingflow.adapters.csv.SensorEventMapping

A mapping that works for SensorEvent tuples. We map the time values twice - as the raw timestamp and as an iso-formatted datetime.

thingflow.adapters.generic

Generic reader and writer classes, to be subclassed for specific adapters.

class thingflow.adapters.generic.DirectReader(iterable, mapper, name=None)

A reader that can be run in the current thread (does not block indefinitely). Reads rows from the iterable, converts them to events using the mapping and passes them on.

class thingflow.adapters.generic.EventRowMapping

Interface that converts between events and “rows”

event_to_row(event)

Convert an event to the row representation (usually a list of values).

row_to_event(row)

Convert a row to an event.
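
An event/row mapping in the spirit of this interface might look like the sketch below. SensorEventMappingSketch is hypothetical, SensorEvent is redefined locally, and the column order, header names, and UTC rendering are all assumptions. It writes the time twice, as a raw timestamp and as an ISO-formatted string, in the manner described for the CSV SensorEventMapping.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Local copy with the documented field names, so the sketch is self-contained.
SensorEvent = namedtuple("SensorEvent", ["sensor_id", "ts", "val"])


class SensorEventMappingSketch:
    """Hypothetical mapping between SensorEvent tuples and spreadsheet rows."""

    def get_header_row(self):
        return ["timestamp", "datetime", "sensor_id", "value"]

    def event_to_row(self, event):
        iso = datetime.fromtimestamp(event.ts, tz=timezone.utc).isoformat()
        return [event.ts, iso, event.sensor_id, event.val]

    def row_to_event(self, row):
        # The ISO column is redundant on the way back in, so it is ignored.
        ts, _iso, sensor_id, val = row
        return SensorEvent(sensor_id=sensor_id, ts=float(ts), val=float(val))


mapping = SensorEventMappingSketch()
event = SensorEvent(sensor_id="lux-1", ts=0.0, val=12.5)
row = mapping.event_to_row(event)
print(row)
```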

Other Adapters

Many adapters are not included in the auto-generated documentation, as importing the code requires external libraries (not possible for the auto document generation). Here is a list of additional adapters in the ThingFlow-Python distribution:

  • bokeh - interface to the Bokeh visualization framework
  • influxdb - interface to the InfluxDb time series database
  • mqtt - interface to MQTT via paho.mqtt
  • mqtt_async - interface to MQTT via hbmqtt
  • pandas - convert ThingFlow events to Pandas Series data arrays
  • predix - send and query data with the GE Predix Time Series API
  • postgres - interface to the PostgreSQL database
  • rpi.gpio - output on the Raspberry Pi GPIO pins

Please see the source code for more details on these adapters.