# -*- coding: utf-8 -*- from twisted.internet.defer import Deferred from twisted.internet.error import ProcessDone from twisted.internet.protocol import ProcessProtocol class SubprocessProtocol(ProcessProtocol): outBuffer = "" errBuffer = "" def connectionMade(self): self.d = Deferred() def outReceived(self, data): self.outBuffer += data def errReceived(self, data): self.errBuffer += data def processEnded(self, reason): if reason.check(ProcessDone): self.d.callback(self.outBuffer) else: self.d.errback(reason) def async_check_output(args, ireactorprocess=None): """ :type args: list of str :type ireactorprocess: :class: twisted.internet.interfaces.IReactorProcess :rtype: Deferred """ if ireactorprocess is None: from twisted.internet import reactor ireactorprocess = reactor pprotocol = SubprocessProtocol() ireactorprocess.spawnProcess(pprotocol, args[0], args) return pprotocol.d # actual code ends here, unit tests follow and may be omitted or saved in separate file, don't forget to remove appropriate # comments # -*- coding: utf-8 -*- from twisted.internet.error import ProcessTerminated from twisted.trial.unittest import TestCase from twisted.internet.defer import inlineCallbacks #from twisted_subprocess import async_check_output class TestSpawning(TestCase): @inlineCallbacks def test_check_output_returns_command_output_if_success(self): self.assertEquals("hello world", (yield async_check_output(["echo", "hello world"])).strip()) def test_check_output_calls_errback_if_exit_status_not_zero(self): return self.assertFailure(async_check_output(["false"]), ProcessTerminated) def test_check_output_returns_errback_if_nonexisting_executable(self): return self.assertFailure(async_check_output(["sdfsdfdsf329909092"]), ProcessTerminated)