The 'subprocess' module in Python 2.4 has made creating and accessing subprocess streams in Python relatively convenient for all supported platforms, but what if you want to interact with the started subprocess? That is, what if you want to send a command, read the response, and send a new command based on that response?
Now there is a solution. The included subprocess.Popen subclass adds three new commonly used methods: recv(maxsize=None), recv_err(maxsize=None), and send(input), along with a utility method: send_recv(input='', maxsize=None).
recv() and recv_err() both read at most maxsize bytes from the started subprocess. send() sends strings to the started subprocess. send_recv() will send the provided input, and read up to maxsize bytes from both stdout and stderr.
If any of the pipes are closed, the attributes for those pipes will be set to None, and the methods will return None.
v. 1.3 fixed a few bugs relating to *nix support v. 1.4,5 fixed initialization on all platforms, a few bugs relating to Windows support, added two utility functions, and added an example of how to use this module. v. 1.6 fixed linux _recv() and test initialization thanks to Yuri Takhteyev at Stanford. v. 1.7 removed _setup() and __init__() and fixed subprocess unittests thanks to Antonio Valentino. Added 4th argument 'tr' to recv_some(), which is, approximately, the number of times it will attempt to recieve data. Added 5th argument 'stderr' to recv_some(), where when true, will recieve from stderr. Cleaned up some pipe closing. v. 1.8 Fixed missing self. parameter in non-windows _recv method thanks to comment. v. 1.9 Fixed fcntl calls for closed handles.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
import os import subprocess import errno import time import sys PIPE = subprocess.PIPE if subprocess.mswindows: from win32file import ReadFile, WriteFile from win32pipe import PeekNamedPipe import msvcrt else: import select import fcntl class Popen(subprocess.Popen): def recv(self, maxsize=None): return self._recv('stdout', maxsize) def recv_err(self, maxsize=None): return self._recv('stderr', maxsize) def send_recv(self, input='', maxsize=None): return self.send(input), self.recv(maxsize), self.recv_err(maxsize) def get_conn_maxsize(self, which, maxsize): if maxsize is None: maxsize = 1024 elif maxsize < 1: maxsize = 1 return getattr(self, which), maxsize def _close(self, which): getattr(self, which).close() setattr(self, which, None) if subprocess.mswindows: def send(self, input): if not self.stdin: return None try: x = msvcrt.get_osfhandle(self.stdin.fileno()) (errCode, written) = WriteFile(x, input) except ValueError: return self._close('stdin') except (subprocess.pywintypes.error, Exception), why: if why in (109, errno.ESHUTDOWN): return self._close('stdin') raise return written def _recv(self, which, maxsize): conn, maxsize = self.get_conn_maxsize(which, maxsize) if conn is None: return None try: x = msvcrt.get_osfhandle(conn.fileno()) (read, nAvail, nMessage) = PeekNamedPipe(x, 0) if maxsize < nAvail: nAvail = maxsize if nAvail > 0: (errCode, read) = ReadFile(x, nAvail, None) except ValueError: return self._close(which) except (subprocess.pywintypes.error, Exception), why: if why in (109, errno.ESHUTDOWN): return self._close(which) raise if self.universal_newlines: read = self._translate_newlines(read) return read else: def send(self, input): if not self.stdin: return None if not select.select(, [self.stdin], , 0): return 0 try: written = os.write(self.stdin.fileno(), input) except OSError, why: if why == errno.EPIPE: #broken pipe return self._close('stdin') raise return written def _recv(self, which, maxsize): conn, maxsize = self.get_conn_maxsize(which, maxsize) if conn is None: return None flags = fcntl.fcntl(conn, fcntl.F_GETFL) if not conn.closed: fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) try: if not select.select([conn], , , 0): return '' r = conn.read(maxsize) if not r: return self._close(which) if self.universal_newlines: r = self._translate_newlines(r) return r finally: if not conn.closed: fcntl.fcntl(conn, fcntl.F_SETFL, flags) message = "Other end disconnected!" def recv_some(p, t=.1, e=1, tr=5, stderr=0): if tr < 1: tr = 1 x = time.time()+t y =  r = '' pr = p.recv if stderr: pr = p.recv_err while time.time() < x or r: r = pr() if r is None: if e: raise Exception(message) else: break elif r: y.append(r) else: time.sleep(max((x-time.time())/tr, 0)) return ''.join(y) def send_all(p, data): while len(data): sent = p.send(data) if sent is None: raise Exception(message) data = buffer(data, sent) if __name__ == '__main__': if sys.platform == 'win32': shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n') else: shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n') a = Popen(shell, stdin=PIPE, stdout=PIPE) print recv_some(a), for cmd in commands: send_all(a, cmd + tail) print recv_some(a), send_all(a, 'exit' + tail) print recv_some(a, e=0) a.wait()
In wxPython, one can use wxExecute and wxProcess to the same effect, though then one is required to use wxPython.
With the above subclass, writing a multi-platform 'expect' module would be relatively straightforward.
I had originally used a variant of the above for running arbitrary commands in a version of PyPE. Ultimately I went with a wxExecute/wxProcess combination due to the convenience of being notified when a process terminates, not having to ship subprocess.py for use with Python 2.3, and not needing for Windows users to install pywin32.
Both have an issue in that some commands will buffer their output strangely in Windows (though I have not observed such behavior on *nix, that doesn't mean that such platforms are immune). For example, running some python scripts via either method results in no observed output from python until python closes (even when a fairly large volume of data is printed via Python), neither cygwin bash nor sh display their prompts, etc.
Useful addition to the standard library. There exist already some 'expect'-like modules but they would all benefit from a 'non-pty' based approach.
The author's subprocess module is a gem and I really hope that the functionality described in the above subclass would become part of the Python standard library.
Possible alternative unix approach. I was successful using nonblocking io, for example.
Once the process instance is created change its stdout to nonblocking reads. I am aware of one downside to this approach is the resource temporarily unavailable exceptions upon reading. These can be safely ignored. Another may be the need to poll the subprocess, so I suspect the select method with 0 for the timeout is more appropriate.
Error Message under winXP. Instead of calling "get_conn_maxsize(which, maxsize)"
in the function _recv
I use "self.get_conn_maxsize(which, maxsize)"
Otherwise I get an error message
Sorry about that, I had forgotten the self. portion during a refactoring.
Well, select with 0 for a timeout only tells you if data is ready. If one attempts to read more than is available, it will block. I've included the fcntl trick to change the pipes to non-blocking. Thank you!
Maybe a Use Case Example? Sorry, maybe too much to ask for... But a couple examples would be very helpful. Maybe one executing a file and another executing an OS call such as 'dir' or 'echo'.
I've added a bit to the end of the recipe.
non-blocking not required. You don't need to set non-blocking mode. This is a common missconception.
os.read and os.write only block in blocking mode if nothing could be read or written. Otherwise they will not block and will do a partial read/write. You are already checking with select() that at least something could be read/written, so they will not block, even without non-blocking mode.
Python buffering in Windows. I was able to avoid buffering problems with Windows Python by invoking Python with the '-u' option.
Python buffering in Windows. I was able to avoid buffering problems with Windows Python by invoking the Python subprocess with the '-u' option.
That doesn't always work.
Long running processes. Nice piece of code. I was dealing with the output from a long running process and found recv_some() would occasionally freeze. If the proc.recv() happened to take long enough, time.time() could be greater than x, which results in a negative argument for time.sleep(). It apparently tries to sleep forever in this case. I just added a check:
Also, on Linux, I had to add 'import select'. Nice work, though, definitely saved me some time.
Can't capture child output. After adding "import select", the code runs (SUSE 10). However, changing the line:
shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')
shell, commands, tail = ('sh', ('./mttst.py'), '\n')
results in an immediate error "Other end disconnected!"
mttst.py is executable and runs from the CL.
print "mttst.py started"
for i in range(5):
the next two lines should be indented. Don't know how.
print "mttst.py finished"
Suggestions on what I'm doing wrong much appreciated.
Standard subprocess unittest. The _setup method raises an exception if one of the stdin, stdout, stderr file descriptors is None.
Setting non-blocking mode at sub-process startup (POSIX case) seems to break the unit tests (test_subproces.py) of the standard subprocess module. I set the non-blocking flag in _send and _recv methods and restore the original ones before they returns.
This fixes the problem. The _setup method now is no more needed.
If you look at other pieces of the demo, it asumes each item run will execute almost instaneously. In this case, the program it is attempting to run takes around 5 seconds (plus the starting up of Python, which hopefully should be fast).
Even if it did run successfully, it wouldn't necessarily print everything as output (especially considering Python's sometimes wonky stdout buffering semantics on certain platforms).
I tried running it on Windows, and it printed out the "starting" message along with the first timestamp. Then it paused for 5 seconds waiting for the subprocess to close (while not recieving information), and quit without exception.
forgot 'self.' It looks like you used this code (I did a visual compare if this code was included that could have been more clear from your answer).
But actually 'self.' is missing in the non mswindows version:
So you cannot currently download the code and have it running on non-windows.
Fixed the code that you mention. Thank you.
Running your examples gives me a "ValueError: I/O operation on closed file" When I run your example on Linux with Python 2.4.4 I get an exception. The stack dump is
Changing the line 123 to
fixes the problem for me.
I modified a few more lines to fix it for all fcntl calls in the _recv method.
Have you tried Pexpect? Take a look at
for a similar (better?) module -- 100% Python, optimized for speed, very flexible.
Pexpect doesn't work on vanilla Windows, so it's not a fully portable solution (although it is more fully-featured, especially the valuable expect() method).
Control Keys? This is great, but what happens if a subordinate process needs to be interrupted? Since the examples use a shell, would there be any way to send control charaters (like ^C) to the shell?
I don't do much with Python, but recently needed to use this useful recipe. I found that I needed to add something to wait for a specific string to appear in the output prior to continuing. In looking at the output I needed to ignore the echo of the command I was sending to the shell, and only look at the output that results from the execution of the command. Here's what I added:
(I just noticed that I have \r\n in the regular expression, so that needs to be fixed for portability to *nix systems).
Every sleep() call is actually a waste of time, because sleeping IS implemented in the reactor (select), but not as "sleep for N secs then check" but "sleep until check", so the code can be easily refactored without any "tries" count or "sleep".
Below method is a piece of async wrapper class, just for reference. "self._poll_in" here is py2.6 epoll reactor object in level-triggering mode.