I recently needed to move data chunks between subprocesses in a non-linear way, with some logic in between, so tee(1) and fifo(7) pipes weren't a good option. Inspired by recipe 440554, but rewritten from scratch to remove unnecessary delays caused by sleep(3) calls and suboptimal try/sleep-based polling.
import errno
import fcntl
import logging
import os
import sys
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from subprocess import Popen, PIPE
from time import time
# Exit conditions (states), reported next to the result when state=True is requested
class Time:
    '''Hit-the-time-limit state: the timeout expired before completion.'''


class Size:
    '''Hit-the-size-limit state: the requested amount was transferred.'''


class End:
    '''Hit-the-end state: the other side of the pipe was closed.'''
class AWrapper(object):
    '''Async I/O objects' wrapper.

    Wraps one end of a pipe (a file object or a raw file descriptor) and adds
    read()/write() variants bounded by a timeout and driven by epoll(7).
    With ``to < 0`` the plain blocking file methods are used. Otherwise the
    descriptor is switched to O_NONBLOCK for the duration of the call and
    polled until the deadline.

    When ``state`` is true, both methods return ``(result, state)``, where
    state is one of the module-level sentinels Time (deadline reached),
    Size (requested amount transferred) or End (other side closed).
    '''
    bs_default = 8192  # bytes per read() call when no byte limit was given
    bs_max = 65536  # cap on a single read() call for large limits

    def __init__(self, pipe):
        if isinstance(pipe, int):
            fd = pipe
            # os.fromfd() does not exist. Open the descriptor unbuffered with
            # a mode matching its access flags, so epoll readiness reflects
            # exactly what read()/write() will see.
            access = fcntl.fcntl(fd, fcntl.F_GETFL) & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
            mode = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb'}.get(access, 'r+b')
            pipe = os.fdopen(fd, mode, 0)
        else:
            fd = pipe.fileno()
        self._fd = fd
        self._poll_in, self._poll_out = epoll(), epoll()
        self._poll_in.register(fd, EPOLLIN | EPOLLERR | EPOLLHUP)
        self._poll_out.register(fd, EPOLLOUT | EPOLLERR | EPOLLHUP)
        self.close = pipe.close
        self.reads = pipe.read
        self.writes = pipe.write

    def __del__(self):
        # __init__ may have failed half-way, so look everything up defensively.
        for poller in (getattr(self, '_poll_in', None), getattr(self, '_poll_out', None)):
            if poller is not None:
                poller.close()
        close = getattr(self, 'close', None)
        if close is not None:
            close()

    def _nonblocking(self):
        '''Put the descriptor into O_NONBLOCK mode and return its previous flags.'''
        flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        return flags

    def _restore(self, flags):
        '''Best-effort restore of descriptor flags saved by _nonblocking().'''
        try:
            fcntl.fcntl(self._fd, fcntl.F_SETFL, flags)
        except (IOError, OSError):
            pass  # fd closed meanwhile (wrong fd/pipe); don't raise another error

    def _read_chunk(self, bs):
        '''Do one non-blocking read of at most ``bs`` bytes (bs_default if bs <= 0).

        Returns the data, an empty string on EOF, or None when nothing could
        be read right now (EAGAIN).
        '''
        size = min(bs, self.bs_max) if bs > 0 else self.bs_default  # min() for G+ reads
        try:
            return self.reads(size)  # Python 3 raw files return None on EAGAIN
        except (IOError, OSError) as err:
            if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):  # Python 2 files raise
                return None
            raise

    def read(self, bs=-1, to=-1, state=False):  # read until timeout
        '''Read up to ``bs`` bytes (bs < 0: whatever arrives) within ``to`` seconds.

        With to < 0, a regular blocking read is done. Otherwise data is
        collected until ``bs`` bytes have arrived, the writer has closed its
        end, or the deadline has passed. Data still buffered in the pipe when
        the writer hangs up is drained before End is reported.

        Returns the data, or ``(data, state)`` when ``state`` is true.
        '''
        if to < 0:  # use regular sync I/O
            buff = self.reads(bs)
            if state:  # "Size" might mean "Size | End" here
                return (buff, Size) if len(buff) == bs else (buff, End)
            return buff
        chunks = []
        flags = self._nonblocking()
        try:
            deadline = time() + to
            while bs:
                events = self._poll_in.poll(max(to, 0), 1)
                if not events:  # nothing happened before the deadline
                    if state:
                        state = Time
                    break
                event = events[0][1]  # fd should be eq self._fd
                # A bare EPOLLHUP means the writer is gone and the pipe is empty.
                # With EPOLLIN|EPOLLHUP there is still data that must be drained.
                ext = self._read_chunk(bs) if event != EPOLLHUP else b''
                if ext is not None:
                    if not ext:  # EOF: the other end is closed
                        if state:
                            state = End
                        break
                    chunks.append(ext)
                    if bs > 0:
                        bs -= len(ext)
                to = deadline - time()
                if bs and to < 0:
                    if state:
                        state = Time
                    break
            else:  # got bs bytes
                if state:
                    state = Size
        finally:
            self._restore(flags)  # restore blocking state
        buff = b''.join(chunks)
        return (buff, state) if state else buff

    def write(self, buff, to=-1, state=False):  # mostly similar (in reverse) to read
        '''Write ``buff`` within ``to`` seconds and return the number of bytes written.

        With to < 0, a regular blocking write is done. Otherwise data is
        written as the pipe accepts it, until everything is sent, the reader
        goes away (EPOLLHUP / EPIPE), or the deadline passes.

        Returns the byte count, or ``(count, state)`` when ``state`` is true.
        '''
        if to < 0:
            bs = self.writes(buff)
            if bs is None:  # Python 2 file.write() returns None after writing everything
                bs = len(buff)
            if state:  # "Size" might mean "Size | End" here
                return (bs, Size) if len(buff) == bs else (bs, End)
            return bs
        bs = 0
        flags = self._nonblocking()
        try:
            deadline = time() + to
            while buff:
                events = self._poll_out.poll(max(to, 0), 1)
                if not events:
                    if state:
                        state = Time
                    break
                fd, event = events[0]
                if event == EPOLLHUP:
                    if state:
                        state = End
                    break
                try:
                    ext = os.write(fd, buff)
                except OSError as err:
                    if err.errno == errno.EPIPE:  # reader closed its end
                        if state:
                            state = End
                        break
                    if err.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                        raise
                    ext = 0  # pipe filled up meanwhile; poll again
                bs += ext
                buff = buff[ext:]
                if event & EPOLLHUP:
                    if state:
                        state = End
                    break
                to = deadline - time()
                if buff and to < 0:
                    if state:
                        state = Time
                    break
            else:  # everything was written
                if state:
                    state = Size
        finally:
            self._restore(flags)
        return (bs, state) if state else bs
import signal
class AExec(Popen):
    '''Popen subclass with read()/write() shortcuts and timeout-aware wait()/close().

    Positional arguments form the command line. A single argument is either
    a program name (string) or a complete argv sequence. Keyword arguments
    go to Popen, except ``sync``: with ``sync=False`` the standard streams
    are wrapped in AWrapper, so read()/write() accept timeouts.

    stdin and stdout default to PIPE, because read()/write() need them.
    Pass stdin=None / stdout=None explicitly to inherit the parent's streams.
    '''

    class _Expired(Exception):
        '''Raised by the temporary SIGALRM handler to abort a blocking wait().'''

    def __init__(self, *argz, **kwz):
        if len(argz) == 1:
            try:
                str_types = (basestring, buffer)  # Python 2
            except NameError:
                str_types = (str, bytes)  # Python 3
            argz = (argz[0],) if isinstance(argz[0], str_types) else argz[0]
        sync = kwz.pop('sync', True)
        # Without pipes the read/write shortcuts below would never be created.
        kwz.setdefault('stdin', PIPE)
        kwz.setdefault('stdout', PIPE)
        if not sync:
            # Unbuffered pipes, so epoll readiness matches what read() can see.
            kwz.setdefault('bufsize', 0)
        super(AExec, self).__init__(argz, **kwz)
        if self.stdin:
            if not sync:
                self.stdin = AWrapper(self.stdin)
            self.write = self.stdin.write
        if self.stdout:
            if not sync:
                self.stdout = AWrapper(self.stdout)
            self.read = self.stdout.read
        if self.stderr:
            if not sync:
                self.stderr = AWrapper(self.stderr)
            self.read_err = self.stderr.read

    def wait(self, to=-1, timeout=None):
        '''Wait for the process and return its exit status, or Time after ``to`` seconds.

        A value of to <= 0 waits indefinitely. ``timeout`` is accepted because
        Python 3 Popen internals (e.g. communicate()) call
        ``self.wait(timeout=...)``. When it is given, it overrides ``to``.

        A bounded wait uses SIGALRM. signal.signal() only works in the main
        thread. The previous handler is always restored, and an alarm that
        was already pending is re-armed with its remaining time (or
        delivered immediately if it is overdue).
        '''
        if timeout is not None:
            to = timeout
        if not to > 0:
            return super(AExec, self).wait()
        secs = int(to)
        if secs < to:  # signal.alarm() takes whole seconds; round up
            secs += 1
        previous = signal.getsignal(signal.SIGALRM)

        def expire(signum, frame):
            if self.returncode is None:  # ignore an alarm that lands after reaping
                raise AExec._Expired()

        signal.signal(signal.SIGALRM, expire)
        started = time()
        pending = signal.alarm(secs)  # seconds left on an alarm set by someone else
        try:
            return super(AExec, self).wait()
        except AExec._Expired:
            return Time
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, previous if previous is not None else signal.SIG_DFL)
            if pending:
                remaining = pending - (time() - started)
                if remaining > 0:
                    left = int(remaining)
                    if left < remaining:
                        left += 1
                    signal.alarm(left)
                else:  # overdue: trigger it immediately
                    os.kill(os.getpid(), signal.SIGALRM)

    def close(self, to=-1, to_sever=3):
        '''Shut the process down, escalating: close stdin, then SIGTERM, then SIGKILL.

        If ``to_sever`` is non-zero and ``to > to_sever``, the process first
        gets ``to_sever`` seconds to exit after stdin is closed. Then it is
        terminated and waited on for the rest of ``to`` (indefinitely if
        to <= 0). If it is still alive after that, it is killed.

        Returns the exit status, Time if it had to be killed, or None if any
        step failed (e.g. the process was already reaped). It is also used as
        __del__, hence the deliberately broad error handling.
        '''
        try:
            if self.stdin:  # try to strangle it
                try:
                    self.stdin.close()
                except Exception:
                    pass
            if to_sever and to > to_sever:  # wait for process to die on its own
                status = self.wait(to_sever)
                if status is not Time:
                    return status
                to -= to_sever
            self.terminate()  # soft-kill
            status = self.wait(to)
            if status is Time:
                self.kill()  # hard-kill
                return Time
            return status
        except Exception:
            return None  # already taken care of

    __del__ = close
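The Linux epoll(7) reactor can easily be replaced with kqueue or select for portability to other Unix systems. Windows needs more than that: the code also relies on fcntl and signal.alarm, which are Unix-only, and select() on Windows works only with sockets, not pipes.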
Example usage:
## Simple tar to secure remote system and insecure one thru gpg
## (simple case, can also be implemented by tee & fifos)
# Note that only one pipe is actually asynchronous
log = logging.getLogger(__name__)  # 'log' was used below but never defined
source = AExec('tar', '-czf', '-', '/')
target1 = AExec('ssh', 'backup@somehost', 'cat > /mnt/backups/backup.tar.gz')
gpg = AExec('gpg', '--encrypt', '--recipient', 'admin@somehost', sync=False)  # the filter
target2 = AExec('ssh', 'backup@publichost', 'cat > /public/my_backup.tar.gz.gpg')
max_timeout = 10
to = 0  # filter timeout, raised in case of jam
while True:
    chunk = source.read(8192)  # 8 KiB chunks, shouldn't be larger than pipe read/write buffers
    if chunk and not to:  # got data chunk and filter isn't jammed
        target1.write(chunk)
        while chunk:
            sent = gpg.write(chunk, to)  # try feeding the filter (could be overflown)
            if sent:
                chunk = chunk[sent:]  # unsent leftover (should be empty, if chunk < buffers)
            buff = gpg.read(-1, 0)  # return all we can grab from the filter at the moment
            if buff:
                target2.write(buff)
                to = 0
            elif not sent:
                if not to:
                    to = max_timeout  # try harder, one last time
                else:
                    log.error('Filter looks jammed (not responsible to I/O) for %ds' % max_timeout)
                    break
            else:
                to = 0
        else:
            to = 0  # also indicates chunk sending success
    else:  # either there's no more data to read or filter is jammed - cleanup & break
        if not to:  # it's not a jam
            gpg.stdin.close()  # sever data intake, so filter would flush the output buffer
            chunk, state = gpg.read(-1, max_timeout, state=True)  # squeeze the last bits
            if state is Time:
                log.error("Filter stdout wasn't closed properly on the other end")
            target2.write(chunk)
            if gpg.wait(max_timeout) is Time:
                log.error("Filter process hasn't died off, will be terminated")
                if gpg.close(max_timeout, 0) is Time:
                    log.error("Filter had to be -KILL'ed")
        elif gpg.close(max_timeout, 0) is Time:  # jammed: don't leave the filter running
            log.error("Filter had to be -KILL'ed")
        source.close(max_timeout)
        target1.close(max_timeout)
        target2.close(max_timeout)
        break
Класс!!! Хочу добавить поддержку винды