All computer source code presented on this page, unless it includes attribution to another author, is provided by Ed Halley under the Artistic License. Use such code freely and without any expectation of support. I would like to know if you make anything cool with the code, or if you have questions about it.
python/
    bindings.py
    boards.py
    buzz.py
    caches.py
    cards.py
    constraints.py
    csql.py
    english.py
    getch.py
    getopts.py
    gizmos.py
    goals.py
    improv.py
    interpolations.py
    namespaces.py
    nihongo.py
    nodes.py
    octalplus.py
    patterns.py
    physics.py
    pids.py
    pieces.py
    quizzes.py
    recipes.py
    relays.py
    romaji.py
    ropen.py
    sheets.py
    stores.py
    strokes.py
    subscriptions.py
    svgbuild.py
    testing.py
    things.py
    timing.py
    ucsv.py
    useful.py
    uuid.py
    vectors.py
    weighted.py
java/
    CSVReader.java
    CSVWriter.java
    GlobFilenameFilter.java
    RegexFilenameFilter.java
    StringBufferOutputStream.java
    ThreadSet.java
    Throttle.java
    TracingThread.java
    Utf8ConsoleTest.java
    droid/
        ArrangeViewsTouchListener.java
        DownloadFileTask.java
perl/
    CVQM.pm
    Kana.pm
    Typo.pm
cxx/
    CCache.h
    equalish.cpp
Download nodes.py
# nodes - a dataflow model of processing

'''

The nodes module provides a dataflow model of processing.

ABSTRACT

    Each node consists of a set of data inputs called "sources," a set of
    data outputs called "results," and a processing routine which can be
    executed whenever data is available.

    A process is a collection of nodes, with pipeline connections wired
    up between each node, from the results of one node to the sources of
    others.  The process can be stored as a simple configuration list
    that describes how the individual nodes are organized.

AUTHOR

    Ed Halley (ed@halley.cc) 31 March 2007

'''

import subscriptions ; from subscriptions import Subscription

#----------------------------------------------------------------------------

class Node (object):

    '''A node is an *instance* of a given computational component type,
    an instance as used at a given place within a larger system of parts.
    It has a number of ports by internally unique names.  Source ports
    expect a certain data type.  Results ports publish a certain data type
    (or an exception object in cases of failure).  When executed, data is
    parked on the results ports and can be collected at any time.'''

    #  +---------------+
    # (s)              |
    #  |              (r)
    # (s)              |
    #  +---------------+

    import namespaces
    # Registry shared by every Node, mapping registered names to instances.
    # Parent/child links are stored as names and resolved through it.
    names = namespaces.Namespace()

    # Port "gender": sources are inputs, results are outputs.
    SOURCE = -1
    RESULT = +1

    # Set to True to trace go() on every node; self._debug traces just one.
    DEBUG = False

    def debug(self):
        '''Returns a true value if tracing is on globally or for this node.'''
        return Node.DEBUG or self._debug

    #---------------
    # Infrastructure

    def __init__(self):
        super(Node, self).__init__()
        self.name = None
        # register under the class name; the registry returns the name used
        self.rename(self.__class__.__name__)
        self.parent = None      # name of the containing Compound, if any
        self.sources = {}       # source port name -> value or Subscription
        self.results = {}       # port name -> published value or Subscription
        self.terms = {}         # locally defined terms (see term())
        self.ports = {}         # port name -> declared data type
        self.gender = {}        # port name -> Node.SOURCE or Node.RESULT
        self.first = True       # True until init() has been called
        self._debug = False

    def __str__(self):
        return self.name

    def __repr__(self):
        me = self.__class__.__name__ + '():'
        if self.name != self.__class__.__name__:
            me += '\n\t.name = ' + self.name
        if self.parent is not None:
            me += '\n\t.parent = ' + self.parent
        if self.ports:
            me += '\n\t.ports = {'
            for each in self.ports:
                kind = self.ports[each]
                if isinstance(kind, type):
                    kind = kind.__name__
                gender = self.gender[each]
                gender = { Node.SOURCE: 'source', Node.RESULT: 'result' }[gender]
                me += '\n\t\t:%s [%s] (%s)' % (each, gender, kind)
            me += '\n\t\t}'
        return me

    #-----------------------------------------
    # Defining Names for Self, Terms and Ports

    def rename(self, name):
        '''Registers this node under the given name and returns the name
        the registry actually assigned (also stored in self.name).  Any
        previous registration of this node is released first.'''
        if Node.names.object(name) is self:
            return name
        if self.name:
            Node.names.unregister(self, self.name)
        self.name = Node.names.register(self, name)
        return self.name

    def term(self, name, value=None):
        '''Gets (and optionally sets) a named term.

        If value is not None it is stored locally.  Lookup checks this
        node's own terms first, then asks the parent chain.  Returns None
        if no node along the chain defines the term.'''
        if value is not None:
            self.terms[name] = value
        if name in self.terms:
            return self.terms[name]
        if self.parent:
            parent = Node.names.object(self.parent)
            return parent.term(name)
        return None

    def port(self, name, gender=None, type=object, subscription=None):
        '''Queries or defines a port.

        With gender=None, returns the port's gender (Node.SOURCE or
        Node.RESULT), or None if no such port exists.

        Otherwise defines the port with the given gender and data type.
        Redefining an existing port identically is a no-op.  For a RESULT
        port, passing a Subscription instance uses it as the port's
        publisher.  Any other value except None or False creates a fresh
        Subscription.  None or False means plain (non-subscription) values.

        Raises ValueError for an unknown gender or a conflicting
        redefinition.'''
        if gender is None:
            if name in self.gender:
                return self.gender[name]
            return None
        if not gender in [Node.RESULT, Node.SOURCE]:
            raise ValueError('Port must be of type RESULT or SOURCE.')
        if name in self.ports:
            if type is not self.ports[name] or gender != self.gender[name]:
                raise ValueError('Cannot redefine port type after creation.')
            return
        self.ports[name] = type
        self.gender[name] = gender
        self.results[name] = None
        # False explicitly requests a plain port (it previously created a
        # Subscription because only None was tested)
        if (gender == Node.RESULT and subscription is not None
                and subscription is not False):
            if not isinstance(subscription, Subscription):
                subscription = Subscription()
            self.results[name] = subscription

    #--------------------------------
    # Support for Container/Parentage

    def contains(self, node):
        '''Returns False because plain Nodes cannot contain children.
        '''
        return False

    def within(self, node):
        '''Returns True iff the given node (instance or name) is anywhere
        up our parent chain.
        '''
        target = str(node)
        visited = set()
        parent = self.parent
        # walk names up the chain; the visited set guards against a
        # corrupted (cyclic) chain looping forever
        while parent and parent not in visited:
            if parent == target:
                return True
            visited.add(parent)
            ancestor = Node.names.object(parent)
            if ancestor is None:
                break
            parent = ancestor.parent
        return False

    #-----------------------------
    # General Processing Callbacks

    def init(self):
        '''Hook called once before the first processing round.'''
        pass

    def process(self):
        '''Hook doing the node's work; a false return means failure.'''
        return True

    def reset(self):
        '''Hook to return the node to its starting state.'''
        pass

    #--------------------
    # Process Port Access

    def peek(self, port, count):
        '''Returns self.sources[port].peek(count) if the port is a known
        source and is ready, else None.  The source value is assumed to
        support peek() (presumably a Subscription -- confirm).'''
        if port in self.sources:
            if self.ready(port, count):
                return self.sources[port].peek(count)
        return None

    def receive(self, port):
        '''Returns the value on a source port.  A Subscription source is
        read with this node's reader key "<name>:<port>".  Returns None
        for an unknown port.'''
        received = None
        if port in self.sources:
            received = self.sources[port]
            if isinstance(received, Subscription):
                received = received.receive(self.name+':'+port)
        return received

    def publish(self, port, product):
        '''Publishes product on a port.

        Raises TypeError if product is not an instance of the port's
        declared type.  A Subscription port forwards to its publish() and
        returns that call's result.  Otherwise the value is stored in
        self.results and product is returned.  Unknown ports are ignored
        and product is returned unchanged.'''
        if port in self.results:
            if not isinstance(product, self.ports[port]):
                raise TypeError('Cannot publish a %s, expected a %s.' %
                                ( type(product).__name__,
                                  self.ports[port].__name__ ))
            subscription = self.results[port]
            if isinstance(subscription, Subscription):
                product = subscription.publish(product)
            else:
                self.results[port] = product
        return product

    def flush(self):
        '''Clears plain result values; Subscription results are kept.'''
        for port in self.results:
            if not isinstance(self.results[port], Subscription):
                self.results[port] = None

    #-----------------------------------
    # Processing and Port Infrastructure

    def setup(self, sources=None):
        '''Prepares the ports for one round of processing.

        Calls init() once on first use and resets every source port to
        None.  Clears non-subscription result values.  Then copies in any
        given sources that name one of our SOURCE ports; other names are
        ignored.  A Subscription given as a source is registered under the
        reader key "<name>:<port>".'''
        if sources is None:
            sources = {}
        if self.first:
            self.init()
            self.first = False
        self.sources = {}
        for each in self.ports:
            if self.gender[each] == Node.SOURCE:
                self.sources[each] = None
            elif self.gender[each] == Node.RESULT:
                # subscriptions persist across rounds; plain values do not
                if not isinstance(self.results.get(each), Subscription):
                    self.results[each] = None
        for each in sources:
            # only source ports are fed; a result-port name used to raise
            # KeyError here
            if self.gender.get(each) != Node.SOURCE:
                continue
            self.sources[each] = sources[each]
            if isinstance(self.sources[each], Subscription):
                self.sources[each].register(self.name+':'+each)

    def ready(self, port=None, count=1):
        '''Returns True if the given source port (or, with port=None,
        every source port) has data ready.  Plain values always count as
        ready.  A Subscription is asked via ready("<name>:<port>").
        NOTE(review): count is accepted but not currently used.'''
        if port is None:
            for port in self.ports:
                if self.gender[port] == Node.SOURCE:
                    if not self.ready(port=port, count=count):
                        return False
            return True
        if isinstance(self.sources[port], Subscription):
            if not self.sources[port].ready(self.name+':'+port):
                return False
        return True

    def go(self, sources=None):
        '''Runs one round: setup(sources), then process() if ready().

        Returns the results dict on success.  Returns None if a source was
        not ready or process() returned a false value.'''
        if self.debug():
            print("%s{'%s'}.go(...)" % (self.__class__.__name__, self.name))
        self.setup(sources)
        if not self.ready():
            return None
        if not self.process():
            return None
        return self.results

#----------------------------------------------------------------------------

class Compound (Node):

    '''A compound node may have children nodes joined into it. There is
    no particular connectivity enforced internally; extensions to this
    class may combine, compare or filter the results internally as
    desired.  The compound node can execute the processing capability for
    any child.
    '''

    #  +---------------+
    # (s)              |
    #  |  +---+       (r)
    #  | (s) (r)       |
    #  |  +---+        |
    #  |        +---+  |
    #  |       (s) (r) |
    #  |        +---+  |
    #  |               |
    #  +---------------+

    def __init__(self):
        super(Compound, self).__init__()
        # names (not instances) of our immediate children
        self.children = set()

    def __repr__(self):
        me = super(Compound, self).__repr__()
        if self.children:
            me += '\n\t.children = [' + ', '.join(self.children) + ']'
        return me

    def child(self, node):
        '''Given a name or a Node instance which is an immediate child,
        returns the Node instance.  Returns None if the given argument is
        not an immediate child.
        '''
        if node in self.children:
            return Node.names.object(node)
        if isinstance(node, Node):
            if node.name in self.children:
                return node
        return None

    def contains(self, node):
        '''Returns True iff the given node is one of our immediate children.
        '''
        return str(node) in self.children

    def enjoin(self, node):
        '''Joins a node (instance or name), or a list/tuple of them, as
        immediate children.  Calls init() on a newly joined node that has
        not been initialized yet.  Returns the node (or the given sequence).

        Raises TypeError if the argument does not resolve to a Node.
        Raises ValueError when joining ourselves or one of our ancestors,
        or a node that already has a different parent.'''
        if isinstance(node, (list, tuple)):
            nodes = node
            for each in nodes:
                self.enjoin(each)
            return nodes
        node = Node.names.object(str(node))
        if not isinstance(node, Node):
            raise TypeError('Must give a Node instance.')
        if node is self:
            raise ValueError('Cannot join a node into itself.')
        if self.within(node):
            raise ValueError('The node cannot be joined within itself.')
        if node.parent == self.name:
            # already our child: joining again is a no-op
            return node
        if node.parent:
            raise ValueError('The node is already enjoined elsewhere.')
        node.parent = self.name
        self.children.add(node.name)
        if node.first:
            node.init()
            node.first = False
        return node

    def unjoin(self, node):
        '''Removes an immediate child (instance or name) and clears its
        parent link.  Raises ValueError if it is not an immediate child.'''
        if not self.contains(node):
            raise ValueError('The given node is not an immediate child.')
        node = Node.names.object(str(node))
        self.children.discard(node.name)
        node.parent = None

    def execute(self, node, sources=None):
        '''Runs go(sources) on an immediate child and returns its result
        (the child's results dict, or None if it was not ready or failed).
        Raises ValueError if node is not an immediate child.'''
        if sources is None:
            sources = {}
        target = self.child(node)
        if target is None:
            raise ValueError('Can only execute an immediate child node.')
        return target.go(sources)

    def reset(self):
        '''Resets this node, then resets and flushes every child.'''
        super(Compound, self).reset()
        for each in self.children:
            node = self.child(each)
            node.reset()
            node.flush()

#----------------------------------------------------------------------------

class Process (Compound):

    '''A process is a compound node, in which a number of internal nodes
    are executed to develop an ultimate set of results.  The internal
    nodes are wired with a directed acyclic graph of connections, and
    only those subnodes which contribute to the desired results are
    executed.'''

    #  +---------------------+
    # (s)-+                  |
    #  |  |  +---+        +-(r)
    #  |  +-(s) (r)+      |  |
    #  |     +---+ |      |  |
    #  |          /       |  |
    #  |         | +---+  |  |
    #  |         +(s) (r)-+  |
    #  |           +---+     |
    #  |                     |
    #  +---------------------+

    def __init__(self):
        super(Process, self).__init__()
        # set of (upstream name, result port, downstream name, source port)
        self.connections = set()

    def __repr__(self):
        me = super(Process, self).__repr__()
        if self.connections:
            me += '\n\t.connections = ['
            for (upstream, result, downstream, source) in self.connections:
                me += ( '\n\t\t(%s:%s -> %s:%s),' %
                        (str(upstream), result, str(downstream), source) )
            me += '\n\t\t]'
        return me

    #-------------------------------
    # Relationships between Children

    def upstream(self, node, deep=False):
        '''Returns a list of node names upstream of the given node.

        With deep=True, everything upstream of those is included too,
        recursively.  Names may repeat where paths rejoin.'''
        node = str(node)
        uplinks = [ link[0] for link in self.connections if link[2] == node ]
        if deep:
            for other in uplinks[:]:
                uplinks.extend(self.upstream(other, True))
        return uplinks

    def downstream(self, node, deep=False):
        '''Returns a list of node names downstream of the given node.

        With deep=True, everything downstream of those is included too,
        recursively.  Names may repeat where paths rejoin.'''
        node = str(node)
        downlinks = [ link[2] for link in self.connections if link[0] == node ]
        if deep:
            # recurse downstream (this previously recursed upstream)
            for other in downlinks[:]:
                downlinks.extend(self.downstream(other, True))
        return downlinks

    def original(self, node):
        '''Checks if a given node is original, or has no upstream ports.'''
        node = self.child(node)
        if not node.ports:
            return True
        if not self.upstream(node):
            return True
        return False

    def originals(self):
        '''Returns a list of our nodes which are considered original.'''
        return [ node for node in self.children if self.original(node) ]

    def terminal(self, node):
        '''Checks if a given node is terminal, or has no downstream ports.'''
        node = self.child(node)
        if not node.ports:
            return True
        if not self.downstream(node):
            return True
        return False

    def terminals(self):
        '''Returns a list of our nodes which are considered terminal.'''
        return [ node for node in self.children if self.terminal(node) ]

    #------------------------
    # Connecting the Children

    def connected(self, upstream, result, downstream, source):
        '''Checks if a given connection already exists.'''
        upstream = str(upstream)
        downstream = str(downstream)
        return (upstream, result, downstream, source) in self.connections

    def connect(self, upstream, result, downstream, source):
        '''Adds a connection from any port to another compatible port.
        A results port can feed into any source port of the same type, as
        long as it does not publish a loop (cycle) in the graph of
        connections. Redundant identical connections are ignored.

        Raises ValueError describing the first reason the connection is
        not allowed.  Returns True on success.'''
        upstream = self.child(upstream)
        downstream = self.child(downstream)
        # every possible reason you can't connect, from easiest to hardest
        if upstream is not None and upstream is downstream:
            raise ValueError('A node cannot source from its own results.')
        if not self.contains(upstream):
            raise ValueError('Upstream must be a Node instance or name.')
        if not upstream.port(result):
            raise ValueError('Upstream node needs port "%s".' % result)
        if upstream.gender[result] != Node.RESULT:
            raise ValueError('Upstream port "%s" must be output.' % result)
        if not self.contains(downstream):
            raise ValueError('Downstream must be a Node instance or name.')
        if not downstream.port(source):
            raise ValueError('Downstream node needs port "%s".' % source)
        if downstream.gender[source] != Node.SOURCE:
            raise ValueError('Downstream port "%s" must be input.' % source)
        if upstream.ports[result] != downstream.ports[source]:
            # no type conversion between differing port types exists yet
            raise ValueError('Result type conflicts with source type.')
        # a cycle forms if downstream already feeds upstream, however
        # indirectly, so search the whole upstream tree, not just one hop
        if downstream.name in self.upstream(upstream, True):
            raise ValueError('A node result cannot cycle to an upstream node.')
        link = (upstream.name, result, downstream.name, source)
        if link in self.connections:
            return True
        for c in self.connections:
            # a source port (node, port) accepts only one feeding result
            # port (node, port); compare both fields, not just node names
            if c[2:4] == link[2:4] and c[0:2] != link[0:2]:
                raise ValueError('Downstream port "%s" already connected.' %
                                 source)
        if result in upstream.results:
            if isinstance(upstream.results[result], Subscription):
                upstream.results[result].register(link[2]+':'+link[3])
        self.connections.add(link)
        return True

    def disconnect(self, upstream, result, downstream, source):
        '''Removes any existing connection from one port to another.
        Nodes may be given as instances or names.  A missing link is
        ignored.  Always returns True.'''
        # NOTE(review): connect() registers a reader on a Subscription
        # result; nothing here unregisters it.
        link = (str(upstream), result, str(downstream), source)
        self.connections.discard(link)
        return True

    #-------------------------
    # Connections Data Caching

    def feed(self, node):
        '''Gathers available data from connections destined for the
        source ports of a given node.  Returns a sources={} dict if all
        of the source ports have data ready.  Returns None if the node is
        not yet ready after collecting all available data.
        '''
        node = self.child(node)
        sources = {}
        #TODO: propagate from our own source ports to our children's sources
        # propagate from upstream connections
        for (upstream, result, downstream, source) in self.connections:
            if downstream != node.name:
                continue
            upstream = self.child(upstream)
            if result in upstream.results:
                product = upstream.results[result]
                if product is None:
                    return None
                if isinstance(product, Subscription):
                    if not product.ready(downstream+':'+source):
                        return None
                sources[source] = product
        # see if we're 100% ready
        for port in node.ports:
            if node.gender[port] != Node.SOURCE:
                continue
            if not port in sources:
                return None
        return sources

    #----------------
    # Main Processing

    def process(self, targets=None, drain=False):
        '''Processes those nodes required to fulfill the targets.

        targets -- child nodes (instances or names) whose results are
                   wanted.  Defaults to terminals(), or to every child if
                   there are none.  The caller's list is not modified.
        drain   -- if true, original nodes (see original()) are left out
                   of this round.

        Returns True if every pending node ran, False if any stalled.'''
        # figure out targets (result-most nodes that will demand data)
        if not targets:
            targets = self.terminals()
        if not targets:
            targets = list(self.children)
        targets = [ getattr(each, 'name', each) for each in targets ]
        # all targets and their upstreams are pending
        pending = {}
        for each in targets:
            pending[each] = True
            for node in self.upstream(each, True):
                pending[node] = True
                self.child(node).flush()
        if drain:
            for each in list(pending):
                if self.original(each):
                    del pending[each]
        if self.debug():
            print('Process.process() will try %s' % repr(list(pending)))
        # while there is input available, try to give every pending node
        # a nudge until a full pass makes no progress
        if self.ready():
            while pending:
                progress = False
                for each in list(pending):
                    # if the node can process, do so
                    sources = self.feed(each)
                    if sources is None:
                        if self.debug():
                            print('Node "%s" not fed, must retry it.' % each)
                        continue
                    results = self.execute(each, sources)
                    if results is None:
                        if self.debug():
                            print('Node "%s" said to fail.' % each)
                        continue
                    if self.debug():
                        print('Node "%s" succeeded.' % each)
                    del pending[each]
                    progress = True
                if not progress:
                    break
        # if any of our targets are stalled, we are stalled
        if pending:
            if self.debug():
                print('Still had %s for pending; stalled for more data.' %
                      (list(pending),))
            return False
        # TODO: for each original target, post its output as our results
        if self.debug():
            print('Process.process() success')
        return True

    #----------------
    # Other Overrides

    def unjoin(self, node):
        '''Disconnects and removes a child node from this process node.

        Every connection touching the node is dropped, then the parent
        link is cleared.  Raises ValueError if the node is not a child.'''
        node = str(node)
        if not self.contains(node):
            raise ValueError('The node is not joined to this process.')
        stale = [ c for c in self.connections if c[0] == node or c[2] == node ]
        for c in stale:
            self.connections.discard(c)
        super(Process, self).unjoin(node)

#----------------------------------------------------------------------------

if __name__ == "__main__":

    # Unit tests are run by executing this module directly on command-line.
    # Tests are successful unless you see a python traceback in the output.

    class _Filter (Node):
        '''Test node: publishes its "in" value plus 1.1 on "out".'''
        def process(self):
            print('_Filter.process()')
            value = self.receive('in')
            self.publish('out', value + 1.1)
            return True

    # construction tests

    a = _Filter() ; a.rename("A")
    a.port("out", Node.RESULT, float)
    a.port("in", Node.SOURCE, float)
    if a.port("out"): print("A has a port 'out'")
    print(repr(a))

    b = Compound() ; b.rename("B")
    b.port("out", Node.RESULT, float)
    b.port("in", Node.SOURCE, float)
    if b.port("in"): print("B has a port 'in'")
    b.port("snork", Node.SOURCE, float)
    if b.port("snork"): print("B has a port 'snork'")
    print(repr(b))

    c = Node() ; c.rename("C")
    c.port("out", Node.RESULT, float)
    c.port("in", Node.SOURCE, float)
    if c.port("out"): print("C has a port 'out'")
    print(repr(c))

    p = Process() ; p.rename("P")
    if not p.contains(a): print("P does not contain A")

    p.enjoin(a)
    if p.contains(a): print("P contains A")

    p.enjoin(b)
    if p.contains(b): print("P contains B")

    p.enjoin(c)
    if p.contains(c): print("P contains C")

    if not a.contains(b): print("A does not contain B")
    if not b.contains(a): print("B does not contain A")

    # connections (each of these attempts is expected to be rejected)

    try: p.connect(a,'nonexistant',a,'nonexistant')
    except Exception as detail: print(detail)

    try: p.connect(a,'nonexistant',b,'nonexistant')
    except Exception as detail: print(detail)

    try: p.connect(a,'out',b,'nonexistant')
    except Exception as detail: print(detail)

    try: p.connect(a,'out',a,'out')
    except Exception as detail: print(detail)

    try: p.connect(a,'out',a,'in')
    except Exception as detail: print(detail)

    try: p.connect(a,'out',b,'out')
    except Exception as detail: print(detail)
    print(repr(p))

    p.connect(a,'out',b,'in')
    if p.connected(a,'out',b,'in'): print("A.out -> B.in")

    # B.in is already fed by A.out, so a second feeder is rejected
    try: p.connect(c,'out',b,'in')
    except Exception as detail: print(detail)

    # B is a child of P, so joining P into B would form a containment
    # loop; enjoin() is expected to refuse (Compound has no join() method)
    try: b.enjoin(p)
    except Exception as detail: print(detail)

    p.connect(a,'out',c,'in')
    if p.connected(a,'out',c,'in'): print("A.out -> C.in")

    print("Upstream from A is %s" % (p.upstream(a),))
    print("Downstream from A is %s" % (p.downstream(a),))

    print("Upstream from B is %s" % (p.upstream(b),))
    print("Downstream from B is %s" % (p.downstream(b),))

    print(repr(p))

    # single-node processing

    print('=====')

    a = _Filter() ; a.rename("A")
    a.port("out", Node.RESULT, float, subscription=False)
    a.port("in", Node.SOURCE, float)
    results = a.go({'in': 2.2})
    print(repr(results))

    print('--')

    a = _Filter() ; a.rename("A")
    a.port("out", Node.RESULT, float, subscription=True)
    a.port("in", Node.SOURCE, float)
    data = Subscription()
    data.publish(1.1)
    data.publish(2.2)

    results = a.go({'in': data})
    print(repr(results))

    results = a.go({'in': data})
    print(repr(results))

    # processing a simple A->B process with a non-subscription connection

    print('=====')

    class _GeneratorFlat (Node):
        '''Test node: publishes 1.0, 2.0, ... as plain values on "out".'''
        def init(self):
            self.port('out', Node.RESULT, float)
            self.foo = 1.0
        def process(self):
            print('_GeneratorFlat.process()')
            self.publish('out', self.foo)
            print('  _generator >  %s' % (self.foo,))
            self.foo += 1.0
            return True

    class _Reporter (Node):
        '''Test node: remembers the last value received on "in".'''
        def init(self):
            self.port('in', Node.SOURCE, float)
        def process(self):
            print('_Reporter.process()')
            self.foo = self.receive('in')
            print('> _reporter     %s' % (self.foo,))
            return True

    p = Process()

    a = _GeneratorFlat()
    p.enjoin(a)

    b = _Reporter()
    p.enjoin(b)

    p.connect(a, 'out', b, 'in')

    print(repr(p))

    p.process()
    print('--')
    p.process()

    if b.foo != 2.0:
        raise ValueError('final test did not pipeline a 2.0 result')

    # processing a simple A->B process with a subscription connection

    print('=====')

    class _Generator (Node):
        '''Test node: publishes 1.0, 2.0, ... through a Subscription.'''
        def init(self):
            self.port('out', Node.RESULT, float, subscription=True)
            self.foo = 1.0
        def process(self):
            print('_Generator.process()')
            self.publish('out', self.foo)
            print('  _generator >  %s' % (self.foo,))
            self.foo += 1.0
            return True

    p = Process()

    a = _Generator()
    p.enjoin(a)

    b = _Reporter()
    p.enjoin(b)

    p.connect(a, 'out', b, 'in')

    print(repr(p))

    p.process()
    print('--')
    p.process()

    if b.foo != 2.0:
        raise ValueError('final test did not pipeline a 2.0 result')

    # processing a fork with subscription connections

    print('=====')

    p = Process()

    a = _Generator()
    p.enjoin(a)

    b = _Reporter()
    p.enjoin(b)

    c = _Reporter()
    p.enjoin(c)

    p.connect(a, 'out', b, 'in')
    p.connect(a, 'out', c, 'in')

    print(repr(p))
    print('--')

    p.process()
    print('--')
    p.process()

    if b.foo != 2.0:
        raise ValueError('final test on node B did not get a 2.0 result')

    if c.foo != 2.0:
        raise ValueError('final test on node C did not get a 2.0 result')



Contact Ed Halley by email at ed@halley.cc.
Text, code, layout and artwork are Copyright © 1996-2013 Ed Halley.
Copying in whole or in part, with author attribution, is expressly allowed.
Any references to trademarks are illustrative and are controlled by their respective owners.
Make donations with PayPal - it's fast, free and secure!