3. Implementing an OutputThing

In most cases, one can simply wrap a sensor in the SensorAsOutputThing class and not worry about the details of how to implement output things. There are also several pre-defined readers under thingflow.adapters that can obtain events from external sources like message brokers, flat files, and databases.

The most likly reason for implmenting a new OutputThing is that you want to create a new adapter type that does not exist in the standard ThingFlow library. We will walk through the details in this document.

Subclassing

When implmenting an output thing, one subclasses from thingflow.base.OutputThing. To emit a new event, the subclass calls the _dispatch_next method with the event and port name. To signal an error or completion of the event stream, one calls _dispatch_error or _dispatch_completed, respectively. The base class implementation of these methods are responsible for calling the on_next, on_error, and on_completed methods for each of the connected things.

The code to call these _dispatch methods goes into a well-known method to be called by the scheduler. The specific method depends how the output thing will interact with the scheduler. There are two cases supported by ThingFlow and three associated mixin-classes that define the methods:

  1. DirectOutputThingMixin defines an _observe method that can be called directly by the scheduler either in the main thread (via Scheduler.schedule_period() or Scheduler.schedule_recurring()) or in a separate thread (via Scheduler.schedule_periodic_on_separate_thread()).
  2. EventLoopOutputThingMixin is used for an output thing that has its own separate event loop. This is run in a separate thread and the connected input things are called in the main thread.

Simple CSV Reader

OK, with all that out of the way, let us define a simple OutputThing. We will create a simple CSV-formatted spreadsheet file reader. Each row in the file corresponds to an event. Here is the class definition (found in examples/simple_csv_reader.py):

import csv
from thingflow.base import OutputThing, DirectOutputThingMixin,\
                           SensorEvent, FatalError

class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
    def __init__(self, filename, has_header_row=True):
        super().__init__() # Make sure the output_thing class is initialized
        self.filename = filename
        self.file = open(filename, 'r', newline='')
        self.reader = csv.reader(self.file)
        if has_header_row:
            # swallow up the header row so it is not passed as data
            try:
                self.reader.__next__()
            except Exception as e:
                raise FatalError("Problem reading header row of csv file %s: %s" %
                                 (filename, e))

    def _observe(self):
        try:
            row = self.reader.__next__()
            event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
                                val=float(row[2]))
            self._dispatch_next(event)
        except StopIteration:
            self.file.close()
            self._dispatch_completed()
        except FatalError:
            self._close()
            raise
        except Exception as e:
            self.file.close()
            self._dispatch_error(e)

The SimpleCsvReader class subclasses from both OutputThing and DirectOutputThingMixin. Subclassing from OutputThing provides the machinery needed to register connections and propagate events to downstream input things. DirectOutputThingMixin defines an empty _observe() method and indicates that the scheduler should call _observe() to dispatch events whenever the reader has been scheduled.

In the __init__() constructor, we first make sure that the base class infrastructure is initialized through super().__init__(). Next, we open the file, set up the csv reader, and read the header (if needed).

The main action is happening in _observe(). When scheduled, it reads the next row from the csv file and creates a SensorEvent from it. This event is passed on to the output port’s connections via _dispatch_next(). If the end of the file has been reached (indicated by the StopIteration exception), we instead call _dispatch_completed(). There are two error cases:

  1. If a FatalError exception is thrown, we close our connection and propagate the error up. This will lead to an early termination of the event loop.
  2. If any other exception is thrown, we pass it downstream via _dispatch_error(). It will also close the event stream and cause the SimpleCsvReader to be de-scheduled. The main event loop may continue, assuming that there are other scheduled objects.

We could save some work in implementing our reader by subclassing from thingflow.adapters.generic.DirectReader. It provides the dispatch behavior common to most readers.

Reading a File

Now, let us create a simple data file test.csv:

ts,id,value
1,1,2
2,1,3
3,1,455
4,1,55

We can instantiate a SimpleCsvReader to read in the file via:

reader = SimpleCsvReader("test.csv")

Now, let’s hook it to an printing input thing and then run it in the event loop:

import asyncio
from thingflow.base import Scheduler
import thingflow.adapters.output # load the output method

reader.output()
scheduler = Scheduler(asyncio.get_event_loop())
scheduler.schedule_recurring(reader)
scheduler.run_forever()

We use schedule_recurring() instead of schedule_periodic(), as we expect all the data to be already present in the file. There is no sense in taking periodic samples.

The output looks as follows:

SensorEvent(sensor_id='1', ts=1.0, val=2.0)
SensorEvent(sensor_id='1', ts=2.0, val=3.0)
SensorEvent(sensor_id='1', ts=3.0, val=455.0)
SensorEvent(sensor_id='1', ts=4.0, val=55.0)
No more active schedules, will exit event loop

Note that the event loop terminates on its own. This is due to the call to _dispatch_completed() when the csv reader throws StopIteration.

Output Things with Private Event Loops

There can be cases when the underlying API to be called by the OutputThing requires its own event loop / event listener. To handle this situation, use the interface provided by EventLoopOutputThingMixin. Your main event loop for the output ting is implemented in the _observe_event_loop(). If you call the scheduler’s schedule_on_private_event_loop() method, it will run this method in a separate thread and then dispatch any events to the scheduler’s main event loop (running in the main thread).

To see some example code demonstrating an output thing using a private event loop, see thingflow.adapters.mqtt.MQTTReader.