August'24: Kamaelia is in maintenance mode and will recieve periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)

Axon.ThreadedComponent

Thread based components

A threaded component is like an ordinary component but where the main() method is an ordinary method that is run inside its own thread. (Normally main() is a generator that is given slices of execution time by the scheduler).

This is really useful if your code needs to block - eg. wait on a system call, or if it is better off being able to run on another CPU (though beware python's limited ability to scale across multiple CPUs).

If you don't need these capabilities, consider making your component an ordinary Axon.Component.component instead.

Just like writing an ordinary component

This is nearly identical to writing an ordinary Axon.Component.component. For example this ordinary component:

class MyComponent(Axon.Component.component):

    Inboxes = { "inbox"   : "Send the FOO objects to here",
                "control" : "NOT USED",
              }
    Outboxes = { "outbox" : "Emits BAA objects from here",
                 "signal" : "NOT USED",
               }

    def main(self):
        while 1:
            if self.dataReady("inbox"):
                msg = self.recv("inbox")
                result = ... do something to msg ...
                self.send(result, "outbox")

            yield 1

Can be trivially written as a threaded component simply by removing the yield statements, turning main() into a normal method:

class MyComponent(Axon.ThreadedComponent.threadedcomponent):

    Inboxes = { "inbox"   : "Send the FOO objects to here",
                "control" : "NOT USED",
              }
    Outboxes = { "outbox" : "Emits BAA objects from here",
                 "signal" : "NOT USED",
               }

    def main(self):
        while 1:
            if self.dataReady("inbox"):
                msg = self.recv("inbox")
                result = ... do something to msg ...
                self.send(result, "outbox")

What can a threaded component do?

Exactly the same things any other component can. The following method calls are all implemented in a thread safe manner and function exactly as you should expect:

  • self.link()
  • self.unlink()
  • self.dataReady()
  • self.anyReady()
  • self.recv()
  • self.send()

self.pause() behaves slightly differently:

  • calling self.pause() pauses immediately - not at the next yield statement (since there are no yield statements!)
  • self.pause() has an extra optional 'timeout' argument to allow you to write timer code that can be interrupted, for example, by incoming messages.

In addition, threadedadaptivecommscomponent also supports the extra methods in Axon.AdaptiveCommsComponent.AdaptiveCommsComponent:

  • self.addInbox()
  • self.deleteInbox()
  • self.addOutbox()
  • self.deleteOutbox()
  • etc..

Inbox and Outbox queues

There is one difference: because the main() method runs in a different thread it does not actually interact directly with its own inboxes and outboxes. Internal queues are used to get data between your thread and the component's actual inboxes and outboxes. This is hidden for the most part - the method calls you make to receive and send messages are exactly the same.

When initialising a threadedcomponent you may wish to specify the size limit (queue length) for these queues. There is a size limit so that if your threaded component is delivering to a size limited inbox, the effects of the inbox becoming full propagate back to your thread.

In some ways this is a bit like nesting one component within another - where all the parent component's inboxes and outboxes are forwarded to the child:

  +-----------------------------------------+
  |           threaded component            |
  |                                         |
  |              +----------+               |
  |              |  main()  |               |
INBOX -------> inbox      outbox -------> OUTBOX
  |    queue     |          |     queue     |
  |              +----------+               |
  +-----------------------------------------+

What does this mean in practice?

  • More messages get buffered. - Suppose your threaded component has an internal queues of size 5 and is delivering messages to an inbox on another component with a size limit of 10. From the perspective of your threaded component you will actually be able to send 15 messages before you might start to get Axon.AxonExceptions.noSpaceInBox exceptions.
  • Threaded components that output lots of messages might see unexpected 'box full' exceptions - Suppose your threaded component has a small internal queue size but produces lots of messages very quickly. The rest of the system may not be able to pick up those messages quickly enough to put them into the destination inbox. So even though the destination might not have a size limit you may still get these exceptions.

The secret is to choose a sensible queue size to balance between it being able to buffer enough messages without generating errors whilst not being so large as to render a size limited inbox pointless!

Regulating speed

In addition to being able to pause (with an optional timeout), a threaded component can also regulate its speed by briefly synchronising with the rest of the system. Calling the sync() method simply briefly blocks until the rest of the system can acknowledge.

Stopping a threaded component

Note that it is not safe to forcibly stop a threaded component (by calling the stop() method before the microprocess in it has terminated). This is because there is no true support in python for killing a thread.

Calling stop() prematurely will therefore kill the internal microprocess that handles inbox/outbox traffic on behalf of the thread, resulting in undefined behaviour.

When the thread terminates...

threadedcomponent will terminate - as you would expect. However, there are some subtleties that may need to be considered. These are due to the existence of the intermediate queues used to communicate between the thread and the actual inboxes and outboxes (as described above).

  • When main() terminates, even if it has just recently checked its inqueues (inboxes) there might still be items of data at the inboxes. This is because there is a gap between data that arriving at an inbox, and it being forwarded into an inqueue going to the thread.
  • When main() terminates, threadedcomponent will keep executing until it has finished successfully sending any data in outqueues, out of the respective "outboxes". This means that anything main() thinks it has sent is guaranteed to be sent. But if the destination is a size limited inbox that has become full (and that never gets emptied), then threadedcomponent will indefinitely stall because it cannot finish sending.

How is threaded component implemented?

threadedcomponet subclasses Axon.Components.component. It overrides the activate() method, to force activation to use a method called _localmain() instead of the usual main(). The code that someone writes for their main() method is instead run in a separate thread.

The code running in main() does not directly access inboxes our outboxes and doesn't actually create or destroy linkages itself. _localmain() can be thought of as a kind of proxy for the thread - acting on its behalf within the main scheduler run thread.

main() is wrapped by _threadmain() which tries to trap any unhandled exceptions generated in the thread and pass them back to _localmain() to be rethrown.

Internal state:

  • _threadrunning - flag, cleared by the thread when it terminates
  • queuelengths - size to be used for internal queues between thread and inboxes and outboxes
  • _threadmainmethod - the main method to be run as a thread
  • _thethread - the thread object itself

Internal to _localmain():

  • running - flag tracking if the thread is still runnning
  • stuffWaiting - flag tracking if there is are things that need to be done (if there is stuff waiting then _localmain() should not pause or terminate until it finishes)

Communication between the thread and _localmain():

  • inqueues - dictionary of thread safe queues for getting data from inboxes to the thread
  • outqueues - dictionary of thread safe queues for getting data from the thread to outboxes
  • threadtoaxonqueue - thread safe queue for making requests to _localmain()
  • axontothreadqueue - thread safe queue for replies from _localmain()
  • threadWakeUp - thread safe event flag for waking up the thread if sleeping
  • _threadId - unique id that is given to the thread as its 'name'
  • _localThreadId - the thread id (name) of the thread _localmain() and the scheduler run in

The relationship between _localmain() and the main() method (running in a separate thread) looks like this:

   +---------------------------------------------------------------------+
   |                         threaded component                          |
   |                                                                     |
   |           +--------------------------------------------+            |
   |           |                _localmain()                |            |
 INBOX ------> |                                            | -------> OUTBOX
   |           |    Ordinary generator based microprocess   |            |
CONTROL -----> |       in same thread as rest of system     | -------> SIGNAL
   |           |                                            |            |
   |           +--------------------------------------------+            |
   |              |          ^               ^            |              |
   |              |          |               |            |              |
   |          inqueues   outqueues   threadtoaxonqueue    |              |
   |           "inbox"    "outbox"           |            |              |
   |          "control"   "signal"           |    axontothreadqueue      |
   |              |          |               |            |              |
   |              V          |               |            V              |
   |           +--------------------------------------------+            |
   |           |                   main()                   |            |
   |           |                                            |            |
   |           |        Runs in a separate thread           |            |
   |           +--------------------------------------------+            |
   |                                                                     |
   +---------------------------------------------------------------------+

When a message arrives at an inbox, _localmain() collects it and places it into the thread safe queue self.inqueues[boxname] from which the thread can collect it. self.dataReady() and self.recv() are both overridden so they access the queues instead of the normal inboxes.

Similarly, when the thread wants to send to an outbox; self.send() is overridden so that it is actually sent to a thread safe queue self.outqueues[boxname] from which _localmain() collects it and sends it on.

Because all queues have a size limit (specified in at initialisation of the threaded component) this enables the effects of size limited inboxes to propagate up to the separate thread, via the queue. The implementation of self.send() is designed to mimic the behaviour

For other methods such as link(), unlink() and (in the case of threadedadaptivecommscomponent) addInbox(), deleteInbox(), addOutbox() and deleteOutbox(), the _localmain() microprocess also acts on the thread's behalf. The request to do something is sent to _localmain() through the thread safe queue self.threadtoaxonqueue. When the operation is complete, an acknowledgement is sent back via another queue self.axontothreadqueue:

_localmain()                         main() [thread]
     |                                 |
     |       please call 'link()'      |
     | ------------------------------> |
     |                                 |
     |                         self.link() called
     |                                 |
     |      return value from call     |
     | <------------------------------ |
     |                                 |

The thread does not continue until it receives the acknowledgement - this is to prevent inconsistencies in state. For example, a call to create an inbox might be followed by a query to determine whether there is data in it - the inbox therefore must be fully set up before that query can be handled.

This is implemented by the _do_threadsafe() method. This method detects whether it is being called from the same thread as the scheduler (by comparing thread IDs). If it is not the same thread, then it puts a request into self.threadtoaxonqueue and blocks on self.axontothreadqueue waiting for a reply. The request is simply a tuple of a method or object to call and the associated arguments. When _localmain() collects the request it issues the call and responds with the return value.

self.pause() is overridden and equivalent functionality reimplemented by blocking the thread on a threading.Event() object which can be signalled by _localmain() whenever the thread ought to wake up.

The 'Adaptive' version does not ensure the resource tracking and retrieval methods thread safe. This is because it is assumed these will only be accessed internally by the component itself from within its separate thread. _localmain() does not touch these resources.

XXX TODO: Thread shutdown - no true support for killing threads in python
(if ever). stop() method therefore doesn't stop the thread. Only stops the internal _localmain() microprocess, which therefore cuts the thread off from communicating with the rest of the system.

Test documentation

Tests passed:

  • addInbox - adds a new inbox with the specified name. Component can then receive from that inbox.
  • addOutbox - adds a new outbox with the specified name. Component can then send to that inbox.
  • __init__ - can accept no arguments
  • __init__ - class constructor is called with no arguments.
  • __init__ - accepts one argument
  • Setting the queue size in the initializer limits the number of messages that can queue up waiting to be sent out by the main thread.
  • If a threadedcomponent outbox is linked to a size restricted inbox, then the thread can send at most inbox_size+internal_queue_size messages before it receives a noSpaceInBox exception.
  • Setting the inbox size means at most inbox_size+internal_queue_size messages can queue up before the sender receives a noSpaceInBox exception
  • test_TakingFromDestinationAllowsMoreToBeDelivered (__main__.threadedcomponent_Test)
  • There is a default limit on the number of messages that can queue up waiting to be sent out by the main thread.
  • main() - can receive data sent to the component's inbox(es) using the standard dataReady() and recv() methods.
  • main() - can send data to the component's outbox(es) using the standard send() method.
  • link() unlink() - thread safe when called. The postoffice link() and unlink() methods are not expected to be capable of re-entrant use.
  • _localmain() microprocess also terminates when the thread terminates
  • threadedcomponent ensures that if the thread terminates, any messages still pending in outqueues (waiting to be sent out of outboxes) get sent, even if it is held up for a while by noSpaceInBox exceptions
  • threadedcomponent terminates when the thread terminates, even if data is clogged in one of the inqueues
  • __init__ - can accept no arguments
  • __init__ - class constructor is called with no arguments.
  • __init__ - accepts one argument
  • main() -runs in a separate thread of execution

Axon.ThreadedComponent.threadedadaptivecommscomponent

class threadedadaptivecommscomponent(threadedcomponent, Axon.AdaptiveCommsComponent._AdaptiveCommsable)

threadedadaptivecommscomponent([queuelengths]) -> new threadedadaptivecommscomponent

Base class for a version of an Axon adaptivecommscomponent that runs main() in a separate thread (meaning it can, for example, block).

Subclass to make your own.

Internal queues buffer data between the thread and the Axon inboxes and outboxes of the component. Set the default queue length at initialisation (default=1000).

Like an adaptivecommscomponent, inboxes and outboxes can be added and deleted at runtime.

A simple example:

class IncrementByN(Axon.ThreadedComponent.threadedcomponent):

    Inboxes = { "inbox" : "Send numbers here",
                "control" : "NOT USED",
              }
    Outboxes = { "outbox" : "Incremented numbers come out here",
                "signal" : "NOT USED",
               }

    def __init__(self, N):
        super(IncrementByN,self).__init__()
        self.n = N

    def main(self):
        while 1:
            while self.dataReady("inbox"):
                value = self.recv("inbox")
                value = value + self.n
                self.send(value,"outbox")

            if not self.anyReady():
                self.pause()

Methods defined here

__init__(self, *argL, **argD)

_unsafe_addInbox(self, *args)

Internal thread-unsafe code for adding an inbox.

_unsafe_addOutbox(self, *args)

_unsafe_deleteInbox(self, name)

_unsafe_deleteOutbox(self, name)

addInbox(self, *args)

Allocates a new inbox with name based on the name provided. If a box with the suggested name already exists then a variant is used instead.

Returns the name of the inbox added.

addOutbox(self, *args)

Allocates a new outbox with name based on the name provided. If a box with the suggested name already exists then a variant is used instead.

Returns the name of the outbox added.

deleteInbox(self, name)

Deletes the named inbox. Any messages in it are lost.

Try to ensure any linkages to involving this outbox have been destroyed - not just ones created by this component, but by others too! Behaviour is undefined if this is not the case, and should be avoided.

deleteOutbox(self, name)

Deletes the named outbox.

Try to ensure any linkages to involving this outbox have been destroyed - not just ones created by this component, but by others too! Behaviour is undefined if this is not the case, and should be avoided.

Methods inherited from Axon.ThreadedComponent.threadedcomponent :

Methods inherited from Axon.Component.component :

Methods inherited from Axon.Microprocess.microprocess :

Methods inherited from Axon.AdaptiveCommsComponent._AdaptiveCommsable :

Axon.ThreadedComponent.threadedcomponent

class threadedcomponent(Axon.Component.component)

threadedcomponent([queuelengths]) -> new threadedcomponent

Base class for a version of an Axon component that runs main() in a separate thread (meaning it can, for example, block). Subclass to make your own.

Internal queues buffer data between the thread and the Axon inboxes and outboxes of the component. Set the default queue length at initialisation (default=1000).

A simple example:

class IncrementByN(Axon.ThreadedComponent.threadedcomponent):

    Inboxes = { "inbox" : "Send numbers here",
                "control" : "NOT USED",
              }
    Outboxes = { "outbox" : "Incremented numbers come out here",
                 "signal" : "NOT USED",
               }

    def __init__(self, N):
        super(IncrementByN,self).__init__()
        self.n = N

    def main(self):
        while 1:
            while self.dataReady("inbox"):
                value = self.recv("inbox")
                value = value + self.n
                self.send(value,"outbox")

            if not self.anyReady():
                self.pause()

Methods defined here

__init__(self, queuelengths, **argd)

_do_threadsafe(self, cmd, argL, argD)

Internal method for ensuring a method call takes place in the main scheduler's thread.

_handlemessagefromthread(self, msg)

Unpacks a message containing a request to run a method of the form (objectToCall, argList, argDict) then calls it and places the result in the axontothreadqueue queue.

Used to execute methods on behalf of the separate thread. Results are returned to it via the return queue.

_localmain(self)

Do not overide this unless you reimplement the pass through of the boxes to the threads, and state management.

_threadmain(self)

Exception trapping wrapper for main().

Runs in the separate thread. Catches any raised exceptions and attempts to pass them back to _localmain() to be re-raised.

activate(self[, Scheduler][, Tracker][, mainmethod])

Call to activate this microprocess, so it can start to be executed by a scheduler. Usual usage is to simply call x.activate().

See Axon.Microprocess.microprocess.activate() for more info.

closeDownComponent(self)

Stub method. This method is designed to be overridden.

dataReady(self[, boxname])

Returns true if data is available in the requested inbox.

Used by the main() method of a component to check an inbox for ready data.

Call this method to periodically check whether you've been sent any messages to deal with!

You are unlikely to want to override this method.

forwardInboxToThread(self, box)

initialiseComponent(self)

Stub method. This method is designed to be overridden.

link(self, source, sink[, passthrough])

Creates a linkage from one inbox/outbox to another.

-- source - a tuple (component, boxname) of where the link should start from -- sink - a tuple (component, boxname) of where the link should go to

Other optional arguments:

  • passthrough=0 - (the default) link goes from an outbox to an inbox
  • passthrough=1 - the link goes from an inbox to another inbox
  • passthrough=2 - the link goes from an outbox to another outbox

See Axon.Postoffice.link(...) for more information.

main(self)

Override this method, writing your own main thread of control as an ordinary method. When the component is activated and the scheduler is running, this what gets executed.

Write it as an ordinary method. Because it is run in a separate thread, it can block.

If you do not override it, then a default main method exists instead that will:

  1. Call self.initialiseComponent()
  2. Loop forever calling self.mainBody() repeatedly until mainBody() returns a False/zero result.
  3. Call self.closeDownComponent()

mainBody(self)

Stub method. This method is designed to be overridden.

pause(self[, timeout])

Pauses the thread and blocks - does not return until the something happens to re-awake it, or until it times out (if the optional timeout is specified)

Must only be called from within the main() method - ie. from within the separate thread.

Keyword arguments:

  • timeout -- Optional. None, or the number of seconds after which this call should unblock itself (default=None)

recv(self[, boxname])

returns the first piece of data in the requested inbox.

Used by the main() method to recieve a message from the outside world. All comms goes via a named box/input queue

You will want to call this method to actually recieve messages you've been sent. You will want to check for new messages using dataReady first though.

You are unlikely to want to override this method.

send(self, message[, boxname])

appends message to the requested outbox.

Used by the main() method to send a message to the outside world. All comms goes via a named box/output queue

You will want to call this method to send messages.

Raises Axon.AxonExceptions.noSpaceInBox if this outbox is linked to a destination inbox that is full, or if your component is producing messages faster than Axon can pass them on.

You are unlikely to want to override this method.

sync(self)

Call this from main() to synchronise with the main scheduler's thread.

You may wish to do this to throttle your component's behaviour This is akin to posix.sched_yield or shoving extra "yield" statements into a component's generator.

unlink(self[, thecomponent][, thelinkage])

Destroys all linkages to/from the specified component or destroys the specific linkage specified.

Only destroys linkage(s) that were created by this component itself.

Keyword arguments:

  • thecomponent -- None or a component object
  • thelinakge -- None or the linkage to remove

Methods inherited from Axon.Component.component :

Methods inherited from Axon.Microprocess.microprocess :

Feedback

Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!

Please leave you feedback here in reply to the documentation thread in the Kamaelia blog.

-- Automatic documentation generator, 09 Dec 2009 at 04:00:25 UTC/GMT