from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from subprocess import Popen, PIPE
import errno, fcntl
from time import time
import os, sys
# Exit conditions (states)
class Time: pass # hit-the-time-limit state
class Size: pass # hit-the-size-limit state
class End: pass # hit-the-end state
class AWrapper(object):
'''Async I/O objects' wrapper'''
bs_default = 8192
bs_max = 65536
def __init__(self, pipe):
if isinstance(pipe, int):
fd = self._fd = pipe
pipe = os.fromfd(pipe)
else: fd = self._fd = pipe.fileno()
self._poll_in, self._poll_out = epoll(), epoll()
self._poll_in.register(fd, EPOLLIN | EPOLLERR | EPOLLHUP)
self._poll_out.register(fd, EPOLLOUT | EPOLLERR | EPOLLHUP)
self.close = pipe.close
self.reads = pipe.read
self.writes = pipe.write
def __del__(self):
self._poll_in.close()
self._poll_out.close()
self.close()
def read(self, bs=-1, to=-1, state=False): # read until timeout
if to < 0: # use regular sync I/O
buff = self.reads(bs)
if state: return (buff, Size) if len(buff) == bs else (buff, End) # "Size" might mean "Size | End" here
else: return buff
try:
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
deadline = time() + to
buff = buffer('')
while bs:
try: fd, event = self._poll_in.poll(to, 1)[0] # get first event, fd should be eq self._fd
except IndexError:
if state: state = Time
break
if event != EPOLLHUP: # some data or error present
ext = self.reads(min(bs, self.bs_max) if bs > 0 else self.bs_default) # min() for G+ reads
buff += ext
if event & EPOLLHUP: # socket is closed on the other end
if state: state = End
break
to = deadline - time()
if to < 0:
if state: state = Time
break
bs -= len(ext)
else: state = Size # got bs bytes
finally:
try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags) # restore blocking state
except: pass # in case there was an error, caused by wrong fd/pipe (not to raise another one)
return buff if not state else (buff, state)
def write(self, buff, to=-1, state=False): # mostly similar (in reverse) to read
if to < 0:
bs = self.writes(buff)
if state: return (bs, Size) if len(buff) == bs else (bs, End) # "Size" might mean "Size | End" here
else: return bs
try:
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
bs = 0
deadline = time() + to
while buff:
try: fd, event = self._poll_out.poll(to, 1)[0]
except IndexError:
if state: state = Time
break
if event != EPOLLHUP:
ext = os.write(fd, buff)
bs += ext
if event & EPOLLHUP:
if state: state = End
break
to = deadline - time()
if to < 0:
if state: state = Time
break
buff = buffer(buff, ext)
finally:
try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags)
except: pass
return bs if not state else (bs, state)
import signal
class AExec(Popen):
def __init__(self, *argz, **kwz): # keywords aren't used yet
if len(argz) == 1: argz = (argz[0],) if isinstance(argz[0], (str, unicode, buffer)) else argz[0]
try: sync = kwz.pop('sync')
except KeyError: sync = True
super(AExec, self).__init__(argz, **kwz)
if self.stdin:
if not sync: self.stdin = AWrapper(self.stdin)
self.write = self.stdin.write
if self.stdout:
if not sync: self.stdout = AWrapper(self.stdout)
self.read = self.stdout.read
if self.stderr:
if not sync: self.stderr = AWrapper(self.stderr)
self.read_err = self.stderr.read
def wait(self, to=-1):
if to > 0:
ts, fuse, action = time(), signal.alarm(to), signal.getsignal(signal.SIGALRM)
def quit(s,f): raise StopIteration
signal.signal(signal.SIGALRM, quit)
try: status = super(AExec, self).wait()
except StopIteration: return Time
signal.signal(signal.SIGALRM, action)
if fuse:
fuse = int(time() - ts + fuse)
if fuse > 0: signal.alarm(fuse)
else: # trigger it immediately
signal.alarm(0)
os.kill(os.getpid(), signal.SIGALRM)
else: signal.alarm(0)
else: status = super(AExec, self).wait()
return status
def close(self, to=-1, to_sever=3):
try:
if self.stdin: # try to strangle it
try: self.stdin.close()
except: pass
if to_sever and to > to_sever: # wait for process to die on it's own
status = self.wait(to_sever)
if not status is Time: return status
else: to -= to_sever
self.terminate() # soft-kill
status = self.wait(to)
if status is Time:
self.kill() # hard-kill
return Time
except: return None # already taken care of
__del__ = close