4. Things with Non-default Ports¶
ThingFlow provides a general dataflow architecture. Output things can
output events on different ports and input things can receive messages via
different ports. Each connect()
call can rename ports, allowing the
interconnection of any compatible ports. For example, one might have code like:
output_thing.connect(input_thing,
port_mapping=('out_port_name', 'in_port_name'))
As you know, ThingFlow provides a special default
port that does not need
any mapping. This makes it convenient for building chains of filters and is good
enough most of the time. However, when you need a more complex data flow, the
more general mapping capability can be very helpful. We will now look at it in
more detail.
Multiple Output Ports¶
To create an output thing which sends messaages on multiple output ports,
one subclasses from OutputThing
or one of its descendents. Here is a simple
thing that accepts events on the default input port and sends values to one or
more of three ports:
class MultiPortOutputThing(OutputThing, InputThing):
def __init__(self, previous_in_chain):
super().__init__(ports=['divisible_by_two', 'divisible_by_three',
'other'])
# connect to the previous filter
self.disconnect_from_upstream = previous_in_chain.connect(self)
def on_next(self, x):
val = int(round(x.val))
if (val%2)==0:
self._dispatch_next(val, port='divisible_by_two')
if (val%3)==0:
self._dispatch_next(val, port='divisible_by_three')
if (val%3)!=0 and (val%2)!=0:
self._dispatch_next(val, port='other')
def on_completed(self):
self._dispatch_completed(port='divisible_by_two')
self._dispatch_completed(port='divisible_by_three')
self._dispatch_completed(port='other')
def on_error(self, e):
self._dispatch_error(e, port='divisible_by_two')
self._dispatch_error(e, port='divisible_by_three')
self._dispatch_error(e, port='other')
In the _init__
constructor, we must be sure to call the super class’s
constructor, passing it the list of ports that will be used. If the list is
not provided, it is initialized to the default port, and sending to any other
port would be a runtime error.
This thing will accept events from the default input port, so we subclass from
InputThing
and process sensor values in the on_next()
method.
We first obtain a value from the event and round it
to the nearest integer. Next, we see if it is divisible by 2. If so, we call
_dispatch_next()
to dispatch the value to the divisible_by_two
port,
passing the port name as the second parameter (it defaults to default
).
Next, we check for divisibity by three, and dispatch the value to the
divisible_by_three
port if it is divisible. Note that a number like six
will get dispatched to both ports. Finally, if the value is not divisible by
either two or three, we dispatch it to the other
port.
For the on_completed()
and on_error()
events, we forward the
notifications to each of the output ports, by calling _dispatch_completed()
and _dispatch_next()
three times. In general, each port can be viewed as
a separate event stream with its own state. An output thing might decide to
mark completed a subset of its ports while continuing to send new events
on other ports.
Let us look at how this thing might be called:
sensor = SensorAsOutputThing(RandomSensor(1, mean=10, stddev=5,
stop_after_events=10))
mtthing = MultiPortOutputThing(sensor)
mtthing.connect(lambda v: print("even: %s" % v),
port_mapping=('divisible_by_two', 'default'))
mtthing.connect(lambda v: print("divisible by three: %s" % v),
port_mapping=('divisible_by_three', 'default'))
mtthing.connect(lambda v: print("not divisible: %s" % v),
port_mapping=('other', 'default'))
scheduler.schedule_recurring(sensor)
scheduler.run_forever()
Here, we map a different anonymous print function to each output port of the
thing. Internally, connect
is wrapping the anonymous functions with
CallableAsInputThing
. This thing only listens on a default port, so we
have to map the port names in the connect()
calls.
The full code for this example is at examples/multi_port_example.py
.
Multiple Input Ports¶
Now, let us consider a thing that supports incoming messages on multiple
ports. Messages on non-default input ports are passed to different methods on an
input thing. Specifically, given a port name PORT
, events are dispatched
to the method on_PORT_next()
, completion of the port’s stream is
dispatched to on_PORT_completed()
, and errors are dispatched to
on_PORT_error()
. Multiple ports are frequently useful
when implementing state machines or filters that combine multiple inputs.
As an example, assume that we have a state machine that reads data from two sensors: a left sensor and a right sensor. Here is how the code might be structured:
class StateMachine:
def on_left_next(self, x):
...
def on_left_completed(self):
...
def on_left_error(self):
...
def on_right_next(self, x):
...
def on_right_completed(self):
...
def on_right_error(self):
...
Here is how we might set up the connections to the sensors:
left = SensorAsOutputThing(LuxSensor('left'))
right = SensorPsOutputThing(LuxSensor('right'))
state_machine = StateMachine()
left.connect(state_machine, port_mapping=('default', 'left'))
right.connect(state_machine, port_mapping=('default', 'right'))
Each sensor outputs its data on the default port, so we map the connections
to the left
and right
ports on the state machine.
Multi-port Filters¶
A filter is an ThingFlow element that has both default input and default output ports. Filters can be easily connected into pipelines. Filters usually have a single input port and a single output port, but other topologies are possible (typically one-to-many or many-to-one). One particularly useful filter is the dispatcher. A dispatcher routes each incoming event (on the default input port) to one of several output ports, based on some criteria.
For example, consider the filter thingflow.filters.dispatch.Dispatcher
. This
filter is provided a set of routing rules in the form of (predicate function,
output port) pairs. An output port is created for each rule (plus the default
port). In the on_next()
method of the filter’s InputThing interface, an
incoming event is tested on each of the predicate functions in order. When a
predicate is found that returns true, the event is dispatched to the associated
port and the rule search stops for that event. If an event fails all the
predicate checks, it is passed to the default
port.
Here is the most relevant parts of the filter code (see dispatch.py
for the
complete code):
class Dispatcher(OutputThing, InputThing):
def __init__(self, previous_in_chain, dispatch_rules):
ports = [port for (pred, port) in dispatch_rules] + ['default']
super().__init__(ports=ports)
self.dispatch_rules = dispatch_rules
self.disconnect = previous_in_chain.connect(self)
def on_next(self, x):
for (pred, port) in self.dispatch_rules:
if pred(x):
self._dispatch_next(x, port=port)
return
self._dispatch_next(x, port='default') # fallthrough case
We will use this dispatcher within a larger example in the subsection Solar Water Heater.