Welcome, guest | Sign In | My Account | Store | Cart

Just stumbled upon the need to move data chunks between subprocesses in a non-linear way with some logic in-between, so tee(1) and fifo(7)'s weren't too good option. Inspired by 440554, but rewritten from scratch to remove unnecessary delays due to sleep(3) calls and suboptimal try/sleep-based polling.

Python, 151 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from subprocess import Popen, PIPE
import errno, fcntl
from time import time
import os, sys

# Exit conditions (states)
class Time: pass # hit-the-time-limit state
class Size: pass # hit-the-size-limit state
class End: pass # hit-the-end state

class AWrapper(object):
	'''Async I/O objects' wrapper'''

	bs_default = 8192
	bs_max = 65536

	def __init__(self, pipe):
		if isinstance(pipe, int):
			fd = self._fd = pipe
			pipe = os.fromfd(pipe)
		else: fd = self._fd = pipe.fileno()
		self._poll_in, self._poll_out = epoll(), epoll()
		self._poll_in.register(fd, EPOLLIN | EPOLLERR | EPOLLHUP)
		self._poll_out.register(fd, EPOLLOUT | EPOLLERR | EPOLLHUP)
		self.close = pipe.close
		self.reads = pipe.read
		self.writes = pipe.write

	def __del__(self):
		self._poll_in.close()
		self._poll_out.close()
		self.close()

	def read(self, bs=-1, to=-1, state=False): # read until timeout
		if to < 0: # use regular sync I/O
			buff = self.reads(bs)
			if state: return (buff, Size) if len(buff) == bs else (buff, End) # "Size" might mean "Size | End" here
			else: return buff
		try:
			flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
			fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
			deadline = time() + to
			buff = buffer('')
			while bs:
				try: fd, event = self._poll_in.poll(to, 1)[0] # get first event, fd should be eq self._fd
				except IndexError:
					if state: state = Time
					break
				if event != EPOLLHUP: # some data or error present
					ext = self.reads(min(bs, self.bs_max) if bs > 0 else self.bs_default) # min() for G+ reads
					buff += ext
				if event & EPOLLHUP: # socket is closed on the other end
					if state: state = End
					break
				to = deadline - time()
				if to < 0:
					if state: state = Time
					break
				bs -= len(ext)
			else: state = Size # got bs bytes
		finally:
			try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags) # restore blocking state
			except: pass # in case there was an error, caused by wrong fd/pipe (not to raise another one)
		return buff if not state else (buff, state)

	def write(self, buff, to=-1, state=False): # mostly similar (in reverse) to read
		if to < 0:
			bs = self.writes(buff)
			if state: return (bs, Size) if len(buff) == bs else (bs, End) # "Size" might mean "Size | End" here
			else: return bs
		try:
			flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
			fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
			bs = 0
			deadline = time() + to
			while buff:
				try: fd, event = self._poll_out.poll(to, 1)[0]
				except IndexError:
					if state: state = Time
					break
				if event != EPOLLHUP:
					ext = os.write(fd, buff)
					bs += ext
				if event & EPOLLHUP:
					if state: state = End
					break
				to = deadline - time()
				if to < 0:
					if state: state = Time
					break
				buff = buffer(buff, ext)
		finally:
			try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags)
			except: pass
		return bs if not state else (bs, state)



import signal

class AExec(Popen):
	def __init__(self, *argz, **kwz): # keywords aren't used yet
		if len(argz) == 1: argz = (argz[0],) if isinstance(argz[0], (str, unicode, buffer)) else argz[0]
		try: sync = kwz.pop('sync')
		except KeyError: sync = True
		super(AExec, self).__init__(argz, **kwz)
		if self.stdin:
			if not sync: self.stdin = AWrapper(self.stdin)
			self.write = self.stdin.write
		if self.stdout:
			if not sync: self.stdout = AWrapper(self.stdout)
			self.read = self.stdout.read
		if self.stderr:
			if not sync: self.stderr = AWrapper(self.stderr)
			self.read_err = self.stderr.read

	def wait(self, to=-1):
		if to > 0:
			ts, fuse, action = time(), signal.alarm(to), signal.getsignal(signal.SIGALRM)
			def quit(s,f): raise StopIteration
			signal.signal(signal.SIGALRM, quit)
			try: status = super(AExec, self).wait()
			except StopIteration: return Time
			signal.signal(signal.SIGALRM, action)
			if fuse:
				fuse = int(time() - ts + fuse)
				if fuse > 0: signal.alarm(fuse)
				else: # trigger it immediately
					signal.alarm(0)
					os.kill(os.getpid(), signal.SIGALRM)
			else: signal.alarm(0)
		else: status = super(AExec, self).wait()
		return status

	def close(self, to=-1, to_sever=3):
		try:
			if self.stdin: # try to strangle it
				try: self.stdin.close()
				except: pass
				if to_sever and to > to_sever: # wait for process to die on it's own
					status = self.wait(to_sever)
					if not status is Time: return status
					else: to -= to_sever
			self.terminate() # soft-kill
			status = self.wait(to)
			if status is Time:
				self.kill() # hard-kill
				return Time
		except: return None # already taken care of
	__del__ = close

Linux epoll(7) reactor can be easily replaced by kqueue or select for cross-platform compatibility.

Example usage:

## Simple tar to secure remote system and insecure one thru gpg
##  (simple case, can also be implemented by tee & fifos)
# Note that only one pipe is actually asynchronous

source = AExec('tar', '-czf', '-', '/')

target1 = AExec('ssh', 'backup@somehost', 'cat > /mnt/backups/backup.tar.gz')

filter = AExec('gpg', '--encrypt', '--recipient', 'admin@somehost', sync=False)
target2 = AExec('ssh', 'backup@publichost', 'cat > /public/my_backup.tar.gz.gpg')

max_timeout = 10

to = 0 # filter timeout, raised in case of jam
while True:
    chunk = source.read(8192) # 8 KiB chunks, shouldn't be larger than pipe read/write buffers

    if chunk and not to: # got data chunk and filter isn't jammed
        target1.write(chunk)
        while chunk:
            bytes = filter.write(chunk, to) # try feeding the filter (could be overflown)
            if bytes: chunk = buffer(chunk, bytes) # unsent leftover (should be zero, if chunk < buffers)
            buff = filter.read(-1, 0) # return all we can grab from the filter at the moment
            if buff:
                target2.write(buff)
                to = 0
            elif not bytes:
                if not to: to = max_timeout # try harder, one last time
                else:
                    log.error('Filter looks jammed (not responsible to I/O) for %ds'%max_timeout)
                    break
            else: to = 0
        else: to = 0 # also indicates chunk sending success

    else: # either there's no more data to read or filter is jammed - cleanup & break
        if not to: # it's not a jam
            filter.stdin.close() # sever data intake, so filter would flush the output buffer
            chunk, state = filter.read(-1, max_timeout, state=True) # squeeze the last bits
            if state is Time: log.error("Filter stdout wasn't closed properly on the other end")
            target2.write(chunk)
        if filter.wait(max_timeout) is Time:
            log.error("Filter process hasn't died off, will be terminated")
            if filter.close(max_timeout, 0) is Time: log.error("Filter had to be -KILL'ed")
        source.close(max_timeout)
        target1.close(max_timeout)
        target2.close(max_timeout)
        break

1 comment

Dmitry 11 years, 7 months ago  # | flag

Класс!!! Хочу добавить поддержку винды