Welcome, guest | Sign In | My Account | Store | Cart
#! /usr/bin/env python
"""
silence.py
Peter Waller
March 2010
"""


from __future__ import with_statement

from contextlib import contextmanager, nested
from threading import Thread

from tempfile import mkdtemp
from os.path import join as pjoin
from os import (dup, fdopen, open as osopen, O_NONBLOCK, O_RDONLY, remove,
                rmdir
, mkfifo)
from fcntl import fcntl, F_GETFL, F_SETFL
from select import select
from sys import stdout, stderr

from ctypes import PyDLL, CDLL, c_void_p, c_char_p, py_object

pyapi
= PyDLL(None)
this_exe
= CDLL(None)

def make_fn(what, res, *args):
    what
.restype = res
    what
.argtypes = args
   
return what
   
FILE_p
= c_void_p
   
PyFile_AsFile = make_fn(pyapi.PyFile_AsFile, FILE_p, py_object)
freopen
= make_fn(this_exe.freopen, FILE_p, c_char_p, c_char_p, FILE_p)

@contextmanager
def fifo():
   
"""
    Create a fifo in a temporary place.
    """

    tmpdir
= mkdtemp()
    filename
= pjoin(tmpdir, 'myfifo')
   
try:
        mkfifo
(filename)
   
except OSError, e:
       
print >>stderr, "Failed to create FIFO: %s" % e
       
raise
   
else:
       
yield filename
        remove
(filename)
        rmdir
(tmpdir)

def reader_thread_func(filename, filter_, real_stdout):
   
"""
    Sit there, reading lines from the pipe `filename`, sending those for which
    `filter_()` returns False to `real_stdout`
    """

   
with fdopen(osopen(filename, O_NONBLOCK | O_RDONLY)) as fd:
       
while True:
            rlist
, _, _ = select([fd], [], [])
           
            line
= fd.readline()
           
if not line:
               
break
               
           
elif not filter_(line):
                real_stdout
.write(line)

@contextmanager
def threaded_file_reader(*args):
   
"""
    Operate a read_thread_func in another thread. Block with statement exit
    until the function completes.
    """

    reader_thread
= Thread(target=reader_thread_func, args=args)
    reader_thread
.start()
   
try:
       
yield
   
finally:
        reader_thread
.join()

@contextmanager
def silence(filter_=lambda line: True, file_=stdout):
   
"""
    Prevent lines matching `filter_` ending up on `file_` (defaults to stdout)
    """

   
if not filter_:
       
yield
       
return
   
    saved_stdout
= dup(file_.fileno())
    stdout_file
= PyFile_AsFile(file_)
   
   
with nested(fdopen(saved_stdout, "w"), fifo()) as (real_stdout, filename):
       
with threaded_file_reader(filename, filter_, real_stdout):
           
# Redirect stdout to pipe
            freopen
(filename, "w", stdout_file)
           
try:
               
yield
           
finally:
               
# Redirect stdout back to it's original place
                freopen
("/dev/fd/%i" % saved_stdout, "w", stdout_file)

def test():
   
   
def filter_stupid(line):
       
if line.startswith("Stupid"):
           
return True
           
   
print "Before with block.."
   
   
with silence(filter_stupid):
       
print "Stupid output from a C library I don't want to hear"
       
print "Sensible stuff!"
       
   
print "After the silence block"

if __name__ == "__main__":
    test
()

History

  • revision 2 (14 years ago)
  • previous revisions are not available