Welcome, guest | Sign In | My Account | Store | Cart

The 'subprocess' module in Python 2.4 has made creating and accessing subprocess streams in Python relatively convenient for all supported platforms, but what if you want to interact with the started subprocess? That is, what if you want to send a command, read the response, and send a new command based on that response?

Now there is a solution. The included subprocess.Popen subclass adds three new commonly used methods: recv(maxsize=None), recv_err(maxsize=None), and send(input), along with a utility method: send_recv(input='', maxsize=None).

recv() and recv_err() both read at most maxsize bytes from the started subprocess. send() sends a string to the started subprocess and returns the number of bytes written. send_recv() sends the provided input, then reads up to maxsize bytes from both stdout and stderr.

If any of the pipes are closed, the attributes for those pipes will be set to None, and the methods will return None.

v. 1.3 fixed a few bugs relating to *nix support. v. 1.4 and 1.5 fixed initialization on all platforms and a few bugs relating to Windows support, added two utility functions, and added an example of how to use this module. v. 1.6 fixed the Linux _recv() and test initialization, thanks to Yuri Takhteyev at Stanford. v. 1.7 removed _setup() and __init__() and fixed the subprocess unit tests, thanks to Antonio Valentino; added a 4th argument 'tr' to recv_some(), which is approximately the number of times it will attempt to receive data; added a 5th argument 'stderr' to recv_some() which, when true, makes it receive from stderr; and cleaned up some pipe closing. v. 1.8 fixed a missing self. prefix in the non-Windows _recv method, thanks to a comment. v. 1.9 fixed fcntl calls for closed handles.

Python, 163 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import subprocess
import errno
import time
import sys

PIPE = subprocess.PIPE

if subprocess.mswindows:
    from win32file import ReadFile, WriteFile
    from win32pipe import PeekNamedPipe
    import msvcrt
else:
    import select
    import fcntl

class Popen(subprocess.Popen):
    """subprocess.Popen subclass that can talk to a running child.

    recv() and recv_err() return only data that is already waiting in the
    child's stdout/stderr pipe, and send() writes to its stdin.  Once a pipe
    is found to be closed, its attribute (self.stdin / self.stdout /
    self.stderr) is set to None and the corresponding method returns None.
    """

    def recv(self, maxsize=None):
        """Read at most maxsize bytes (default 1024) from stdout.

        Returns the data read (possibly ''), or None once stdout is closed.
        """
        return self._recv('stdout', maxsize)

    def recv_err(self, maxsize=None):
        """Like recv(), but reads from stderr."""
        return self._recv('stderr', maxsize)

    def send_recv(self, input='', maxsize=None):
        """Send input, then poll stdout and stderr once each.

        Returns a (bytes_sent, stdout_data, stderr_data) tuple; each element
        follows the send() / recv() / recv_err() conventions.
        """
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)

    def get_conn_maxsize(self, which, maxsize):
        """Return (getattr(self, which), maxsize) with maxsize normalized.

        None becomes 1024 and values below 1 are raised to 1.
        """
        if maxsize is None:
            maxsize = 1024
        elif maxsize < 1:
            maxsize = 1
        return getattr(self, which), maxsize

    def _close(self, which):
        """Close the pipe attribute named by which and set it to None.

        Always returns None, so callers can write ``return self._close(...)``
        to report a closed pipe.
        """
        getattr(self, which).close()
        setattr(self, which, None)

    if subprocess.mswindows:
        @staticmethod
        def _pipe_closed(why):
            """Return True if the exception why reports a closed pipe.

            109 is the Win32 ERROR_BROKEN_PIPE code.  Reading .args instead of
            indexing why[0] avoids an IndexError (which would hide the real
            error) for exceptions raised without arguments.
            """
            args = getattr(why, 'args', ())
            return bool(args) and args[0] in (109, errno.ESHUTDOWN)

        def send(self, input):
            """Write input to stdin.

            Returns the number of bytes written, or None if stdin is (or
            turns out to be) closed.
            """
            if self.stdin is None:
                return None

            try:
                x = msvcrt.get_osfhandle(self.stdin.fileno())
                (errCode, written) = WriteFile(x, input)
            except ValueError:
                # fileno() of an already-closed file object.
                return self._close('stdin')
            except (subprocess.pywintypes.error, Exception), why:
                if self._pipe_closed(why):
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read what is already waiting (up to maxsize) from pipe which.

            Returns the data read (possibly empty), or None once the pipe is
            closed.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                # Peek with a 0-byte buffer only to learn how much is waiting,
                # so ReadFile asks for no more than is already there.
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                # fileno() of an already-closed file object.
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if self._pipe_closed(why):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read

    else:
        def send(self, input):
            """Write input to stdin if the pipe is writable right now.

            Returns the number of bytes written, 0 if the pipe is not ready,
            or None if stdin is (or becomes) closed.  NOTE: stdin is left in
            blocking mode, so a write larger than the free pipe space can
            wait for the child to read.
            """
            if self.stdin is None:
                return None
            if self.stdin.closed:
                # select() raises ValueError on a closed file; report it as
                # a closed pipe, as the Windows branch does.
                return self._close('stdin')

            if not select.select([], [self.stdin], [], 0)[1]:
                return 0

            try:
                written = os.write(self.stdin.fileno(), input)
            except OSError, why:
                if why.errno == errno.EPIPE: # broken pipe
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read what is already waiting (up to maxsize) from pipe which.

            Returns '' if nothing is ready, or None at EOF / once closed.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None
            if conn.closed:
                # fcntl() on a closed file raises ValueError, so check before
                # touching the descriptor at all.
                return self._close(which)

            # O_NONBLOCK only for the duration of this call, so conn.read()
            # returns what is available instead of waiting for maxsize bytes.
            flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            try:
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    # Readable but empty: treated as EOF.
                    return self._close(which)

                if self.universal_newlines:
                    r = self._translate_newlines(r)
                return r
            finally:
                # _close() above may already have closed conn.
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)

# Text of the exception raised when the child's pipe turns out to be closed.
message = "Other end disconnected!"

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    """Collect output from p for about t seconds and return it as one string.

    p      -- a Popen from this module (anything whose recv()/recv_err()
              return data, '' when idle, or None when closed).
    t      -- seconds to keep polling.  Polling also continues past the
              deadline for as long as each read keeps returning data.
    e      -- if true, raise Exception(message) when the pipe is closed;
              otherwise stop and return whatever was collected.
    tr     -- controls the idle back-off: each idle sleep lasts 1/tr of the
              time remaining; values below 1 are treated as 1.
    stderr -- if true, read with recv_err() instead of recv().
    """
    polls = max(tr, 1)
    deadline = time.time() + t
    read = p.recv_err if stderr else p.recv
    chunks = []
    chunk = ''
    while chunk or time.time() < deadline:
        chunk = read()
        if chunk is None:
            if e:
                raise Exception(message)
            break
        if chunk:
            chunks.append(chunk)
            continue
        # Nothing ready yet: sleep a fraction of the remaining time.
        time.sleep(max((deadline - time.time()) / polls, 0))
    return ''.join(chunks)
    
def send_all(p, data):
    """Write all of data to p's stdin, retrying partial writes.

    p.send() may accept only part of data, or nothing if the pipe is not
    writable yet; the unsent remainder is retried until everything has been
    written.  Raises Exception(message) if p's stdin is found closed first.
    """
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        if not sent:
            # Pipe not writable yet: back off briefly instead of spinning on
            # a zero-timeout select() at full CPU.
            time.sleep(.01)
            continue
        # buffer() is a view of the unsent tail, so no copy is made.
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')
    
    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

In wxPython, one can use wxExecute and wxProcess to the same effect, though then one is required to use wxPython.

With the above subclass, writing a multi-platform 'expect' module would be relatively straightforward.

I had originally used a variant of the above for running arbitrary commands in a version of PyPE. Ultimately I went with a wxExecute/wxProcess combination due to the convenience of being notified when a process terminates, not having to ship subprocess.py for use with Python 2.3, and not requiring Windows users to install pywin32.

Both have an issue in that some commands buffer their output strangely on Windows (though I have not observed such behavior on *nix, that doesn't mean that such platforms are immune). For example, running some Python scripts via either method results in no observed output from Python until it exits (even when a fairly large volume of data is printed via Python), and neither Cygwin bash nor sh displays its prompt, etc.

25 comments

Richard Philips 18 years, 6 months ago  # | flag

Useful addition to the standard library. There exist already some 'expect'-like modules but they would all benefit from a 'non-pty' based approach.

The author's subprocess module is a gem and I really hope that the functionality described in the above subclass would become part of the Python standard library.

John Pywtorak 18 years, 6 months ago  # | flag

Possible alternative unix approach. I was successful using nonblocking io, for example.

flags = fcntl.fcntl(subprocess.stdout, fcntl.F_GETFL)
fcntl.fcntl(subprocess.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)

Once the process instance is created, switch its stdout to non-blocking reads. One downside of this approach that I am aware of is the "Resource temporarily unavailable" exceptions raised upon reading; these can be safely ignored. Another may be the need to poll the subprocess, so I suspect the select method with 0 for the timeout is more appropriate.

Walter Spiegel 18 years, 4 months ago  # | flag

Error Message under winXP. Instead of calling "get_conn_maxsize(which, maxsize)"

in the function _recv

I use "self.get_conn_maxsize(which, maxsize)"

Otherwise I get an error message

Josiah Carlson (author) 18 years, 4 months ago  # | flag

Sorry about that, I had forgotten the self. portion during a refactoring.

Josiah Carlson (author) 18 years, 4 months ago  # | flag

Well, select with 0 for a timeout only tells you if data is ready. If one attempts to read more than is available, it will block. I've included the fcntl trick to change the pipes to non-blocking. Thank you!

yaipa haa 18 years, 4 months ago  # | flag

Maybe a Use Case Example? Sorry, maybe too much to ask for... But a couple examples would be very helpful. Maybe one executing a file and another executing an OS call such as 'dir' or 'echo'.

Thanks.

--Alan

Josiah Carlson (author) 18 years, 4 months ago  # | flag

I've added a bit to the end of the recipe.

Donovan Baarda 18 years, 3 months ago  # | flag

non-blocking not required. You don't need to set non-blocking mode. This is a common misconception.

os.read and os.write only block in blocking mode if nothing could be read or written. Otherwise they will not block and will do a partial read/write. You are already checking with select() that at least something could be read/written, so they will not block, even without non-blocking mode.

Bret Aarden 18 years, 2 months ago  # | flag

Python buffering in Windows. I was able to avoid buffering problems with Windows Python by invoking Python with the '-u' option.

Bret Aarden 18 years, 2 months ago  # | flag

Python buffering in Windows. I was able to avoid buffering problems with Windows Python by invoking the Python subprocess with the '-u' option.

Josiah Carlson (author) 18 years, 2 months ago  # | flag

That doesn't always work.

Rob McMullen 18 years, 2 months ago  # | flag

Long running processes. Nice piece of code. I was dealing with the output from a long running process and found recv_some() would occasionally freeze. If the proc.recv() happened to take long enough, time.time() could be greater than x, which results in a negative argument for time.sleep(). It apparently tries to sleep forever in this case. I just added a check:

delay=(x-time.time())/5
if delay>0:
    time.sleep(delay)

Also, on Linux, I had to add 'import select'. Nice work, though, definitely saved me some time.

rex senegalus 17 years, 11 months ago  # | flag

Can't capture child output. After adding "import select", the code runs (SUSE 10). However, changing the line:

shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

to

shell, commands, tail = ('sh', ('./mttst.py'), '\n')

results in an immediate error "Other end disconnected!"

mttst.py is executable and runs from the CL.

#! /usr/bin/python

import time

print "mttst.py started"

for i in range(5):

the next two lines should be indented. Don't know how.

print time.time()


time.sleep(1)

print "mttst.py finished"

Suggestions on what I'm doing wrong much appreciated.

Antonio Valentino 17 years, 11 months ago  # | flag

Standard subprocess unittest. The _setup method raises an exception if one of the stdin, stdout, stderr file descriptors is None.

Setting non-blocking mode at sub-process startup (POSIX case) seems to break the unit tests (test_subprocess.py) of the standard subprocess module. I set the non-blocking flag in the _send and _recv methods and restore the original flags before they return.

def _recv(self, which, maxsize):
    """Read up to maxsize bytes from the pipe attribute named by which.

    Returns '' if nothing is ready, or None (after closing the pipe and
    setting the attribute to None) when the read comes back empty.
    O_NONBLOCK is set only for this call; the original flags are restored
    in the finally clause.
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    flags = fcntl.fcntl(conn, fcntl.F_GETFL)
    fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    try:
        # Zero timeout: poll only, never wait for data.
        if not select.select([conn], [], [], 0)[0]:
            return ''

        r = conn.read(maxsize)
        if not r:
            # Readable but empty is treated as EOF.
            conn.close()
            setattr(self, which, None)
            return None

        if self.universal_newlines:
            r = self._translate_newlines(r)
        return r
    finally:
        # NOTE(review): after the EOF branch above has closed conn, this call
        # raises "ValueError: I/O operation on closed file"; it needs an
        # `if not conn.closed` guard.
        fcntl.fcntl(conn, fcntl.F_SETFL, flags)

This fixes the problem. The _setup method now is no more needed.

Josiah Carlson (author) 17 years, 11 months ago  # | flag

If you look at other pieces of the demo, it assumes each item run will execute almost instantaneously. In this case, the program it is attempting to run takes around 5 seconds (plus the starting up of Python, which hopefully should be fast).

Even if it did run successfully, it wouldn't necessarily print everything as output (especially considering Python's sometimes wonky stdout buffering semantics on certain platforms).

I tried running it on Windows, and it printed out the "starting" message along with the first timestamp. Then it paused for 5 seconds waiting for the subprocess to close (while not receiving information), and quit without an exception.

Josiah Carlson (author) 17 years, 11 months ago  # | flag

Thank you!

Anthon van der Neut 17 years, 5 months ago  # | flag

forgot 'self.' It looks like you used this code (I did a visual comparison; whether this code was included could have been made clearer in your answer).

But actually 'self.' is missing in the non mswindows version:

def _recv(self, which, maxsize):
     conn, maxsize = self.get_conn_maxsize(which, maxsize)

So you cannot currently download the code and have it running on non-windows.

Josiah Carlson (author) 17 years, 4 months ago  # | flag

Fixed the code that you mention. Thank you.

Andreas Floeter 17 years, 4 months ago  # | flag

Running your examples gives me a "ValueError: I/O operation on closed file" When I run your example on Linux with Python 2.4.4 I get an exception. The stack dump is

HELLO WORLD
Traceback (most recent call last):
  File "./aspn_440554.py", line 167, in ?
    print recv_some(a, e=0)
  File "./aspn_440554.py", line 136, in recv_some
    r = pr()
  File "./aspn_440554.py", line 26, in recv
    return self._recv('stdout', maxsize)
  File "./aspn_440554.py", line 123, in _recv
    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
ValueError: I/O operation on closed file

Changing the line 123 to

if not conn.closed: fcntl.fcntl(conn, fcntl.F_SETFL, flags)

fixes the problem for me.

Josiah Carlson (author) 17 years, 3 months ago  # | flag

I modified a few more lines to fix it for all fcntl calls in the _recv method.

Noah Spurrier 16 years, 8 months ago  # | flag

Have you tried Pexpect? Take a look at

http://pexpect.sourceforge.net/

for a similar (better?) module -- 100% Python, optimized for speed, very flexible.

Steven Knight 16 years, 3 months ago  # | flag

Pexpect doesn't work on vanilla Windows, so it's not a fully portable solution (although it is more fully-featured, especially the valuable expect() method).

Tennis Smith 15 years, 11 months ago  # | flag

Control Keys? This is great, but what happens if a subordinate process needs to be interrupted? Since the examples use a shell, would there be any way to send control characters (like ^C) to the shell?

dspears 15 years, 7 months ago  # | flag

I don't do much with Python, but recently needed to use this useful recipe. I found that I needed to add something to wait for a specific string to appear in the output prior to continuing. In looking at the output I needed to ignore the echo of the command I was sending to the shell, and only look at the output that results from the execution of the command. Here's what I added:

def recv_some_restring(p, prog, t=.1, e=1, tr=5, stderr=0):
    # Variant of recv_some() that additionally stops as soon as the compiled
    # regex `prog` matches somewhere in the output collected so far.
    # Returns everything collected, joined into one string.
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
            # print y
            # Re-joins and re-searches all output received so far on every
            # new chunk, so a match spanning chunk boundaries is still found.
            result = prog.search(''.join(y))
            if result:
                print '<>'
                break
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def doCommandAndWaitfor(cmd, response, t=.1, e=1, tr=5, stderr=0):
    # Send cmd to the demo's global process `a` (terminated with the global
    # line ending `tail`), then print output until `response` is seen after
    # the echoed command.
    # NOTE(review): needs `import re`, which the recipe does not import.
    # cmd is not re.escape()d, and '[.\r\n]' is a character class matching
    # only '.', CR or LF (not "any character"), so response must follow the
    # echoed cmd separated only by those characters; something like '.*'
    # with re.DOTALL was probably intended -- confirm.
    send_all(a,cmd+tail)
    waitfor = '[.\r\n]*'+cmd+'[.\r\n]*'+response
    prog = re.compile(waitfor)
    print recv_some_restring(a, prog, t, e, tr, stderr)

(I just noticed that I have \r\n in the regular expression, so that needs to be fixed for portability to *nix systems).

Mike Kazantsev 14 years, 10 months ago  # | flag

Every sleep() call is actually a waste of time, because sleeping IS implemented in the reactor (select), but not as "sleep for N secs then check" but "sleep until check", so the code can be easily refactored without any "tries" count or "sleep".

Below method is a piece of async wrapper class, just for reference. "self._poll_in" here is py2.6 epoll reactor object in level-triggering mode.

def read(self, bs=-1, to=-1, status=False):
    '''Read until timeout or size == bs. Returns read buffer or (buffer, return_condition) if status flag is set.'''
    # NOTE(review): relies on names not shown in this excerpt: time()
    # (presumably `from time import time`), self.reads, self._fd,
    # self._poll_in, self.bs_max, self.bs_default, Timeout and Limit.
    if to < 0: return self.reads(bs) # use sync I/O (regular read method)
    try:
        flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        deadline = time() + to
        # NOTE(review): in Python 2, buffer + str yields a str, so after the
        # first `+=` below this is ordinary string concatenation -- verify
        # the speed claim.
        buff = buffer('') # several orders faster than strings
        while bs:
            # poll(timeout, maxevents=1): an empty result on timeout makes [0]
            # raise IndexError, handled as "nothing to read".
            try: fd, event = self._poll_in.poll(to, 1)[0] # get first event
            except IndexError: # there's nothing (more) to read
                if status: status = Timeout
                break
            else: ext = self.reads(min(bs, self.bs_max) if bs > 0 else self.bs_default) # min() for G+ reads
            buff += ext
            bs -= len(ext)
            to = deadline - time()
            if to < 0: # time's up
                if status: status = Timeout
                break
        else:
            # while-else: reached only when the loop ends without `break`,
            # i.e. the requested bs bytes were read.
            if status: status = Limit
    finally:
        # NOTE(review): if F_GETFL itself failed, `flags` is unbound here and
        # the bare except also swallows that NameError.
        try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags) # restore blocking state
        except: pass # in case there was an error, caused by wrong fd/pipe (not to raise another one)
    # NOTE(review): `report` is not defined anywhere in this excerpt, so this
    # line raises NameError; `status` was presumably intended -- confirm.
    return buff if not report else (buff, status)