Welcome, guest | Sign In | My Account | Store | Cart
# -*- coding: utf-8 -*-
from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
from twisted.internet.protocol import ProcessProtocol

class SubprocessProtocol(ProcessProtocol):
    outBuffer = ""
    errBuffer = ""

    def connectionMade(self):
        self.d = Deferred()

    def outReceived(self, data):
        self.outBuffer += data

    def errReceived(self, data):
        self.errBuffer += data

    def processEnded(self, reason):
        if reason.check(ProcessDone):
            self.d.callback(self.outBuffer)
        else:
            self.d.errback(reason)

def async_check_output(args, ireactorprocess=None):
    """
    :type args: list of str
    :type ireactorprocess: :class: twisted.internet.interfaces.IReactorProcess
    :rtype: Deferred
    """
    if ireactorprocess is None:
        from twisted.internet import reactor
        ireactorprocess = reactor

    pprotocol = SubprocessProtocol()
    ireactorprocess.spawnProcess(pprotocol, args[0], args)
    return pprotocol.d

# actual code ends here, unit tests follow and may be omitted or saved in separate file, don't forget to remove appropriate
# comments 
# -*- coding: utf-8 -*-
from twisted.internet.error import ProcessTerminated

from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
#from twisted_subprocess import async_check_output


class TestSpawning(TestCase):

    @inlineCallbacks
    def test_check_output_returns_command_output_if_success(self):
        self.assertEquals("hello world", (yield async_check_output(["echo", "hello world"])).strip())

    def test_check_output_calls_errback_if_exit_status_not_zero(self):
        return self.assertFailure(async_check_output(["false"]), ProcessTerminated)

    def test_check_output_returns_errback_if_nonexisting_executable(self):
        return self.assertFailure(async_check_output(["sdfsdfdsf329909092"]), ProcessTerminated)

History