This module is inspired by recipe 276960 and shows how external processes (instantiated with Popen) can be combined with a pipe-like syntax. Some support is also provided for including local functions in the pipe (by caching their results in a BytesIO, or by using operating system pipes). A similar approach, using generators, is presented in recipe 576756.
from io import BytesIO
from subprocess import Popen, PIPE
from os import pipe, fdopen
from threading import Thread
class Pipeable(object):
    """Base class for pipeline stages that can be chained with ``|``.

    A chain such as ``a | b | c`` is evaluated left to right and always
    returns the left-most stage, whose ``output`` attribute holds the
    stream produced by the last stage applied so far.
    """

    def __init__(self):
        # Stream produced so far by the chain rooted at this stage;
        # None until something has been produced.
        self.output = None

    def _output(self, input=None):
        """Return this stage's output stream.

        The base implementation ignores *input* and returns
        ``self.output``; subclasses override it to consume *input*.
        """
        return self.output

    def __or__(self, right):
        """Feed the current output of this chain into *right*.

        Returns ``self`` so further stages can be chained, or
        ``NotImplemented`` when *right* is not a ``Pipeable`` (which lets
        Python fall back to ``right.__ror__``).
        """
        if not isinstance(right, Pipeable):
            return NotImplemented
        # Reuse the stream accumulated by an earlier ``|`` instead of asking
        # this stage to produce its output again: a stage whose _output()
        # recomputes its result would otherwise discard every stage chained
        # so far, making ``a | b | c`` behave like ``a | c``.
        # getattr() is used because subclasses are not required to call
        # Pipeable.__init__ and may therefore lack the attribute.
        upstream = getattr(self, "output", None)
        if upstream is None:
            upstream = self._output()
        self.output = right._output(upstream)
        return self
class Shell(Pipeable):
    """Pipeline stage that runs an external command through ``Popen``."""

    def __init__(self, cmd):
        """Remember *cmd*; it is handed to ``Popen`` unchanged later on."""
        Pipeable.__init__(self)
        self.cmd = cmd

    def _output(self, input=None):
        """Start the command and return its standard-output stream.

        *input*, when given, is passed to ``Popen`` as the child's stdin.
        """
        process = Popen(self.cmd, stdout=PIPE, stdin=input)
        return process.stdout
class ThreadedFilter(Pipeable):
    """Pipeline stage that applies a local function in a background thread.

    Each line read from the upstream stream is passed through ``filter``
    and the result is written to an operating-system pipe; the read end
    of that pipe is the stage's output. An instance owns a single pipe,
    so it is meant to be used in one pipeline only.
    """

    def __init__(self, filter):
        self.filter = filter
        read_fd, write_fd = pipe()
        # Write end: fed by the worker thread started in _output().
        self.pipe = fdopen(write_fd, 'w')
        # Read end: handed to the next stage of the pipeline.
        self.output = fdopen(read_fd, 'r')

    def _output(self, input=None):
        """Start the worker thread and return the pipe's read end.

        *input* must be an iterable of lines; it is consumed by the
        worker thread, not by the caller.
        """
        def _target():
            _out = self.pipe
            try:
                for line in input:
                    _out.write(self.filter(line))
            finally:
                # Close the write end explicitly so the reader sees EOF
                # as soon as the input is exhausted, and also when the
                # loop fails, instead of depending on the file object
                # being garbage-collected.
                _out.close()

        Thread(target=_target).start()
        return self.output
class CachedFilter(Pipeable):
    """Pipeline stage that applies a local function and buffers the result.

    The whole upstream stream is consumed before ``_output`` returns.
    """

    def __init__(self, filter):
        self.filter = filter

    def _output(self, input=None):
        """Return an in-memory stream holding every filtered line.

        Each line of *input* is passed through ``self.filter`` and written
        to a ``BytesIO``, which is rewound to its start before returning.
        """
        cache = BytesIO()
        cache.writelines(self.filter(line) for line in input)
        cache.seek(0)
        return cache
class Output(Pipeable):
    """Pipeline source that wraps an already existing stream."""

    def __init__(self, output):
        # The inherited _output() returns this attribute unchanged, so the
        # wrapped stream is what gets fed to the next stage.
        self.output = output
class Print(object):
    """Pipeline sink that prints the final output of a pipeline.

    It is not a ``Pipeable``; it is reached through ``__ror__`` after
    ``Pipeable.__or__`` returns ``NotImplemented``.
    """

    def __ror__(self, left):
        """Read everything from ``left.output`` and print it.

        *left* must expose an ``output`` attribute with a ``read()``
        method. Nothing is returned.
        """
        # A single parenthesised argument behaves the same under the
        # Python 2 print statement and the Python 3 print function.
        print(left.output.read())
class Write(object):
    """Pipeline sink that stores the final output of a pipeline in a file."""

    def __init__(self, path):
        # Destination path, opened for writing when the sink is applied.
        self.path = path

    def __ror__(self, left):
        """Copy ``left.output`` to ``self.path`` in 1024-unit chunks.

        *left* must expose an ``output`` attribute with a ``read(size)``
        method. Nothing is returned.
        """
        # The with-block guarantees the destination is closed even if
        # reading from the pipeline or writing to disk raises.
        with open(self.path, 'w') as f:
            while True:
                buf = left.output.read(1024)
                if not buf:
                    break
                f.write(buf)
if __name__ == '__main__':
    # Demo pipeline: feed /etc/passwd to the external "rev" command, reverse
    # every line again in a background thread, pass the result through an
    # identity filter that buffers it in memory, and print it.
    (
        Output(open("/etc/passwd"))
        | Shell("rev")
        | ThreadedFilter(lambda line: line[::-1])
        | CachedFilter(lambda chunk: chunk)
        | Print()
    )
|
This is just a hack, but it can probably be turned into an interesting way of combining external processes for people used to shell-like languages.
You need an extra check in Pipeable.__or__:
Otherwise the Shell pipeable, for example, always returns the same output even when it's been or'd with other things, so that
is equivalent to