Welcome, guest | Sign In | My Account | Store | Cart

This module is inspired by recipe 276960 and shows how external processes (istantiated with Popen) can be combined with a pipe-like syntax. Some support is added for having in the pipe also local functions (by caching their results in a ByteIO, or using operating system pipes). A similar approach, using generators, is presented in recipe 576756.

Python, 67 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from io import BytesIO
from subprocess import Popen, PIPE
from os import pipe, fdopen
from threading import Thread

class Pipeable( object ):
	def __init__( self ):
		self.output = None
	def _output( self, input = None ):
		return self.output
	def __or__( self, right ):
		if not isinstance( right, Pipeable ): return NotImplemented
		self.output = right._output( self._output() )
		return self

class Shell( Pipeable ):
	def __init__( self, cmd ):
		Pipeable.__init__( self )
		self.cmd = cmd
	def _output( self, input = None ):
		return Popen( self.cmd, stdin = input, stdout = PIPE ).stdout

class ThreadedFilter( Pipeable ):
	def __init__( self, filter ):
		self.filter = filter
		_pipe = pipe()
		self.pipe = fdopen( _pipe[ 1 ], 'w' )
		self.output = fdopen( _pipe[ 0 ], 'r' )
	def _output( self, input = None ):
		def _target():
			_out = self.pipe
			for line in input:
				_out.write( self.filter( line ) )
		Thread( target = _target ).start()
		return self.output

class CachedFilter( Pipeable ):
	def __init__( self, filter ):
		self.filter = filter
	def _output( self, input = None ):
		output = BytesIO()
		for line in input:
			output.write( self.filter( line ) )
		output.seek( 0 )
		return output

class Output( Pipeable ):
	def __init__( self, output ):
		self.output = output

class Print( object ):
	def __ror__( self, left ):
		print left.output.read()

class Write( object ):
	def __init__( self, path ):
		self.path = path
	def __ror__( self, left ):
		f = open( self.path, 'w' )
		while True:
			buf = left.output.read( 1024 )
			if not buf: break
			f.write( buf )
		f.close()
	
if __name__ == '__main__':
	Output( open( "/etc/passwd" ) ) | Shell( "rev" ) | ThreadedFilter( lambda str : str[::-1] ) | CachedFilter( lambda x : x ) | Print()
	

This is just an hack, but can probably be transformed in an interesting way of combining external processes for people used to shell like languages.

1 comment

Daniel Lepage 14 years, 1 month ago  # | flag

You need an extra check in Pipeable.__or__:

def __or__( self, right ):
    if not isinstance( right, Pipeable ): return NotImplemented
    if self.output is not None:
        self.output = right._output(self.output)
    else:
        self.output = right._output( self._output() )
    return self

Otherwise the Shell pipeable, for example, always returns the same output even when it's been or'd with other things, so that

Shell(["cat","this.py"]) | Shell(["grep","cmd"]) | Shell(["tr","a-z","A-Z"])

is equivalent to

Shell(["cat","this.py"]) | Shell(["tr","a-z","A-Z"])