August'24: Kamaelia is in maintenance mode and will recieve periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)
(This page describes an implementation of the above)
If you're interested, this branch in the repository contains:
1) (Long awaited) unpausing of components with outboxes in a chain of
linkages whenever messages are collected from the destination
inbox.
2) Additions to the axon test suite to cover this, and aspects of
size-limited boxes that previously lacked unittests.
Apologies for the length of this posting, read the sections you're
interested in:
WHAT DOES THIS FIX?
ABOUT THE IMPLEMENTATION
TIME COMPLEXITY
TEST SUITE ADDITIONS
If anyone has some spare time to take a look, I'd be really grateful for
a sanity check on the implementation. In particular, checking whether it
breaks any code you've written. It shouldn't, but you never know!
Constructive criticism of the implementation is also welcome - the
rationale is explained below.
WHAT DOES THIS FIX?
(1) is a bugfix ... this capability existed in the orginal 1.0 release
of Axon, before we make the message delivery optimisations. The
optimised version removed the postman from the equation, so messages
would be delivered immediately and directly into the destination
inbox.
What has actually been implemented? Here's a concrete example, or what
this bugfix makes possible:
from Axon.Component import component
from Axon.ThreadedComponent import threadedcomponent
from Axon.AxonExceptions import noSpaceInBox
class Producer(component):
def main(self):
for i in range(100):
sent=False
while not sent:
try:
self.send(i, "outbox")
sent=True
except noSpaceInBox:
self.pause()
yield 1
class SlowConsumer(threadedcomponent):
def __init__(self):
super(SlowConsumer,self).__init__(queuelengths=5)
def main(self):
self.inboxes['inbox'].setSize(5)
while 1:
time.sleep(0.5)
while not self.dataReady("inbox"):
self.pause()
print self.recv("inbox")
from Kamaelia.Pipeline import Pipeline
Pipeline(Producer(),SlowConsumer()).run()
The slow consumer restricts its inbox to hold a maximum of 5 items (and
the internal queues to also a maximum of 5 items). This means the
producer receives noSpaceInBox exceptions when the box it is trying to
send to is full.
This bugfix means that if the Producer pauses, it will be woken when the
Consumer consumes an item (implying there may[*] now be space for it to
send more)
[*] "may" rather than "is" becuase if there are multiple Producers
sending to the same inbox, all are woken, but there might not be enough
free space for them all to send. Plus there are no fairness guarantees
at present.
There are more complete examples like this in the branch in:
.../Tests/Python/Axon/wakeuptest.py
.../Tests/Python/Axon/wakeuptest2.py
ABOUT THE IMPLEMENTATION
This implementation is mainly changes to Box.py ... enabling postbox
objects to build and maintain a list of notification callbacks to be
triggered when a message is 'pop'ed from a box.
The postbox class now lets you specify a notification callback when it
is constructed. This will be called whenever a message is collected
(popped) in the chain of linkages this box is part of.
Inboxes, obviously, do not use this notification callback. Outboxes
do.
Each postbox maintains, a list of all callback it needs to call if a
message is collected from it. When a linkage is added (addsource method
called on the destination of the linkage), it collects this list from
the new source, adds it to its own list, and instructs the next
component down the chain of linkages to do the same, ie. it recurses
down the chain to make sure all downstream boxes update their list of
callbacks.
For example:
outboxA ---->
outboxB ----> inbox1 ----> inbox2 ----> inbox3
outboxC ----> inbox4
The callbacks lists for all boxes will be:
outboxA : []
outboxB : []
inbox1 : [A,B]
inbox2 : [A,B]
inbox3 : [A,B]
outboxC : []
inbox4 : [C]
If a linkage is added from inbox4 ---> inbox2:
outboxA ---->
outboxB ----> inbox1 ----> inbox2 ----> inbox3
A
|
outboxC ----> inbox4 --------' new linkage
Then the callback lists change for inbox2 and inbox3 only:
outboxA : []
outboxB : []
inbox1 : [A,B]
inbox2 : [A,B,C] # C added
inbox3 : [A,B,C] # C added
outboxC : []
inbox4 : [C]
When a linkage is removed, the exact reverse process happens - the list
of callbacks is obtained from the soon-to-be-no-longer source, and is
then removed from the list of local callbacks. The same happens for all
downstream boxes.
To facilitate this implementation, the existing 'retarget' method that
is used to establish the linkages had to be modified slightly to enable
each postbox to know which postbox it is linked to next along the chain
of linkages (held in self.target). Previously it did not maintain this,
as all that was needed was direct references to the final destination
box, for delivering messages.
threadedcomponent has also been modified to ensure it wakes up the
thread if it gets unpaused.
TIME COMPLEXITY
The additional time complexity of all three aspects (issuing the
notification, creating a linkage, removing a linkage) is O(n) worst
case.
Issuing the notifications is, I suspect, by far the most common task. I
reckon this implementation is as cheap as it can be (simply iteration
through a list and making the call backs) without more radical changes
to Axon/Axon's behaviour (eg. reducing the circumstances in which
unpausing should happen)
Creating and removing linkages is therefore more expensive - since it is
effectively collating and updating lists of callbacks. But my personal
feeling is that this tradeoff is acceptable. Particuarly for a first cut
implementation.
TEST SUITE ADDITIONS
Tests have been added to cover the bugfix, for both ordinary and
threaded components.
Tests have also been added to cover setting size limits on inboxes - and
the consequent expectation that when a box becomes full, a noSpaceInBox
exception is raised. The tests again cover both ordinary and threaded
components.