This module is inspired by recipe 276960 and shows how external processes (istantiated with Popen) can be combined with a pipe-like syntax. Some support is added for having in the pipe also local functions (by caching their results in a ByteIO, or using operating system pipes). A similar approach, using generators, is presented in recipe 576756.
| Python |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | from io import BytesIO
from subprocess import Popen, PIPE
from os import pipe, fdopen
from threading import Thread
class Pipeable( object ):
def __init__( self ):
self.output = None
def _output( self, input = None ):
return self.output
def __or__( self, right ):
if not isinstance( right, Pipeable ): return NotImplemented
self.output = right._output( self._output() )
return self
class Shell( Pipeable ):
def __init__( self, cmd ):
Pipeable.__init__( self )
self.cmd = cmd
def _output( self, input = None ):
return Popen( self.cmd, stdin = input, stdout = PIPE ).stdout
class ThreadedFilter( Pipeable ):
def __init__( self, filter ):
self.filter = filter
_pipe = pipe()
self.pipe = fdopen( _pipe[ 1 ], 'w' )
self.output = fdopen( _pipe[ 0 ], 'r' )
def _output( self, input = None ):
def _target():
_out = self.pipe
for line in input:
_out.write( self.filter( line ) )
Thread( target = _target ).start()
return self.output
class CachedFilter( Pipeable ):
def __init__( self, filter ):
self.filter = filter
def _output( self, input = None ):
output = BytesIO()
for line in input:
output.write( self.filter( line ) )
output.seek( 0 )
return output
class Output( Pipeable ):
def __init__( self, output ):
self.output = output
class Print( object ):
def __ror__( self, left ):
print left.output.read()
class Write( object ):
def __init__( self, path ):
self.path = path
def __ror__( self, left ):
f = open( self.path, 'w' )
while True:
buf = left.output.read( 1024 )
if not buf: break
f.write( buf )
f.close()
if __name__ == '__main__':
Output( open( "/etc/passwd" ) ) | Shell( "rev" ) | ThreadedFilter( lambda str : str[::-1] ) | CachedFilter( lambda x : x ) | Print()
|
Discussion
This is just an hack, but can probably be transformed in an interesting way of combining external processes for people used to shell like languages.


Sign in to comment