"""Asynchronous network IO based on generators which have the ability to send a result to a yield expression. xxx todo send exceptions to parent """ import urlparse, urllib, types, select, socket, sys def next(g, msg): g.gi_frame.f_locals['__cr_ret__'] = msg return g.next() def accept(): return sys._getframe().f_back.f_locals['__cr_ret__'] def getline(connection): line = '' while not line.endswith('\r\n'): yield connection.get(1) line += accept() yield SendUp(line) def getheaders(connection): headers = {} line = '' while True: yield getline(connection) line = accept() if line == '\r\n': break key, value = line.split(':', 1) headers[key] = value.rstrip() yield SendUp(headers) def gethttp(url, method='GET'): scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) host, port = urllib.splitport(netloc) connection = connect(host, port and int(port) or 80) yield connection.open() yield connection.put( '%(method)s %(path)s HTTP/1.0\r\n\r\n' % dict( method=method, path=path)) yield getline(connection) status = accept() yield getline(connection) _ = accept() yield getheaders(connection) headers = accept() yield connection.get(int(headers['Content-Length'])) body = accept() print body, yield connection.close() class connect(object): fileno = None def __init__(self, host, port): self.host = host self.port = port self.readbuffer = '' def open(self): sock = socket.socket() self.socket = sock self.fileno = sock.fileno() sock.setblocking(False) try: sock.connect((self.host, self.port)) except socket.error, e: if e[0] != 36: raise return sock def close(self): return CloseIt(self.fileno) def put(self, stuff): assert self.fileno return WriteIt(self, stuff) def get(self, howmuch=512): assert type(howmuch) is int if len(self.readbuffer) >= howmuch: tosend, self.readbuffer = ( self.readbuffer[:howmuch], self.readbuffer[howmuch:]) return SendNow(tosend) assert self.fileno return ReadIt(self, howmuch) class ReadIt(object): def __init__(self, connection, howmuch): self.connection = connection self.howmuch = howmuch def read(self): read = self.connection.socket.recv(1024) if len(read) == 0: return False self.connection.readbuffer += read if len(self.connection.readbuffer) >= self.howmuch: tosend, self.connection.readbuffer = ( self.connection.readbuffer[:self.howmuch], self.connection.readbuffer[self.howmuch:]) return tosend return read def fileno(self): return self.connection.fileno class WriteIt(object): def __init__(self, connection, writewhat): self.connection = connection self.writewhat = writewhat def write(self): oldlen = len(self.writewhat) newlen = self.connection.socket.send(self.writewhat) self.writewhat = self.writewhat[newlen:] if newlen < oldlen: return False return True def fileno(self): return self.connection.fileno class CloseIt(object): def __init__(self, fileno): self.fileno = fileno class SendNow(object): def __init__(self, whatToSend): self.whatToSend = whatToSend class SendUp(object): def __init__(self, whatToSend): self.whatToSend = whatToSend class Loop(object): selector = select.select def __init__(self): self.generators = [] self.filenos = [] self.dependencies = {} self.waiters = {} self.parents = {} self.delays = [] self.reads = [] self.writes = [] self.readmap = {} self.writemap = {} self.sends = {} def register(self, generator, send=None): self.generators.append(generator) self.sends[generator] = send def iterate(self): """Decide what this loop needs to do this iteration. Return value is a tuple of: (delay, readers, writers, xers) """ appends = [] filters = {} for gen in self.generators: try: whatsnext = next(gen, self.sends.pop(gen, None)) except StopIteration: filters[gen] = True continue if isinstance(whatsnext, socket.socket): fileno = whatsnext.fileno() self.dependencies.setdefault(fileno, []).append(gen) self.filenos.append(fileno) elif isinstance(whatsnext, ReadIt): fileno = whatsnext.fileno() self.reads.append(fileno) self.readmap[fileno] = whatsnext ## Don't iterate this until reading is done self.waiters[fileno] = gen filters[gen] = True elif isinstance(whatsnext, WriteIt): fileno = whatsnext.fileno() self.writes.append(fileno) self.writemap[fileno] = whatsnext ## Don't iterate this until writing is done filters[gen] = True elif isinstance(whatsnext, types.GeneratorType): ## The new generator is a child of the current one self.parents[whatsnext] = gen appends.append(whatsnext) ## Don't work the parent until the child is done filters[gen] = True elif isinstance(whatsnext, SendUp): ## All done with gen filters[gen] = True ## Wake up the parent appends.append(self.parents.pop(gen)) ## Send the child result to it self.sends[appends[-1]] = whatsnext.whatToSend elif isinstance(whatsnext, SendNow): self.sends[gen] = whatsnext.whatToSend elif isinstance(whatsnext, CloseIt): self.filenos.remove(whatsnext.fileno) else: raise RuntimeError, `whatsnext` self.generators = [ x for x in self.generators if x not in filters] self.generators.extend(appends) def loop(self): while self.generators or self.filenos or self.delays: while self.generators: self.iterate() if self.delays: delay = self.delays[0].delay else: delay = 5 readyread, readywrite, readyx = self.selector( self.reads, self.writes, self.filenos, delay) first = True while first or self.generators: for read in readyread: if not self.readmap.has_key(read): continue whatread = self.readmap[read].read() if whatread is False: self.reads.remove(read) self.readmap.pop(read) #for dependency in self.dependencies[read]: # self.generators.append(dependency) else: gen = self.waiters[read] self.sends[gen] = whatread self.generators.append(gen) for write in readywrite: if not self.writemap.has_key(write): continue if self.writemap[write].write(): self.writes.remove(write) self.writemap.pop(write) for dependency in self.dependencies[write]: self.generators.append(dependency) for x in readyx: self.filenos.remove(x) def recursivelyRemove(gen): ## xxx send exception here? self.generators.remove(gen) parent = self.parents.pop(gen, None) if parent is not None: recursivelyRemove(parent) recursivelyRemove(self.dependencies[x]) self.iterate() first = False if __name__ == '__main__': l = Loop() l.register(gethttp('http://localhost/')) l.loop()