Welcome, guest | Sign In | My Account | Store | Cart

As any Twisted user knows, the standard Python subprocess module can interfere with Twisted's own reactor - at least unless installSignalHandlers is false, which has consequences of its own.

This recipe provides a stripped-down equivalent of subprocess.check_output() that returns a Deferred and is Twisted-friendly.

Python, 59 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-
from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
from twisted.internet.protocol import ProcessProtocol

class SubprocessProtocol(ProcessProtocol):
    """Collect a child process's output and report the outcome via ``self.d``.

    ``self.d`` is a Deferred that fires with the accumulated stdout when the
    process ends with ``ProcessDone``; for any other end reason it errbacks
    with that reason.  Data written to stderr is accumulated in
    ``errBuffer`` but is not delivered through the Deferred.
    """

    # Byte-string defaults: on Python 3 Twisted hands ``bytes`` to
    # outReceived/errReceived, and appending bytes to a text ``""`` raises
    # TypeError.  On Python 2, b"" is an ordinary str, so nothing changes.
    outBuffer = b""
    errBuffer = b""

    def __init__(self):
        # Create the Deferred at construction time so ``protocol.d`` exists
        # even when the reactor has not (yet) called connectionMade, e.g.
        # with a substitute IReactorProcess implementation.
        # NOTE(review): no base-class initializer is called here --
        # presumably ProcessProtocol defines none; confirm against the
        # installed Twisted version.
        self.d = Deferred()

    def outReceived(self, data):
        """Append a chunk of the child's stdout to ``outBuffer``."""
        self.outBuffer += data

    def errReceived(self, data):
        """Append a chunk of the child's stderr to ``errBuffer``."""
        self.errBuffer += data

    def processEnded(self, reason):
        """Fire ``self.d`` according to how the process ended.

        :param reason: failure describing the end of the process; checked
            against ``ProcessDone`` to decide between callback and errback.
        """
        if reason.check(ProcessDone):
            self.d.callback(self.outBuffer)
        else:
            self.d.errback(reason)

def async_check_output(args, ireactorprocess=None):
    """
    Spawn ``args[0]`` with ``args`` as its argument vector and return the
    Deferred owned by the SubprocessProtocol attached to the new process.

    :param args: command and arguments; the first item is the executable.
    :type args: list of str
    :param ireactorprocess: object whose ``spawnProcess`` is used; when it is
        None the global ``twisted.internet.reactor`` is imported and used.
    :type ireactorprocess: :class: twisted.internet.interfaces.IReactorProcess
    :rtype: Deferred
    """
    spawner = ireactorprocess
    if spawner is None:
        # Imported lazily so the global reactor is only pulled in when the
        # caller did not supply one.
        from twisted.internet import reactor as spawner

    collector = SubprocessProtocol()
    executable = args[0]
    spawner.spawnProcess(collector, executable, args)
    return collector.d

# The actual code ends here. The unit tests below may be omitted or saved in a separate file;
# if you move them, remember to uncomment the async_check_output import below.
# -*- coding: utf-8 -*-
from twisted.internet.error import ProcessTerminated

from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
#from twisted_subprocess import async_check_output


class TestSpawning(TestCase):
    """Trial tests that run real commands through async_check_output.

    The tests spawn ``echo`` and ``false``, so they assume those commands
    can be found by the spawned process lookup on the host system.
    """

    @inlineCallbacks
    def test_check_output_returns_command_output_if_success(self):
        """A successful command delivers its stdout to the callback."""
        output = yield async_check_output(["echo", "hello world"])
        # Compare against a byte string: process output is bytes on
        # Python 3, and b"..." is a plain str on Python 2.
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(b"hello world", output.strip())

    def test_check_output_calls_errback_if_exit_status_not_zero(self):
        """A command exiting with a non-zero status errbacks with ProcessTerminated."""
        return self.assertFailure(async_check_output(["false"]), ProcessTerminated)

    def test_check_output_returns_errback_if_nonexisting_executable(self):
        """Spawning a nonexistent executable errbacks with ProcessTerminated."""
        return self.assertFailure(async_check_output(["sdfsdfdsf329909092"]), ProcessTerminated)

Twisted's own ProcessProtocol/spawnProcess() system is a bit verbose for simple use cases, where you just want to spawn a process and wait for its output.

Just call async_check_output with the required args, and a Deferred will be returned. If the process exits without issues, its stdout is passed to the callback; otherwise a Failure is passed to the errback.