August'24: Kamaelia is in maintenance mode and will receive periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)

Axon.Ipc

IPC message classes

Some standard IPC messages used by Axon. The base class for all IPC classes is ipc.

Some purposes for which these can be used are described below. This is not exhaustive and does not cover all of the Ipc classes defined here!

Shutting down components

If you want to order a component to stop, most will do so if you send either a producerFinished or a shutdownMicroprocess ipc object to their "control" inbox. When a component does stop, most send on the same message they received out of their "signal" outbox (or a message of their own creation if they decided to shut down without external prompting).

Producer components, such as file readers or network connections, will send a producerFinished() ipc object rather than shutdownMicroprocess() to indicate that the shutdown is due to them finishing producing data.

You can therefore link up components in a chain - linking "signal" outboxes to "control" inboxes to allow shutdown messages to cascade - making it easier to shut down and clean up a system.

How should components behave when they receive either of these Ipc messages? In most cases, components simply shut down as soon as possible and send the same message on out of their "signal" outbox. However, many components behave slightly more subtly to ensure the last few items of data passing through a chain of components are not accidentally lost:

  • If the message is a producerFinished() message, then a component may wish to finish processing any data still left in its inboxes or internal buffers before terminating and passing on the producerFinished() message.
  • If the message is a shutdownMicroprocess() message, then a component should ideally try to terminate rather than finish what it is doing.

Many components therefore contain logic similar to this:

class MyComponent(Axon.Component.component):

    def main(self):

        while still got things to do and not received "shutdownMicroprocess":
            ..do things..
            ..check "control" inbox..
            yield 1

        if not received any shutdown message:
            self.send(Axon.Ipc.shutdownMicroprocess(), "signal")
        else:
            self.send(message received, "signal")
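
The pattern above can be tried out without Axon installed. The following is a minimal sketch only: the ipc classes are empty stand-ins, and Doubler, its boxes dictionary and the run() helper are hypothetical names invented for this illustration, not part of Axon. It shows a component that drains its inbox before passing on producerFinished, but stops at once on shutdownMicroprocess.

```python
from collections import deque

# Hypothetical stand-ins so the sketch runs without Axon installed.
class ipc(object):
    pass

class producerFinished(ipc):
    pass

class shutdownMicroprocess(ipc):
    pass


class Doubler(object):
    """Toy component: doubles numbers from "inbox" and honours shutdown messages."""

    def __init__(self):
        names = ("inbox", "control", "outbox", "signal")
        self.boxes = {name: deque() for name in names}

    def main(self):
        shutdown_msg = None
        while True:
            # Check "control" first, remembering any shutdown message seen.
            while self.boxes["control"]:
                msg = self.boxes["control"].popleft()
                if isinstance(msg, (producerFinished, shutdownMicroprocess)):
                    shutdown_msg = msg
            # Hard shutdown: stop at once, leaving pending data unprocessed.
            if isinstance(shutdown_msg, shutdownMicroprocess):
                break
            if self.boxes["inbox"]:
                self.boxes["outbox"].append(2 * self.boxes["inbox"].popleft())
            elif shutdown_msg is not None:
                # producerFinished received and nothing left to process.
                break
            yield 1
        # Pass the same message on so the next component can shut down too.
        self.boxes["signal"].append(shutdown_msg)


def run(component, data, shutdown_msg):
    """Preload the boxes, run main() to completion, return (outbox, signal)."""
    component.boxes["inbox"].extend(data)
    component.boxes["control"].append(shutdown_msg)
    for _ in component.main():
        pass
    return list(component.boxes["outbox"]), list(component.boxes["signal"])


clean_out, clean_sig = run(Doubler(), [1, 2, 3], producerFinished())
hard_out, hard_sig = run(Doubler(), [1, 2, 3], shutdownMicroprocess())
print(clean_out)  # [2, 4, 6]
print(hard_out)   # []
```

In both runs the message that arrived on "control" is the one that leaves on "signal", which is what lets shutdown cascade along a chain.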

producerFinished() can be likened to notification of a clean shutdown - rather like a unix process closing its stdout file handle when it finishes. shutdownMicroprocess() is more like a hard termination due to a system being interrupted.

Knock-on shutdowns between microprocesses

When a microprocess terminates, the scheduler calls its Axon.Microprocess.microprocess._closeDownMicroprocess() method. This method can return an Axon.Ipc.shutdownMicroprocess ipc object, for example:

def _closeDownMicroprocess(self):
    return Axon.Ipc.shutdownMicroprocess(anotherMicroprocess)

The scheduler will then ensure that the other microprocess is also shut down.
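
To make the knock-on idea concrete, here is a rough, self-contained sketch. It does not use the real Axon scheduler: toy_scheduler and ToyMicroprocess are hypothetical names invented for this illustration, and shutdownMicroprocess is a stand-in that mirrors only the documented constructor and microprocesses() method. It assumes the scheduler simply stops running whatever microprocesses the returned message names.

```python
class ipc(object):
    pass

class shutdownMicroprocess(ipc):
    """Stand-in mirroring the documented shutdownMicroprocess(*microprocesses)."""

    def __init__(self, *microprocesses):
        self._microprocesses = microprocesses

    def microprocesses(self):
        return self._microprocesses


class ToyMicroprocess(object):
    """Hypothetical microprocess that runs for a fixed number of steps."""

    def __init__(self, name, steps, partner=None):
        self.name = name
        self.steps = steps
        self.partner = partner  # microprocess to take down when this one ends

    def main(self):
        for _ in range(self.steps):
            yield 1

    def _closeDownMicroprocess(self):
        if self.partner is not None:
            return shutdownMicroprocess(self.partner)
        return None


def toy_scheduler(microprocesses):
    """Round-robin over the generators; return names in the order they stopped."""
    running = {mp: mp.main() for mp in microprocesses}
    stopped = []
    while running:
        for mp in list(running):
            if mp not in running:
                continue  # already removed by a knock-on shutdown this pass
            try:
                next(running[mp])
            except StopIteration:
                del running[mp]
                stopped.append(mp.name)
                result = mp._closeDownMicroprocess()
                if isinstance(result, shutdownMicroprocess):
                    # Knock-on: stop every microprocess named in the message.
                    for other in result.microprocesses():
                        if other in running:
                            del running[other]
                            stopped.append(other.name)
    return stopped


long_runner = ToyMicroprocess("long_runner", steps=1000)
short_lived = ToyMicroprocess("short_lived", steps=2, partner=long_runner)
order = toy_scheduler([short_lived, long_runner])
print(order)  # ['short_lived', 'long_runner']
```

Here long_runner would otherwise run for 1000 steps, but it is stopped as soon as short_lived terminates and names it in the returned message.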

shutdown vs shutdownMicroprocess

You may notice that shutdownMicroprocess appears to be used for two purposes - knock-on shutdowns and signalling component shutdown.

Axon.Ipc.shutdown was originally intended to be used rather than Axon.Ipc.shutdownMicroprocess; however, because most components support the latter (which was a mistake), the latter should continue to be used.

Axon may at some stage make these two Ipc classes synonyms for each other to resolve this issue, but this decision has not been taken yet.

Setting off a new microprocess and waiting for it to complete

A microprocess can yield a WaitComplete() Ipc message to the scheduler to ask for another microprocess to be started. The original microprocess is suspended until the second one completes, and then resumes.

This is a nice little way to sidestep the restriction in Python that you can't nest yield statements for a given generator inside methods/functions it calls.

For example, here's a clean way to wait for data arriving at the "inbox" inbox of a component:

class MyComponent(Axon.Component.component):

    def main(self):
        ...
        yield WaitComplete(self.waitForInbox())
        msg = self.recv("inbox")

    def waitForInbox(self):
        while not self.dataReady("inbox"):
            yield 1

Internally, the scheduler uses Axon.Ipc.reactivate to ensure the original microprocess is resumed after the one that was launched terminates.
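
The suspend-and-resume behaviour can be sketched with a toy driver. This is not how the Axon scheduler is implemented (it uses Axon.Ipc.reactivate, as noted above). The function run_with_wait_complete, the generator attribute on the stand-in WaitComplete class, and the module-level inbox list are all assumptions made for this illustration.

```python
class ipc(object):
    pass

class WaitComplete(ipc):
    """Stand-in: carries the generator to run as a separate microprocess."""

    def __init__(self, generator):
        self.generator = generator  # hypothetical attribute name


def run_with_wait_complete(generator):
    """Drive one generator, suspending it while a WaitComplete generator runs."""
    stack = [generator]  # top of the stack is the generator currently running
    while stack:
        try:
            result = next(stack[-1])
        except StopIteration:
            stack.pop()  # finished: resume whoever was waiting on it
            continue
        if isinstance(result, WaitComplete):
            stack.append(result.generator)


log = []
inbox = []

def wait_for_inbox():
    polls = 0
    while not inbox:
        polls += 1
        if polls == 3:
            inbox.append("hello")  # simulate data arriving on the third poll
        yield 1

def main():
    log.append("before wait")
    yield WaitComplete(wait_for_inbox())
    # Only reached once wait_for_inbox() has completed.
    log.append("received " + inbox.pop(0))

run_with_wait_complete(main())
print(log)  # ['before wait', 'received hello']
```

The line after the yield in main() can safely assume data is waiting, because main() is not resumed until the generator it handed over has finished.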

Test documentation

Tests passed:

  • errorInformation.__init__ - Takes the supplied caller, and creates an errorInformation object. Checks errorInformation object is an instance of ipc.
  • errorInformation.__init__ - An exception & message (any object) in addition to the caller to provide a more meaningful errorInformation message where appropriate. ttbw
  • errorInformation.__init__ - Called without arguments fails - must include caller.
  • ipc - Should be derived from object.
  • newComponent.__init__ - Groups all the arguments as a tuple of components that need to be activated/added to the run queue. Order is unimportant, scheduler doesn't care.
  • newComponent.__init__ - Should work without problems.
  • newComponent.components - Returns a tuple of components that need to be added to the run queue/activated. Same test as for __init__ as they are counterparts.
  • notify.__init__ - Creates a message from a specific caller with some data payload to notify part of the system of an event.
  • notify.__init__ - Called without arguments fails.
  • producerFinished.__init__ - Creates a producerFinished message with specified caller & shutdown 'last' message.
  • producerFinished.__init__ - Called without arguments defaults to a caller of None, message of None. Checks producerFinished is a subclass of ipc.
  • shutdownMicroprocess.__init__ - Treats all the arguments as a tuple of microprocesses that need to be shut down.
  • shutdownMicroprocess.__init__ - Should work without problems.
  • shutdownMicroprocess.microprocesses - Returns the list of microprocesses that need to be shut down. This is essentially the counterpart to the __init__ test.
  • status.__init__ - Stores the status message - for extraction by the recipient of the message. Checks object is instance of ipc.
  • status.__init__ - Called without arguments fails.
  • status.status - Returns the status message stored inside the status object. Counterpart to __init__ test.
  • wouldblock.__init__ - Stores the caller in the wouldblock message. Allows the scheduler to make a decision. Checks wouldblock is a subclass of ipc.
  • wouldblock.__init__ - Called without arguments fails.

Axon.Ipc.WaitComplete

class WaitComplete(ipc)

WaitComplete(generator) -> new WaitComplete object.

Message to ask the scheduler to temporarily suspend this microprocess and run a new one instead based on the generator provided, resuming the original when the new one completes.

Use within a microprocess by yielding one back to the scheduler.

Arguments:

  • the generator to be run as the separate microprocess

Methods defined here

__init__(self, *args, **argd)

Axon.Ipc.errorInformation

class errorInformation(ipc)

errorInformation(caller[,exception][,message]) -> new errorInformation ipc message.

A message to indicate that a non-fatal error has occurred in the component. The component may skip processing the data that caused the error, but should respond correctly to future messages.

Keyword arguments:

  • caller -- the source of the error information. Assigned to self.caller
  • exception -- Optional. None, or the exception that caused the error. Assigned to self.exception
  • message -- Optional. None, or a message describing the problem. Assigned to self.message

Methods defined here

__init__(self, caller[, exception][, message])
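
As a rough illustration of the non-fatal case, the sketch below reports a bad item and carries on with the rest. The errorInformation class here is a stand-in that mirrors only the documented constructor and attributes, so the example runs without Axon installed. parse_all is a hypothetical helper, and a real component would send the message to an outbox rather than collect it in a list.

```python
class ipc(object):
    pass

class errorInformation(ipc):
    """Stand-in mirroring errorInformation(caller[,exception][,message])."""

    def __init__(self, caller, exception=None, message=None):
        self.caller = caller
        self.exception = exception
        self.message = message


def parse_all(caller, items):
    """Convert items to int, reporting failures and carrying on with the rest."""
    results, errors = [], []
    for item in items:
        try:
            results.append(int(item))
        except ValueError as e:
            # Non-fatal: record what went wrong, skip the item, keep going.
            errors.append(
                errorInformation(caller, e, "could not parse %r" % (item,))
            )
    return results, errors


values, errors = parse_all("parser", ["1", "two", "3"])
print(values)             # [1, 3]
print(errors[0].message)  # could not parse 'two'
```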

Axon.Ipc.ipc

class ipc(object)

Message base class

Axon.Ipc.newComponent

class newComponent(ipc)

newComponent(*components) -> new newComponent ipc message.

Message used to inform the scheduler of a new component that needs a thread of control and activating.

Use within a microprocess by yielding one back to the scheduler.

Arguments:

  • the components to be activated

Methods defined here

__init__(self, *components)

components(self)

Returns the list of components to be activated

Axon.Ipc.notify

class notify(ipc)

notify(caller,payload) -> new notify ipc message.

Message used to notify the system of an event. Subclass to implement your own specific notification messages.

Keyword arguments:

  • caller -- a reference to whoever/whatever issued this notification. Assigned to self.caller
  • payload -- any relevant payload relating to the notification. Assigned to self.object

Methods defined here

__init__(self, caller, payload)
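
Subclassing might look something like the sketch below. The notify class here is a stand-in that mirrors the documented constructor (payload stored as self.object) so that the example runs without Axon installed. fileChanged and its path() method are hypothetical names invented for this illustration.

```python
class ipc(object):
    pass

class notify(ipc):
    """Stand-in mirroring the documented notify(caller, payload)."""

    def __init__(self, caller, payload):
        self.caller = caller
        self.object = payload


class fileChanged(notify):
    """Hypothetical notification whose payload is the path that changed."""

    def path(self):
        return self.object


msg = fileChanged("watcher", "/tmp/example.txt")

# A recipient can test for the general class or the specific subclass.
is_notification = isinstance(msg, notify)
print(is_notification, msg.caller, msg.path())
```

Giving each kind of event its own subclass lets recipients dispatch on type with isinstance rather than inspecting the payload.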

Axon.Ipc.producerFinished

class producerFinished(ipc)

producerFinished([caller][,message]) -> new producerFinished ipc message.

Message to indicate that the producer has completed its work and will produce no more output. The receiver may wish to shut down.

Keyword arguments:

  • caller -- Optional. None, or the producer who has finished. Assigned to self.caller
  • message -- Optional. None, or a message giving any relevant info. Assigned to self.message

Methods defined here

__init__(self[, caller][, message])

Axon.Ipc.reactivate

class reactivate(ipc)

reactivate(original) -> new reactivate ipc message.

Returned by Axon.Microprocess.microprocess._closeDownMicroprocess() to the scheduler to get another microprocess reactivated.

Keyword arguments:

  • original -- The original microprocess to be resumed. Assigned to self.original

Methods defined here

__init__(self, original)

Axon.Ipc.shutdown

class shutdown(ipc)

Message used to indicate that the component receiving it should shut down.

Due to legacy mistakes, use shutdownMicroprocess instead.

Axon.Ipc.shutdownMicroprocess

class shutdownMicroprocess(ipc)

shutdownMicroprocess(*microprocesses) -> new shutdownMicroprocess ipc message.

Message used to indicate that the component receiving it should shut down, or to indicate to the scheduler a knock-on shutdown from a terminating microprocess.

Arguments:

  • the microprocesses to be shut down (when used as a knockon)

Methods defined here

__init__(self, *microprocesses)

microprocesses(self)

Returns the list of microprocesses to be shut down

Axon.Ipc.status

class status(ipc)

status(status) -> new status ipc message.

General Status message.

Keyword arguments:

  • status -- the status.

Methods defined here

__init__(self, status)

status(self)

Returns what the status is

Axon.Ipc.wouldblock

class wouldblock(ipc)

wouldblock(caller) -> new wouldblock ipc message.

Message used to indicate to the scheduler that the system is likely to block now.

Keyword arguments:

  • caller -- whoever is likely to block (presumably a microprocess). Assigned to self.caller

Methods defined here

__init__(self, caller)

Feedback

Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!

Please leave your feedback here in reply to the documentation thread in the Kamaelia blog.

-- Automatic documentation generator, 09 Dec 2009 at 04:00:25 UTC/GMT