|
Programmer's Notebook |
All computer source code presented on this page, unless it includes attribution to another author, is provided by Ed Halley under the Artistic License. Use such code freely and without any expectation of support. I would like to know if you make anything cool with the code, or need questions answered.
python/ bindings.py boards.py buzz.py cache.py cards.py constraints.py csql.py english.py getopts.py gizmos.py goals.py improv.py interpolations.py namespaces.py nihongo.py nodes.py octalplus.py patterns.py persist.py physics.py pids.py pieces.py quizzes.py recipes.py relays.py romaji.py ropen.py sheets.py strokes.py subscriptions.py svgbuild.py testing.py things.py timing.py ucsv.py useful.py uuid.py vectors.py weighted.py java/ CSVReader.java CSVWriter.java GlobFilenameFilter.java RegexFilenameFilter.java StringBufferOutputStream.java ThreadSet.java Throttle.java TracingThread.java Utf8ConsoleTest.java droid/ ArrangeViewsTouchListener.java DownloadFileTask.java perl/ CVQM.pm Kana.pm Typo.pm cxx/ CCache.h equalish.cpp |
Download relays.py
|
#!python import cPickle as pickle import SocketServer import threading import traceback import logging import socket import struct import time import sys import re #---------------------------------------------------------------------------- class Relay (object): '''A general-purpose non-topological TCP/IP network message relay node. Unlike most network code which focuses on differences between "servers" and "clients," the Relay mechanism just refers to generic nodes throughout the network. These relay nodes can be arranged in any sort of topology with simple routing rules. Any relay is a server to many clients; any relay is a client of many servers; a relay can be both server and client at the same time. From the point of view of any individual Relay, there are two kinds of connections: connections we initiated, and connections we accepted. Those that we initiate, we consider to be "above" us. The ones we have accepted are "below" us. The routing rules use these concepts to specify how packets are handled. A Relay has a default packet-protocol format, which can be changed to any one of several built-in formats. Custom formatting routines can also be supplied for special or external network datagram formats. ''' FREE = 0 LINES = 1 N8LEN = 2 N16LEN = 3 N32LEN = 4 NVLEN = 5 PICKLE = 6 class Conduit (SocketServer.BaseRequestHandler): '''Internal packet queuing and parsing management mechanism.''' # # Not a lot of doc-strings, as Conduit is meant for internal use. # # The python libs call it a 'request' but it's basically an open # socket session, from connection accepted to connection closed. # # This represents a handler for any socket session in a Relay, # providing the queuing of incoming raw data into application # packets, and allowing multiple threads to post output packets # to send. # def setup(self): # Called once by our base class to perform configuration. self.server.relay.conduits[self] = self.client_address self.server.relay.addresses[self.client_address] = self self.mode(self.server.relay.packet) self.pending = threading.Event() self.since = time.time() self.queue = [] self.data = '' self.icount = 0 self.ocount = 0 self.quit = False def handle(self): # Called once by our base class to perform the whole life cycle # of the open socket session. We make a separate thread for # our transmitting duties, so transmitting and receiving are # fully asynchronous. self.transmitting = threading.Thread(target=self.transmitting) self.transmitting.setDaemon(True) self.transmitting.start() self.receiving() self.shutdown() def mode(self, packet=None, parse=None, make=None): # Set up the available parsing options that can recognize # valid raw data as application packets. An extension to # this class may replace or extend this parsing process. parsers = [ parse_free, parse_line, parse_n8len, parse_n16len, parse_n32len, parse_nVlen, parse_pickle ] makers = [ make_free, make_line, make_n8len, make_n16len, make_n32len, make_nVlen, make_pickle ] if parse is None and packet >= 0 and packet < len(parsers): parse = parsers[packet] if make is None and packet >= 0 and packet < len(makers): make = makers[packet] if parse: self.parse = parse if make: self.make = make def receiving(self): # Called by handle() in the main thread. # Whenever a chunk arrives, we see if that completes a packet. # Whenever a packet successfully parses, we view() it. talk = self.request while not self.quit: chunk = talk.recv(4096) # r = repr(chunk) # if len(r) > 20: r = r[:17] + '...' + r[-1:] if len(chunk) == 0: break self.data += chunk try: (packet, self.data) = self.parse(self.data) while packet is not None: self.view(packet) (packet, self.data) = self.parse(self.data) except: address = self.client_address _LOG.error('Malformed packet from %s:%d' % address) _LOG.trace() self.shutdown() def transmitting(self): # Called by handle() as a separate thread. # We just watch our pending queue and send whatever is posted. talk = self.request while not self.quit: self.pending.wait() self.pending.clear() while self.queue: packet = self.queue.pop(0) try: talk.send(packet) except: self.shutdown() def shutdown(self): # Orderly one-step shutdown including our transmit thread. self.quit = True try: self.request.close() except: pass self.pending.set() if self.client_address: address = self.client_address self.client_address = None self.server.relay.kill(self) def view(self, packet): # Called whenever we successfully parse raw data into a packet. r = repr(packet) if len(r) > 20: r = r[:17] + '...' + r[-1:] self.icount += 1 self.server.incoming.append( (self.client_address, packet) ) self.server.pending.set() def stats(self): # Some debugging or status information formatted for display. life = time.time() - self.since text = 'i=%d, o=%d, l=%.1f' % (self.icount, self.ocount, life) if life >= 1.0: text += ', r=%.1f/sec' % ((self.icount+self.ocount) / life) return text def post(self, packet): # Add a packet to be sent in turn. self.ocount += 1 self.queue.append(self.make(packet)) self.pending.set() class RelayServer (SocketServer.ThreadingTCPServer): '''Internal listening socket and thread mechanism.''' # # Not a lot of doc-strings, as Conduit is meant for internal use. # # We add the concept of 'jilted' or banned addresses. A Relay # can check the list of jilts to see whether a connection should # be accepted to form a new connection (a Conduit). # # We support logging of opened/closed connections. # allow_reuse_address = 1 # The TCPServer should use socket.SO_REUSEADDR before bind(). # def __init__(self, address, handler, relay): SocketServer.TCPServer.__init__(self, address, handler) self.daemon_threads = True self.relay = relay self.quit = False self.incoming = [] self.outgoing = [] self.pending = threading.Event() def server_close(self): self.quit = True SocketServer.ThreadingTCPServer.server_close(self) shutdown = server_close def get_request(self): # Retrieve the 'request' (the connect session, or Conduit). (request, address) = SocketServer.ThreadingTCPServer.get_request(self) return (request, address) def verify_request(self, request, address): # New request to connect. Check if we accept it. if self.relay.jilted(address): _LOG.warning('Connection refused from %s:%d' % address) return False _LOG.info('Connection accepted from %s:%d' % address) self.relay.below.add(address) return True def finish_request(self, request, address): # Actually create the request Conduit. It self-attaches. handler = self.RequestHandlerClass(request, address, self) def handle_error(self, request, address): _LOG.error('Exception handling message from %s:%d' % address) _LOG.trace() def listening(self): # Our lifecycle is a simple loop to listen for new connections. while not self.quit: try: self.handle_request() except: pass def routing(self): while not self.quit: self.pending.wait() self.pending.clear() while self.incoming: self.relay.route_incoming(self.incoming.pop(0)) while self.outgoing: self.relay.route_outgoing(self.outgoing.pop(0)) def __init__(self, host='localhost', port=4567, packet=LINES): '''Construct a generic message relay node.''' self.address = (host, port) self.server = Relay.RelayServer(self.address, Relay.Conduit, self) self.jilts = set([]) # 'hostname' we do not accept contact self.jiltseen = set([]) # temp 'hostname' we did not accept contact self.jiltmasks = set([]) # 'host*' masks we do not accept contact self.above = set([]) # (addr,port) self.below = set([]) # (addr,port) self.conduits = {} # conduit: (addr,port) self.addresses = {} # (addr,port): conduit self.packet = packet # initial assumed packet mode being received self.listening = threading.Thread(target=self.server.listening) self.listening.setDaemon(True) self.listening.start() self.routing = threading.Thread(target=self.server.routing) self.routing.setDaemon(True) self.routing.start() _LOG.info('Startup; now listening at %s:%d' % self.address) def shutdown(self): '''Disconnect from everything and terminate our thread.''' _LOG.info('Shutdown; not listening at %s:%d' % self.address) for conduit in self.conduits.keys(): self.kill(conduit) if self.server: self.server.shutdown() _LOG.info('Shutdown complete') def find(self, target): '''Given either an address or conduit, return a tuple with both.''' conduit = address = None if target in self.conduits: conduit = target address = self.conduits[conduit] if target in self.addresses: address = target conduit = self.addresses[address] for conduit in self.conduits: if conduit.request is target: return (conduit, conduit.client_address) return (conduit, address) def route_incoming(self, envelope): '''The incoming envelope shows the conduit delivering a message.''' (source, message) = envelope self.view(source, message) def route_outgoing(self, envelope): '''Outgoing messages must be propagated.''' (targets, message) = envelope if isinstance(targets, list): for target in targets: (conduit, address) = self.find(target) if conduit: conduit.post(message) return (conduit, address) = self.find(targets) if conduit: conduit.post(message) def jilted(self, address): '''Returns True if calls from the address should not be accepted.''' if isinstance(address, tuple): address = address[0] if address in self.jilts or address in self.jiltseen: return True for mask in self.jiltmasks: if _mask_match(address, mask): self.jiltseen.add(address) return True return False def jilt(self, address): '''Add a new hostname or hostmask to refuse all incoming contact. Hostnames can be specific DNS or IPv4/IPv6 addresses. Hostmasks can use '*' wildcards as well. ''' if isinstance(address, tuple): address = address[0] if '*' in address: self.jiltmasks.add(address) else: self.jilts.add(address) def unjilt(self, address): '''Remove a hostname or hostmask from the set of jilted addresses.''' if isinstance(address, tuple): address = address[0] if address in self.jilts: self.jilts.remove(address) if address in self.jiltseen: self.jiltseen.remove(address) for mask in self.jiltmasks.keys(): within = mask.replace('*', '#') if _mask_match(within, address): self.jiltmasks.remove(mask) def kill(self, conduit): '''If the address is connected, disconnect immediately.''' if not conduit in self.conduits: return address = self.conduits[conduit] del self.conduits[conduit] del self.addresses[address] if address in self.above: self.above.remove(address) if address in self.below: self.below.remove(address) _LOG.info('Disconnecting %s:%d (%s)' % (address[0], address[1], conduit.stats())) conduit.shutdown() def call(self, address, packet=None): '''Initiate contact with a higher Relay or another plain socket. The first argument is an address tuple of the following form: ('hostname', port) The second argument is optional, and will set the packet forming/parsing mode for the created conduit. ''' (conduit, addressed) = self.find(address) if conduit: self.kill(conduit) if packet is None: packet = self.packet _LOG.info('Calling %s:%d...' % address) talk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) talk.connect(address) self.above.add(address) self.server.process_request(talk, address) _LOG.info('Contacted peer at %s:%d' % address) def mode(self, target=None, packet=None, parse=None, make=None): '''Adjust the packet parsing and/or making mode. The first argument is either a conduit or an address tuple. To apply the same packet mode to all currently connected conduits, supply target=None. Packet modes may be any of the existing values: FREE, LINES, N8LEN, N16LEN, N32LEN, NVLEN, PICKLE A custom packet parsing routine can be supplied with parse=P. The def P(data) should take one datastring argument, and return a two-element tuple (deserialized_value, remainder_of_datastring). If the datastring does not contain enough data to deserialize a value, return (None, whole_unmodified_datastring). A custom packet making routine can be supplied with make=M. The def M(value) should take one value argument, and return the serialized datastring that can be parsed by the remote system. ''' if target is None: for conduit in self.conduits: if not conduit: break self.mode(target=conduit, packet=packet, parse=parse, make=make) (conduit, address) = self.find(target) if conduit: conduit.mode(packet=packet, parse=parse, make=make) def post(self, targets, message): '''Accepts a new message into the system. The first argument is a single target which must already have been connected. This can be a conduit or an address tuple. Alternatively, this argument can be a list of valid targets. Each target will receive an identical copy of the message. The second argument is the actual data to be made into a data packet using the currently selected packet making routine. ''' _LOG.debug('Posting to %r: %r' % (targets, message)) self.server.outgoing.append( (targets, message) ) self.server.pending.set() def view(self, source, message): '''Called when receiving a message. This method is overridable. For many Relay-based applications, this may be the only method you need to override. The "source" argument refers to which conduit of this Relay delivered the message. You can reply to the same conduit by using that source value as the target in the post() method. For example, a simple echo server would perform a call like this: self.post(source, message) The "message" argument is the actual contents of the message. It is the result returned from the selected packet parsing routine, so there is no need to manage or understand many underlying protocol details. Any other tracking information or meta-routing data is not supported by Relay itself, and must be encapsulated within the message. ''' _LOG.debug('Message from %r: %r' % (source, message)) pass #---------------------------------------------------------------------------- def parse_free(data): '''Passes all available data as a single packet.''' return (data, '') def parse_line(data): r'''Parses normal ASCII/UTF-8 lines with terminating CRLF ('\r\n'). Any embedded extra solitary CR or LF characters are ignored and passed as part of the packet. ''' packet = data.find('\r\n') if packet < 0: return (None, data) packet += len('\r\n') return (data[:packet], data[packet:]) def parse_n8len(data): '''Parses data packets preceded by 8-bit unsigned packet size.''' if len(data) < 1: return (None, data) packet = ord(data[0]) + 1 if len(data) < packet: return (None, data) return (data[1:packet], data[packet:]) def parse_n16len(data): '''Parses data packets preceded by 16-bit big-endian packet size.''' if len(data) < 2: return (None, data) packet = struct.unpack('!H', data)[0] + 2 if len(data) < packet: return (None, data) return (data[2:packet], data[packet:]) def parse_n32len(data): '''Parses data packets preceded by 32-bit big-endian packet size.''' if len(data) < 4: return (None, data) packet = struct.unpack('!L', data)[0] + 4 if len(data) < packet: return (None, data) return (data[4:packet], data[packet:]) def parse_nVlen(data): '''Parses data packets preceded by variable big-endian packet size. The packet size is broken into 7-bit pieces. The most-significant 7-bits is put into the first byte, and the high bit is set if there are more bytes required to describe the packet size. This minimizes overhead of packet size descriptors while not limiting packet size. ''' v = 1 packet = 0 while data and ord(data[0]) == 128: data = data[1:] while True: if len(data) < v: return (None, data) byte = ord(data[v-1]) packet = (packet << 7) | (byte & 0x7F) if 0 == (byte & 0x80): break v += 1 if v > 6: raise ValueError, 'packet length too long' packet += v if len(data) < packet: return (None, data) return (data[v:packet], data[packet:]) def parse_pickle(data): '''Parses data packets that were formed by python serialization into "pickle" datagrams. This serialized data must be preceded by a variable size prefix such as that used by parse_nVlen(), so the caller does not work extra hard trying to parse incomplete pickle streams repeatedly. Any pickle failure exceptions will be raised, or the reconstituted object will be returned as the packet. ''' (blob, data) = parse_nVlen(data) if blob is None: return (None, data) packet = pickle.loads(blob) return (packet, data) def make_free(packet): '''Makes a datagram directly from any data packet given.''' return packet def make_line(packet): r'''Makes datagram(s) from ASCII/UTF-8 line(s). Supplies a terminating CRLF ('\r\n') pair if none is given. Removes solitary CR characters and turns solitary LF into CRLF pairs. ''' packet.replace('\r', '') packet.replace('\n', '\r\n') if not packet.endswith('\r\n'): packet += '\r\n' return packet def make_n8len(packet): '''Makes a small datagram with an 8-bit unsigned packet size prefix. It is a ValueError if the packet is too long to fit in this prefix. ''' if len(packet) > 0xFF: raise ValueError, 'packet length too long' return chr(len(packet)) + packet def make_n16len(packet): '''Makes a medium datagram with a 16-bit unsigned packet size prefix. It is a ValueError if the packet is too long to fit in this prefix. ''' if len(packet) > 0xFFFF: raise ValueError, 'packet length too long' v = struct.pack('!H', len(packet)) return v + packet def make_n32len(packet): '''Makes a large datagram with a 32-bit unsigned packet size prefix. It is a ValueError if the packet is too long to fit in this prefix. ''' if len(packet) > 0xFFFFFFFF: raise ValueError, 'packet length too long' v = struct.pack('!L', len(packet)) return v + packet def make_nVlen(packet): '''Makes a datagram with a variable packet size prefix. It is a ValueError if the packet is too long to fit in this prefix. ''' raise Exception, 'not supported yet' def make_pickle(packet): '''Makes a datagram that encapsulates a python serialization (or "pickle") of an arbitrary object. The remote system would have to have access to the identical definitions of any classes and datatypes used in the pickle. To simplify the parsing side, the serialized data is then prefixed with length information, using the make_nVlen() function. ''' blob = pickle.dumps(packet) return make_nVlen(blob) #---------------------------------------------------------------------------- def _trace(): text = traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback) text = ''.join(text) text = text.split('\n') if not text[-1]: text.pop() _LOG.debug(('\n' + ' '*28 + '| ').join(text)) if True: _LOG = logging.getLogger('Relay') _LOG.trace = _trace _format = '(%(levelname).1s) %(asctime)-15s | %(message)s' logging.basicConfig(level=logging.DEBUG, format=_format) def _mask_match(name, mask): # hostname string falls within hostmask name = name.replace('.', ':') mask = mask.replace('.', ':').replace('*', '.*') + r'\Z' if re.match(mask, name): return True return False #---------------------------------------------------------------------------- def _test_crashing_parse_line(data): packet = data.find('\r\n') if packet < 0: return (None, data) if '1' in data: raise ValueError, 'boom' packet += len('\r\n') (packet, data) = (data[:packet], data[packet:]) return (packet, data) def _test_relay_to_relay(): r = Relay(port=6600) t = Relay(port=7700) time.sleep(0.1) # a = ('localhost', 7700) r.call( a ) time.sleep(0.5) # r.post( a, 'Hello, World\r\n' ) time.sleep(0.5) # r.post( a, 'Blah blah blah\r\n' ) time.sleep(0.5) # r.post( a, 'Goodbye, Cruel World\r\n' ) time.sleep(0.5) # time.sleep(2) r.kill( a ) time.sleep(0.5) # r.shutdown() t.shutdown() def _test_socket_to_relay(): r = Relay(port=5500) # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', 5500)) s.send('Hello, World\r\n') s.send('Blah blah.\r\n') # time.sleep(0.5) s.close() time.sleep(0.5) # r.shutdown() class _socker (threading.Thread): def __init__(self, address, breadth=3): threading.Thread.__init__(self) self.address = address self.breadth = breadth self.setDaemon(True) def run(self): import random s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(self.address) for i in range(self.breadth): try: s.send('i=%d\r\n' % i) except: pass time.sleep(0.1 + random.random() * 0.3) while True: try: chunk = s.recv(1024) except: chunk = '' if len(chunk) == 0: break _LOG.debug(repr(chunk)) s.close() def _test_multi_socket_to_relay(): r = Relay(port=4400) # width = 3 breadth = 3 for i in range(width): s = _socker( ('localhost', 4400), breadth ) s.start() # time.sleep(width) r.shutdown() if __name__ == '__main__': for test in [ _test_socket_to_relay, _test_multi_socket_to_relay, _test_relay_to_relay, ]: _LOG.debug('-'*40) test() |
|
Contact Ed Halley by email at
ed@halley.cc. Text, code, layout and artwork are Copyright © 1996-2013 Ed Halley. Copying in whole or in part, with author attribution, is expressly allowed. Any references to trademarks are illustrative and are controlled by their respective owners. |
|