
A Python class that "watches" a directory and calls a callback(filename, lines) function every time one of the watched files is written to, in real time.

Practically speaking, this is comparable to the UNIX command "tail -F *.log", except that instead of printing lines to stdout it calls a Python function.

Like tail, it takes care of "watching" new files created after initialization and "unwatching" files removed in the meantime. This means it can also follow rotating log files.

History

  • rev5 (2013-04-05):
    • sizehint parameter
  • rev4 (2013-03-16):
    • python 3 support (also dropped support for python <= 2.5)
    • windows support
    • unit tests
    • main class can also be used as a context manager
  • rev3 (2012-01-13): initial release

Python, 352 lines
#!/usr/bin/env python

"""
Real-time log files watcher supporting log rotation.
Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.

Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""

import os
import time
import errno
import stat
import sys


class LogWatcher(object):
    """Looks for changes in all files of a directory.
    This is useful for watching log file changes in real-time.
    It also supports log file rotation.

    Example:

    >>> def callback(filename, lines):
    ...     print(filename, lines)
    ...
    >>> lw = LogWatcher("/var/log/", callback)
    >>> lw.loop()
    """

    def __init__(self, folder, callback, extensions=["log"], tail_lines=0,
                       sizehint=1048576):
        """Arguments:

        (str) @folder:
            the folder to watch

        (callable) @callback:
            a function which is called every time one of the files
            being watched is updated;
            this is called with "filename" and "lines" arguments.

        (list) @extensions:
            only watch files with these extensions

        (int) @tail_lines:
            read last N lines from files being watched before starting

        (int) @sizehint: passed to file.readlines(), represents an
            approximation of the maximum number of bytes to read from
            a file on every iteration (as opposed to loading the entire
            file into memory until EOF is reached). Defaults to 1MB.
        """
        self.folder = os.path.realpath(folder)
        self.extensions = extensions
        self._files_map = {}
        self._callback = callback
        self._sizehint = sizehint
        assert os.path.isdir(self.folder), self.folder
        assert callable(callback), repr(callback)
        self.update_files()
        for id, file in self._files_map.items():
            file.seek(os.path.getsize(file.name))  # EOF
            if tail_lines:
                try:
                    lines = self.tail(file.name, tail_lines)
                except IOError as err:
                    if err.errno != errno.ENOENT:
                        raise
                else:
                    if lines:
                        self._callback(file.name, lines)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        self.close()

    def loop(self, interval=0.1, blocking=True):
        """Start a busy loop checking for file changes every *interval*
        seconds. If *blocking* is False make one loop then return.
        """
        # May be overridden in order to use pyinotify lib and block
        # until the directory being watched is updated.
        # Note that directly calling readlines() as we do is faster
        # than first checking file's last modification times.
        while True:
            self.update_files()
            for fid, file in list(self._files_map.items()):
                self.readlines(file)
            if not blocking:
                return
            time.sleep(interval)

    def log(self, line):
        """Log when a file is un/watched"""
        print(line)

    def listdir(self):
        """List directory and filter files by extension.
        You may want to override this to add extra logic or globbing
        support.
        """
        ls = os.listdir(self.folder)
        if self.extensions:
            return [x for x in ls
                    if os.path.splitext(x)[1][1:] in self.extensions]
        else:
            return ls

    @classmethod
    def open(cls, file):
        """Wrapper around open().
        By default files are opened in binary mode and readlines()
        will return bytes on both Python 2 and 3.
        This means callback() will deal with a list of bytes.
        Can be overridden in order to deal with unicode strings
        instead, like this:

          import codecs, locale
          return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
                             errors='ignore')
        """
        return open(file, 'rb')

    @classmethod
    def tail(cls, fname, window):
        """Read last N lines from file fname."""
        if window <= 0:
            raise ValueError('invalid window value %r' % window)
        with cls.open(fname) as f:
            BUFSIZ = 1024
            # True if open() was overridden and file was opened in text
            # mode. In that case readlines() will return unicode strings
            # instead of bytes.
            encoded = getattr(f, 'encoding', False)
            CR = '\n' if encoded else b'\n'
            data = '' if encoded else b''
            f.seek(0, os.SEEK_END)
            fsize = f.tell()
            block = -1
            exit = False
            while not exit:
                step = (block * BUFSIZ)
                if abs(step) >= fsize:
                    f.seek(0)
                    newdata = f.read(BUFSIZ - (abs(step) - fsize))
                    exit = True
                else:
                    f.seek(step, os.SEEK_END)
                    newdata = f.read(BUFSIZ)
                data = newdata + data
                if data.count(CR) >= window:
                    break
                else:
                    block -= 1
            return data.splitlines()[-window:]

    def update_files(self):
        ls = []
        for name in self.listdir():
            absname = os.path.realpath(os.path.join(self.folder, name))
            try:
                st = os.stat(absname)
            except EnvironmentError as err:
                if err.errno != errno.ENOENT:
                    raise
            else:
                if not stat.S_ISREG(st.st_mode):
                    continue
                fid = self.get_file_id(st)
                ls.append((fid, absname))

        # check existent files
        for fid, file in list(self._files_map.items()):
            try:
                st = os.stat(file.name)
            except EnvironmentError as err:
                if err.errno == errno.ENOENT:
                    self.unwatch(file, fid)
                else:
                    raise
            else:
                if fid != self.get_file_id(st):
                    # same name but different file (rotation); reload it.
                    self.unwatch(file, fid)
                    self.watch(file.name)

        # add new ones
        for fid, fname in ls:
            if fid not in self._files_map:
                self.watch(fname)

    def readlines(self, file):
        """Read file lines since last access until EOF is reached and
        invoke callback.
        """
        while True:
            lines = file.readlines(self._sizehint)
            if not lines:
                break
            self._callback(file.name, lines)

    def watch(self, fname):
        try:
            file = self.open(fname)
            fid = self.get_file_id(os.stat(fname))
        except EnvironmentError as err:
            if err.errno != errno.ENOENT:
                raise
        else:
            self.log("watching logfile %s" % fname)
            self._files_map[fid] = file

    def unwatch(self, file, fid):
        # File no longer exists. If it has been renamed try to read it
        # for the last time in case we're dealing with a rotating log
        # file.
        self.log("un-watching logfile %s" % file.name)
        del self._files_map[fid]
        with file:
            # Note: readlines() invokes the callback itself and
            # returns None, so just drain what's left in the file.
            self.readlines(file)

    @staticmethod
    def get_file_id(st):
        if os.name == 'posix':
            return "%xg%x" % (st.st_dev, st.st_ino)
        else:
            return "%f" % st.st_ctime

    def close(self):
        for id, file in self._files_map.items():
            file.close()
        self._files_map.clear()


# ===================================================================
# --- tests
# ===================================================================

if __name__ == '__main__':
    import unittest
    import atexit

    TESTFN = '$testfile.log'
    TESTFN2 = '$testfile2.log'
    PY3 = sys.version_info[0] == 3

    if PY3:
        def b(s):
            return s.encode("latin-1")
    else:
        def b(s):
            return s

    class TestLogWatcher(unittest.TestCase):

        def setUp(self):
            def callback(filename, lines):
                self.filename.append(filename)
                for line in lines:
                    self.lines.append(line)

            self.filename = []
            self.lines = []
            self.file = open(TESTFN, 'w')
            self.watcher = LogWatcher(os.getcwd(), callback)

        def tearDown(self):
            self.watcher.close()
            self.remove_test_files()

        def write_file(self, data):
            self.file.write(data)
            self.file.flush()

        @staticmethod
        @atexit.register
        def remove_test_files():
            for x in [TESTFN, TESTFN2]:
                try:
                    os.remove(x)
                except EnvironmentError:
                    pass

        def test_no_lines(self):
            self.watcher.loop(blocking=False)

        def test_one_line(self):
            self.write_file('foo')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])

        def test_two_lines(self):
            self.write_file('foo\n')
            self.write_file('bar\n')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])

        def test_new_file(self):
            with open(TESTFN2, "w") as f:
                f.write("foo")
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN2)])

        def test_file_removed(self):
            self.write_file("foo")
            try:
                os.remove(TESTFN)
            except EnvironmentError:  # necessary on Windows
                pass
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])

        def test_tail(self):
            MAX = 10000
            content = '\n'.join([str(x) for x in range(0, MAX)])
            self.write_file(content)
            # input < BUFSIZ (1 iteration)
            lines = self.watcher.tail(self.file.name, 100)
            self.assertEqual(len(lines), 100)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
            # input > BUFSIZ (multiple iterations)
            lines = self.watcher.tail(self.file.name, 5000)
            self.assertEqual(len(lines), 5000)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
            # input > file's total lines
            lines = self.watcher.tail(self.file.name, MAX + 9999)
            self.assertEqual(len(lines), MAX)
            self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
            #
            self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
            LogWatcher.tail(self.file.name, 10)

        def test_ctx_manager(self):
            with self.watcher:
                pass


    test_suite = unittest.TestSuite()
    test_suite.addTest(unittest.makeSuite(TestLogWatcher))
    unittest.TextTestRunner(verbosity=2).run(test_suite)

Basic usage

same as: tail -F /var/log/*.log

def callback(filename, lines):
    for line in lines:
        print(line)

watcher = LogWatcher("/var/log/", callback)
watcher.loop()

Also read last N lines from files before start watching

same as: tail -F /var/log/*.log -n 20

def callback(filename, lines):
    for line in lines:
        print(line)

watcher = LogWatcher("/var/log/", callback, tail_lines=20)
watcher.loop()

Tail last N lines from a single file only

same as: tail -n 10 foo.log

LogWatcher.tail('foo.log', 10)

Non blocking

import time

def callback(filename, lines):
    for line in lines:
        print(line)

watcher = LogWatcher("/var/log/", callback)
while 1:
    print("loop")
    watcher.loop(blocking=False)
    time.sleep(0.1)

Coloured logs

In case your Python application uses the logging module, you might want to monitor what it's doing in real time and have coloured output. Assuming your log format is configured like this:

import logging
logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)1.1s %(asctime)s] %(message)s')

...you'll have log lines looking like this:

[I 2011-11-29 19:26:44,774] info message
[D 2011-11-29 19:26:44,774] debug message
[E 2011-11-29 19:26:44,774] some error message

The code below is able to parse this syntax and add shell colors, including unhandled exception tracebacks which aren't logged via logging.error():

RED = "31m"
BLUE = "34m"
GREEN = "32m"
YELLOW = "33m"
MAGENTA = "35m"

def coloured(s, color):
    return '\033[1;%s%s\033[1;m' % (color, s)

def callback(filename, lines):
    while lines:
        line = lines.pop(0).rstrip()
        noheader = False
        if line.startswith("[E ") or line.startswith("Traceback"):
            color = RED
        elif line.startswith("[D "):
            color = BLUE
        elif line.startswith("[I "):
            color = GREEN
        elif line.startswith("[W "):
            color = YELLOW
        else:
            noheader = True
            color = MAGENTA

        if noheader:
            print(line)
        else:
            endheader = line.find(']')
            header = coloured(line[0:endheader + 1], color)
            line = line[endheader + 1:]
            print(header + line)

watcher = LogWatcher("/var/log/", callback, tail_lines=10)
watcher.loop()

29 comments

qigang 10 years ago

Thanks a lot, this is what I need. I will get it and use it.

Good job!

qigang 10 years ago

But why not just accept a list of files? Using a folder and extensions is not as easy.

The 'extensions' parameter was added to avoid subclassing (passing a parameter is easier/quicker). If you need total control you can override the listdir() method in a subclass and do whatever you want there (e.g. add globbing support or watch multiple folders).
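A subclass override along those lines could be sketched like this (my illustration, not part of the recipe; the function and class names are hypothetical):

```python
import fnmatch
import os

def glob_listdir(folder, pattern):
    """Shell-style filtering for directory entries; '*.log' behaves
    like the recipe's default extensions=["log"], but '*' and '?'
    patterns work too."""
    return sorted(x for x in os.listdir(folder)
                  if fnmatch.fnmatch(x, pattern))

# Wiring it into a subclass would look something like:
#
# class GlobbingLogWatcher(LogWatcher):
#     pattern = "app-*.log"
#     def listdir(self):
#         return glob_listdir(self.folder, self.pattern)
```

Since update_files() only ever consults listdir(), this single override is enough to change which files get watched.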

Steve Romanow 9 years, 11 months ago

Thank you very much. I needed exactly this today. YMMD.

leonardo turtule 9 years, 3 months ago

Many thanks. I was using this class with Python 3.3 and it was working well. Now I have to use Python 2.7, and it seems to monitor only one file per folder. Do you know if it can be made compatible with 2.7? Regards

Giampaolo Rodolà (author) 9 years, 3 months ago

Ciao Leonardo. Actually this is supposed to work with Python 2.x only. Not sure why it seems to monitor only one file per folder: have you tried to debug listdir() method?

leonardo turtule 9 years, 3 months ago

Hi Giampaolo, thanks for the quick answer. I've tried to debug listdir() and the list is complete, with all the files needed. Maybe the issue is in update_files: the file names seem to be OK, but they all have the same fid when I print "ls".

Giampaolo Rodolà (author) 9 years, 3 months ago

Different files with the same id might mean they are symlinks pointing to the same file, which is therefore seen as a single entity. Try to look into why get_file_id() returns the same value for different files. Another possibility is that you're using some exotic filesystem on which get_file_id() is unreliable.

leonardo turtule 9 years, 3 months ago

You're right, I'm using NTFS filesystems (quite exotic, I agree :)) and os.stat() returns st_ino=0L, st_dev=0, which is normal behavior on Windows (after some research). I don't know why it works fine with Python 3.3. If I find a solution to make it work on Windows, I will post another comment. Thanks for your help.

Giampaolo Rodolà (author) 9 years, 3 months ago

I've never tried this on Windows so it's entirely possible it doesn't work. And yes, st_ino and st_dev can't be used to identify a file uniquely on Windows. I think you can try st_ctime (file creation time) as in:

@staticmethod
def get_file_id(st):
    if os.name == 'posix':
        return "%xg%x" % (st.st_dev, st.st_ino)
    else:
        return "%s" % st_ctime

Please let me know if it fixes the issue in which case I'll update the recipe (and while I'm at it I'll also add python 3 support).

Giampaolo Rodolà (author) 9 years, 3 months ago

typo in the code above:

- return "%s" % st_ctime
+ return "%s" % st.st_ctime

leonardo turtule 9 years, 3 months ago

Yes, it works on Windows! Many thanks. For Python 3 support some changes must be made: print(something) instead of print something, and "except ... as err" instead of "except ..., err".
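The two syntax changes mentioned above, side by side (the Python 2 spellings are shown in comments; both Python 3 forms are also accepted by Python 2.6+):

```python
# Python 2 wrote:   print something
print("something")

# Python 2 wrote:   except IOError, err:
# Python 2.6+ and 3.x both accept the "as" form:
try:
    open("/nonexistent/path/hopefully")
except IOError as err:
    print("caught:", err)
```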

Jianfei WANG 9 years, 1 month ago

While reading the code, I have some small questions about the details:

For seeking to the end of a file, you use two methods:

  file.seek(os.path.getsize(file.name))  # EOF

and

  f.seek(0, os.SEEK_END)

Is there any difference between these two methods?

Another question is about the tail() method: I see that you open the file to read some lines from the tail, but you don't close that file. Is that on purpose, or does it just not matter whether the file is closed after the static method exits?

BTW, thanks for great work of this helpful recipe.

Giampaolo Rodolà (author) 9 years, 1 month ago

The two methods are equivalent. For what it's worth, I bet f.seek(0, os.SEEK_END) is a bit faster. I don't close the file in the tail() method just because I forgot to; the garbage collector will do that sooner or later, though.
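The equivalence of the two seek styles can be checked with a short throwaway script (my sketch; note that later revisions of the recipe also adopt a with statement in tail(), which covers the explicit-close point):

```python
import os
import tempfile

# Create a small throwaway file to test against.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"hello\nworld\n")

# Variant 1: seek to the size reported for the file *name*.
with open(path, "rb") as f:
    f.seek(os.path.getsize(path))
    pos1 = f.tell()

# Variant 2: seek relative to the end of the open file object;
# this avoids a second stat() call on the path.
with open(path, "rb") as f:
    f.seek(0, os.SEEK_END)
    pos2 = f.tell()

assert pos1 == pos2 == 12  # both land on EOF
os.remove(path)
```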

Jianfei WANG 9 years, 1 month ago

Thanks! As a learner of Python, I really want to treat these small details carefully. :)

Totally King 8 years, 10 months ago

A. Thanks a lot for your code. But it has a flaw, as does my own one for watching rotating log files: as long as there is a sleep interval, there is a (remote) chance that a few lines of the 'old' log file are lost while the 'new' log file is already being read in... And the problem is: the smaller the interval, the less likely the loss of lines, BUT the higher the CPU load... Do you know any workaround?

B. What do you think of my modification of your tail method?

def tail(fname, window):
    """Read last N lines from file fname."""
    try:
        f = open(fname, 'r')
    except IOError as err:
        if err.errno == errno.ENOENT:
            return []
        else:
            raise
    else:
        BUFSIZ = 1024
        f.seek(0, os.SEEK_END)
        fsize = f.tell()
        block = -1
        data = ''
        exitLoop = False
        while not exitLoop:
            step = (block * BUFSIZ)
            if abs(step) >= fsize:
                f.seek(0)
                newdata = f.read(BUFSIZ-(step-fsize))
                exitLoop = True
            else:
                f.seek(step, os.SEEK_END)
                newdata = f.read(BUFSIZ)
            if data == '':
                newdata = newdata.rstrip('\n')
            data = newdata + data
            if data.count('\n') >= window:
                break
            else:
                block -= 1
        return data.splitlines()[-window:]

Now, going through the while loop, not everything up to EOF has to be re-read by f.read() again and again; instead the f.read(BUFSIZ) chunks are sewn together with newdata + data...

Totally King 8 years, 10 months ago

Of course the line

newdata = f.read(BUFSIZ-(step-fsize))

is not correct. The right version is here:

newdata = f.read(BUFSIZ - (abs(step) - fsize))

Sry. (=

Giampaolo Rodolà (author) 8 years, 10 months ago

As long as there is a sleep interval there is a (remote) chance that a few lines of the 'old' log file are lost

How so? When a file is rotated, unwatch() is called and one last attempt to read the file's remaining lines is made. See:

def unwatch(self, file, fid):
    lines = self.readfile(file)
    del self.files_map[fid]
    if lines:
        self.callback(file.name, lines)

What do you think of my modification of your tail method?

It looks better (faster). I will try to merge it.

Giampaolo Rodolà (author) 8 years, 10 months ago

I just merged your change in rev4. I also made the following enhancements:

  • python 3 porting (also dropped support for python <= 2.5)
  • windows support
  • unit tests
  • main class can also be used as a context manager

Totally King 8 years, 10 months ago

Just try watching the rotating log file of a simple program:

i = 0
while True:
    print 'Increasing log file size ...', i
    i += 1

And you will see that whenever the file is unwatched and watched again a few hundred i's are dropped. At least that's what happened when I tried it.
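For a self-contained reproduction of this scenario (a sketch of mine, not the commenter's original setup), the writer side can use the stdlib RotatingFileHandler so the file actually rotates:

```python
import logging
import logging.handlers
import os
import tempfile

# Hypothetical writer side that forces frequent rotation: point a
# LogWatcher at tmpdir and compare the counters the callback receives
# with the counters written here. With a plain sleep-interval loop,
# a gap can show up right around each rollover.
tmpdir = tempfile.mkdtemp()
logfile = os.path.join(tmpdir, "rotate.log")

logger = logging.getLogger("rotation-demo")
logger.setLevel(logging.INFO)
handler = logging.handlers.RotatingFileHandler(
    logfile, maxBytes=1000, backupCount=3)
logger.addHandler(handler)

for i in range(1000):
    logger.info("line %d", i)
handler.close()

# rotate.log plus backups rotate.log.1 .. rotate.log.3 now exist.
```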

Russell Luo 8 years, 9 months ago

I would like to have a try today.

Ido C. 8 years ago

Sometimes working with generators is useful. I wrote this subclass that may be useful for some. Enjoy!

class YieldingLogWatcher(LogWatcher):
    def __init__(self, folder, extensions=["log"], tail_lines=0,
                       sizehint=1048576):
        LogWatcher.__init__(self, folder, self.dummy_callback, extensions, tail_lines, sizehint)

    def dummy_callback(self, filename, lines):
        # never reached: readlines() is overridden to yield lines
        # instead of dispatching them to a callback
        assert False

    def loop(self, interval=0.1, blocking=True):
        while True:
            self.update_files()
            for fid, file in list(self._files_map.items()):
                for file_name, line in self.readlines(file):
                    yield file_name, line

            if not blocking:
                yield
            else:
                time.sleep(interval)

    def readlines(self, file):
        while True:
            lines = file.readlines(self._sizehint)
            if not lines:
                break

            for line in lines:
                yield file.name, line

Nathanael Lecaude 6 years, 12 months ago

I can't seem to get this working on OS X, tried on 2 different computers with the same result, the callback doesn't seem to get called when a log file is updated. Anyone having a similar issue ?

Daniel Pérez 6 years, 12 months ago

Hi Nathanael,

I couldn't make it work on Mac, but it works OK on Linux.

Daniel Pérez 6 years, 12 months ago

What about this?

tail -F file.txt  2>/dev/null | python lines.py

with lines.py:

import sys

while 1:
    line = sys.stdin.readline()
    if not line:
        break
    print(line.strip())

Seems to work on Linux and Mac, also supporting file rotation.

What problems do you see with this solution? I'd really appreciate any feedback, thanks.

Nathanael Lecaude 6 years, 11 months ago

Thanks, that works nicely! I just changed the tail arguments so it outputs only the last line:

tail -n 1 -F file.txt 2>/dev/null | python lines.py

Samir Sadek 6 years, 11 months ago

Thanks a lot. I have a comment: I am learning Python and came across your code for learning purposes. I noticed that listdir() will not work as expected because of the "in" test instead of "==":

for f in ls: print f, os.path.splitext(f)[1][1:], os.path.splitext(f)[1][1:] in "py"

If f has no extension it will be considered True.

The output in a directory is the following:

rey-0.6.3-mac.dmg dmg False
sh.txt txt False
swift_client.py py True
swift_client2.py py True
swift_server.py py True
swift_server2.py py True
swift_server3.py py True
swift_system.py py True
tests  True
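To make the pitfall concrete (my illustration, not from the comment): the `in` test only misbehaves when extensions is passed as a string, because the empty string is a substring of every string; the recipe's default list form, extensions=["log"], is safe.

```python
ext = ""  # what os.path.splitext() yields for an extension-less file

# Substring test against a *string*: '' is a substring of everything,
# so a file with no extension wrongly matches.
print(ext in "py")       # True

# Membership test against a *list*, as the recipe's default
# extensions=["log"] uses: the empty string is correctly rejected.
print(ext in ["py"])     # False
print("py" in ["py"])    # True
```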

Udaya S 6 years, 2 months ago

Hi, I am looking to get only the newly added lines. Any help with that?