August'24: Kamaelia is in maintenance mode and will recieve periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)
A threaded component is like an ordinary component but where the main() method is an ordinary method that is run inside its own thread. (Normally main() is a generator that is given slices of execution time by the scheduler).
This is really useful if your code needs to block - eg. wait on a system call, or if it is better off being able to run on another CPU (though beware python's limited ability to scale across multiple CPUs).
If you don't need these capabilities, consider making your component an ordinary Axon.Component.component instead.
This is nearly identical to writing an ordinary Axon.Component.component. For example this ordinary component:
class MyComponent(Axon.Component.component):
Inboxes = { "inbox" : "Send the FOO objects to here",
"control" : "NOT USED",
}
Outboxes = { "outbox" : "Emits BAA objects from here",
"signal" : "NOT USED",
}
def main(self):
while 1:
if self.dataReady("inbox"):
msg = self.recv("inbox")
result = ... do something to msg ...
self.send(result, "outbox")
yield 1
Can be trivially written as a threaded component simply by removing
the yield
statements, turning
main() into a normal method:
class MyComponent(Axon.ThreadedComponent.threadedcomponent):
Inboxes = { "inbox" : "Send the FOO objects to here",
"control" : "NOT USED",
}
Outboxes = { "outbox" : "Emits BAA objects from here",
"signal" : "NOT USED",
}
def main(self):
while 1:
if self.dataReady("inbox"):
msg = self.recv("inbox")
result = ... do something to msg ...
self.send(result, "outbox")
Exactly the same things any other component can. The following method calls are all implemented in a thread safe manner and function exactly as you should expect:
self.pause() behaves slightly differently:
In addition, threadedadaptivecommscomponent also supports the extra methods in Axon.AdaptiveCommsComponent.AdaptiveCommsComponent:
There is one difference: because the main() method runs in a different thread it does not actually interact directly with its own inboxes and outboxes. Internal queues are used to get data between your thread and the component's actual inboxes and outboxes. This is hidden for the most part - the method calls you make to receive and send messages are exactly the same.
When initialising a threadedcomponent you may wish to specify the size limit (queue length) for these queues. There is a size limit so that if your threaded component is delivering to a size limited inbox, the effects of the inbox becoming full propagate back to your thread.
In some ways this is a bit like nesting one component within another - where all the parent component's inboxes and outboxes are forwarded to the child:
+-----------------------------------------+
| threaded component |
| |
| +----------+ |
| | main() | |
INBOX -------> inbox outbox -------> OUTBOX
| queue | | queue |
| +----------+ |
+-----------------------------------------+
What does this mean in practice?
The secret is to choose a sensible queue size to balance between it being able to buffer enough messages without generating errors whilst not being so large as to render a size limited inbox pointless!
In addition to being able to pause (with an optional timeout), a threaded component can also regulate its speed by briefly synchronising with the rest of the system. Calling the sync() method simply briefly blocks until the rest of the system can acknowledge.
Note that it is not safe to forcibly stop a threaded component (by calling the stop() method before the microprocess in it has terminated). This is because there is no true support in python for killing a thread.
Calling stop() prematurely will therefore kill the internal microprocess that handles inbox/outbox traffic on behalf of the thread, resulting in undefined behaviour.
threadedcomponent will terminate - as you would expect. However, there are some subtleties that may need to be considered. These are due to the existence of the intermediate queues used to communicate between the thread and the actual inboxes and outboxes (as described above).
threadedcomponet subclasses Axon.Components.component. It overrides the activate() method, to force activation to use a method called _localmain() instead of the usual main(). The code that someone writes for their main() method is instead run in a separate thread.
The code running in main() does not directly access inboxes our outboxes and doesn't actually create or destroy linkages itself. _localmain() can be thought of as a kind of proxy for the thread - acting on its behalf within the main scheduler run thread.
main() is wrapped by _threadmain() which tries to trap any unhandled exceptions generated in the thread and pass them back to _localmain() to be rethrown.
Internal state:
Internal to _localmain():
Communication between the thread and _localmain():
The relationship between _localmain() and the main() method (running in a separate thread) looks like this:
+---------------------------------------------------------------------+
| threaded component |
| |
| +--------------------------------------------+ |
| | _localmain() | |
INBOX ------> | | -------> OUTBOX
| | Ordinary generator based microprocess | |
CONTROL -----> | in same thread as rest of system | -------> SIGNAL
| | | |
| +--------------------------------------------+ |
| | ^ ^ | |
| | | | | |
| inqueues outqueues threadtoaxonqueue | |
| "inbox" "outbox" | | |
| "control" "signal" | axontothreadqueue |
| | | | | |
| V | | V |
| +--------------------------------------------+ |
| | main() | |
| | | |
| | Runs in a separate thread | |
| +--------------------------------------------+ |
| |
+---------------------------------------------------------------------+
When a message arrives at an inbox, _localmain() collects it and places it into the thread safe queue self.inqueues[boxname] from which the thread can collect it. self.dataReady() and self.recv() are both overridden so they access the queues instead of the normal inboxes.
Similarly, when the thread wants to send to an outbox; self.send() is overridden so that it is actually sent to a thread safe queue self.outqueues[boxname] from which _localmain() collects it and sends it on.
Because all queues have a size limit (specified in at initialisation of the threaded component) this enables the effects of size limited inboxes to propagate up to the separate thread, via the queue. The implementation of self.send() is designed to mimic the behaviour
For other methods such as link(), unlink() and (in the case of threadedadaptivecommscomponent) addInbox(), deleteInbox(), addOutbox() and deleteOutbox(), the _localmain() microprocess also acts on the thread's behalf. The request to do something is sent to _localmain() through the thread safe queue self.threadtoaxonqueue. When the operation is complete, an acknowledgement is sent back via another queue self.axontothreadqueue:
_localmain() main() [thread]
| |
| please call 'link()' |
| ------------------------------> |
| |
| self.link() called
| |
| return value from call |
| <------------------------------ |
| |
The thread does not continue until it receives the acknowledgement - this is to prevent inconsistencies in state. For example, a call to create an inbox might be followed by a query to determine whether there is data in it - the inbox therefore must be fully set up before that query can be handled.
This is implemented by the _do_threadsafe() method. This method detects whether it is being called from the same thread as the scheduler (by comparing thread IDs). If it is not the same thread, then it puts a request into self.threadtoaxonqueue and blocks on self.axontothreadqueue waiting for a reply. The request is simply a tuple of a method or object to call and the associated arguments. When _localmain() collects the request it issues the call and responds with the return value.
self.pause() is overridden and equivalent functionality reimplemented by blocking the thread on a threading.Event() object which can be signalled by _localmain() whenever the thread ought to wake up.
The 'Adaptive' version does not ensure the resource tracking and retrieval methods thread safe. This is because it is assumed these will only be accessed internally by the component itself from within its separate thread. _localmain() does not touch these resources.
Tests passed:
threadedadaptivecommscomponent([queuelengths]) -> new threadedadaptivecommscomponent
Base class for a version of an Axon adaptivecommscomponent that runs main() in a separate thread (meaning it can, for example, block).
Subclass to make your own.
Internal queues buffer data between the thread and the Axon inboxes and outboxes of the component. Set the default queue length at initialisation (default=1000).
Like an adaptivecommscomponent, inboxes and outboxes can be added and deleted at runtime.
A simple example:
class IncrementByN(Axon.ThreadedComponent.threadedcomponent):
Inboxes = { "inbox" : "Send numbers here",
"control" : "NOT USED",
}
Outboxes = { "outbox" : "Incremented numbers come out here",
"signal" : "NOT USED",
}
def __init__(self, N):
super(IncrementByN,self).__init__()
self.n = N
def main(self):
while 1:
while self.dataReady("inbox"):
value = self.recv("inbox")
value = value + self.n
self.send(value,"outbox")
if not self.anyReady():
self.pause()
Internal thread-unsafe code for adding an inbox.
Allocates a new inbox with name based on the name provided. If a box with the suggested name already exists then a variant is used instead.
Returns the name of the inbox added.
Allocates a new outbox with name based on the name provided. If a box with the suggested name already exists then a variant is used instead.
Returns the name of the outbox added.
Deletes the named inbox. Any messages in it are lost.
Try to ensure any linkages to involving this outbox have been destroyed - not just ones created by this component, but by others too! Behaviour is undefined if this is not the case, and should be avoided.
Deletes the named outbox.
Try to ensure any linkages to involving this outbox have been destroyed - not just ones created by this component, but by others too! Behaviour is undefined if this is not the case, and should be avoided.
threadedcomponent([queuelengths]) -> new threadedcomponent
Base class for a version of an Axon component that runs main() in a separate thread (meaning it can, for example, block). Subclass to make your own.
Internal queues buffer data between the thread and the Axon inboxes and outboxes of the component. Set the default queue length at initialisation (default=1000).
A simple example:
class IncrementByN(Axon.ThreadedComponent.threadedcomponent):
Inboxes = { "inbox" : "Send numbers here",
"control" : "NOT USED",
}
Outboxes = { "outbox" : "Incremented numbers come out here",
"signal" : "NOT USED",
}
def __init__(self, N):
super(IncrementByN,self).__init__()
self.n = N
def main(self):
while 1:
while self.dataReady("inbox"):
value = self.recv("inbox")
value = value + self.n
self.send(value,"outbox")
if not self.anyReady():
self.pause()
Internal method for ensuring a method call takes place in the main scheduler's thread.
Unpacks a message containing a request to run a method of the form (objectToCall, argList, argDict) then calls it and places the result in the axontothreadqueue queue.
Used to execute methods on behalf of the separate thread. Results are returned to it via the return queue.
Do not overide this unless you reimplement the pass through of the boxes to the threads, and state management.
Exception trapping wrapper for main().
Runs in the separate thread. Catches any raised exceptions and attempts to pass them back to _localmain() to be re-raised.
Call to activate this microprocess, so it can start to be executed by a scheduler. Usual usage is to simply call x.activate().
See Axon.Microprocess.microprocess.activate() for more info.
Stub method. This method is designed to be overridden.
Returns true if data is available in the requested inbox.
Used by the main() method of a component to check an inbox for ready data.
Call this method to periodically check whether you've been sent any messages to deal with!
You are unlikely to want to override this method.
Stub method. This method is designed to be overridden.
Creates a linkage from one inbox/outbox to another.
-- source - a tuple (component, boxname) of where the link should start from -- sink - a tuple (component, boxname) of where the link should go to
Other optional arguments:
See Axon.Postoffice.link(...) for more information.
Override this method, writing your own main thread of control as an ordinary method. When the component is activated and the scheduler is running, this what gets executed.
Write it as an ordinary method. Because it is run in a separate thread, it can block.
If you do not override it, then a default main method exists instead that will:
Stub method. This method is designed to be overridden.
Pauses the thread and blocks - does not return until the something happens to re-awake it, or until it times out (if the optional timeout is specified)
Must only be called from within the main() method - ie. from within the separate thread.
Keyword arguments:
returns the first piece of data in the requested inbox.
Used by the main() method to recieve a message from the outside world. All comms goes via a named box/input queue
You will want to call this method to actually recieve messages you've been sent. You will want to check for new messages using dataReady first though.
You are unlikely to want to override this method.
appends message to the requested outbox.
Used by the main() method to send a message to the outside world. All comms goes via a named box/output queue
You will want to call this method to send messages.
Raises Axon.AxonExceptions.noSpaceInBox if this outbox is linked to a destination inbox that is full, or if your component is producing messages faster than Axon can pass them on.
You are unlikely to want to override this method.
Call this from main() to synchronise with the main scheduler's thread.
You may wish to do this to throttle your component's behaviour This is akin to posix.sched_yield or shoving extra "yield" statements into a component's generator.
Destroys all linkages to/from the specified component or destroys the specific linkage specified.
Only destroys linkage(s) that were created by this component itself.
Keyword arguments:
Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!
Please leave you feedback here in reply to the documentation thread in the Kamaelia blog.
-- Automatic documentation generator, 09 Dec 2009 at 04:00:25 UTC/GMT