A Python class which "watches" a directory and calls a callback(filename, lines) function every time one of the watched files gets written to, in real time.
Practically speaking, this is comparable to the "tail -F *.log" UNIX command, but instead of lines being printed to stdout a Python function gets called.
Similarly to tail, it takes care of "watching" new files which are created after initialization and "unwatching" those which are removed in the meantime. This means you'll also be able to "follow" rotating log files.
History
- rev5 (2013-04-05):
- sizehint parameter
- rev4 (2013-03-16):
- python 3 support (also dropped support for python <= 2.5)
- windows support
- unit tests
- main class can also be used as a context manager
- rev3 (2012-01-13): initial release
You can view the complete code in the GitHub repository for this recipe.
#!/usr/bin/env python
"""
Real-time log files watcher supporting log rotation.
Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""
import os
import time
import errno
import stat
import sys
class LogWatcher(object):
"""Looks for changes in all files of a directory.
This is useful for watching log file changes in real-time.
It also supports file rotation.
Example:
>>> def callback(filename, lines):
... print(filename, lines)
...
>>> lw = LogWatcher("/var/log/", callback)
>>> lw.loop()
"""
def __init__(self, folder, callback, extensions=["log"], tail_lines=0,
sizehint=1048576):
"""Arguments:
(str) @folder:
the folder to watch
(callable) @callback:
a function which is called every time one of the file being
watched is updated;
this is called with "filename" and "lines" arguments.
(list) @extensions:
only watch files with these extensions
(int) @tail_lines:
read last N lines from files being watched before starting
(int) @sizehint: passed to file.readlines(), represents an
approximation of the maximum number of bytes to read from
a file on every iteration (as opposed to loading the entire
file in memory until EOF is reached). Defaults to 1MB.
"""
self.folder = os.path.realpath(folder)
self.extensions = extensions
self._files_map = {}
self._callback = callback
self._sizehint = sizehint
assert os.path.isdir(self.folder), self.folder
assert callable(callback), repr(callback)
self.update_files()
for id, file in self._files_map.items():
file.seek(os.path.getsize(file.name)) # EOF
if tail_lines:
try:
lines = self.tail(file.name, tail_lines)
except IOError as err:
if err.errno != errno.ENOENT:
raise
else:
if lines:
self._callback(file.name, lines)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def __del__(self):
self.close()
def loop(self, interval=0.1, blocking=True):
"""Start a busy loop checking for file changes every *interval*
seconds. If *blocking* is False make one loop then return.
"""
# May be overridden in order to use pyinotify lib and block
# until the directory being watched is updated.
# Note that directly calling readlines() as we do is faster
# than first checking file's last modification times.
while True:
self.update_files()
for fid, file in list(self._files_map.items()):
self.readlines(file)
if not blocking:
return
time.sleep(interval)
def log(self, line):
"""Log when a file is un/watched"""
print(line)
def listdir(self):
"""List directory and filter files by extension.
You may want to override this to add extra logic or globbing
support.
"""
ls = os.listdir(self.folder)
if self.extensions:
return [x for x in ls if os.path.splitext(x)[1][1:] \
in self.extensions]
else:
return ls
@classmethod
def open(cls, file):
"""Wrapper around open().
By default files are opened in binary mode and readlines()
will return bytes on both Python 2 and 3.
This means callback() will deal with a list of bytes.
Can be overridden in order to deal with unicode strings
instead, like this:
import codecs, locale
return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
errors='ignore')
"""
return open(file, 'rb')
@classmethod
def tail(cls, fname, window):
"""Read last N lines from file fname."""
if window <= 0:
raise ValueError('invalid window value %r' % window)
with cls.open(fname) as f:
BUFSIZ = 1024
# True if open() was overridden and file was opened in text
# mode. In that case readlines() will return unicode strings
# instead of bytes.
encoded = getattr(f, 'encoding', False)
CR = '\n' if encoded else b'\n'
data = '' if encoded else b''
f.seek(0, os.SEEK_END)
fsize = f.tell()
block = -1
exit = False
while not exit:
step = (block * BUFSIZ)
if abs(step) >= fsize:
f.seek(0)
newdata = f.read(BUFSIZ - (abs(step) - fsize))
exit = True
else:
f.seek(step, os.SEEK_END)
newdata = f.read(BUFSIZ)
data = newdata + data
if data.count(CR) >= window:
break
else:
block -= 1
return data.splitlines()[-window:]
def update_files(self):
ls = []
for name in self.listdir():
absname = os.path.realpath(os.path.join(self.folder, name))
try:
st = os.stat(absname)
except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
else:
if not stat.S_ISREG(st.st_mode):
continue
fid = self.get_file_id(st)
ls.append((fid, absname))
# check existent files
for fid, file in list(self._files_map.items()):
try:
st = os.stat(file.name)
except EnvironmentError as err:
if err.errno == errno.ENOENT:
self.unwatch(file, fid)
else:
raise
else:
if fid != self.get_file_id(st):
# same name but different file (rotation); reload it.
self.unwatch(file, fid)
self.watch(file.name)
# add new ones
for fid, fname in ls:
if fid not in self._files_map:
self.watch(fname)
def readlines(self, file):
"""Read file lines since last access until EOF is reached and
invoke callback.
"""
while True:
lines = file.readlines(self._sizehint)
if not lines:
break
self._callback(file.name, lines)
def watch(self, fname):
try:
file = self.open(fname)
fid = self.get_file_id(os.stat(fname))
except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
else:
self.log("watching logfile %s" % fname)
self._files_map[fid] = file
def unwatch(self, file, fid):
# File no longer exists. If it has been renamed try to read it
# for the last time in case we're dealing with a rotating log
# file.
self.log("un-watching logfile %s" % file.name)
del self._files_map[fid]
with file:
# readlines() invokes the callback itself (and returns None),
# so there is nothing to collect or re-dispatch here.
self.readlines(file)
@staticmethod
def get_file_id(st):
if os.name == 'posix':
return "%xg%x" % (st.st_dev, st.st_ino)
else:
return "%f" % st.st_ctime
def close(self):
for id, file in self._files_map.items():
file.close()
self._files_map.clear()
# ===================================================================
# --- tests
# ===================================================================
if __name__ == '__main__':
import unittest
import atexit
TESTFN = '$testfile.log'
TESTFN2 = '$testfile2.log'
PY3 = sys.version_info[0] == 3
if PY3:
def b(s):
return s.encode("latin-1")
else:
def b(s):
return s
class TestLogWatcher(unittest.TestCase):
def setUp(self):
def callback(filename, lines):
self.filename.append(filename)
for line in lines:
self.lines.append(line)
self.filename = []
self.lines = []
self.file = open(TESTFN, 'w')
self.watcher = LogWatcher(os.getcwd(), callback)
def tearDown(self):
self.watcher.close()
self.remove_test_files()
def write_file(self, data):
self.file.write(data)
self.file.flush()
@staticmethod
@atexit.register
def remove_test_files():
for x in [TESTFN, TESTFN2]:
try:
os.remove(x)
except EnvironmentError:
pass
def test_no_lines(self):
self.watcher.loop(blocking=False)
def test_one_line(self):
self.write_file('foo')
self.watcher.loop(blocking=False)
self.assertEqual(self.lines, [b"foo"])
self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
def test_two_lines(self):
self.write_file('foo\n')
self.write_file('bar\n')
self.watcher.loop(blocking=False)
self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
def test_new_file(self):
with open(TESTFN2, "w") as f:
f.write("foo")
self.watcher.loop(blocking=False)
self.assertEqual(self.lines, [b"foo"])
self.assertEqual(self.filename, [os.path.abspath(TESTFN2)])
def test_file_removed(self):
self.write_file("foo")
try:
os.remove(TESTFN)
except EnvironmentError: # necessary on Windows
pass
self.watcher.loop(blocking=False)
self.assertEqual(self.lines, [b"foo"])
def test_tail(self):
MAX = 10000
content = '\n'.join([str(x) for x in range(0, MAX)])
self.write_file(content)
# input < BUFSIZ (1 iteration)
lines = self.watcher.tail(self.file.name, 100)
self.assertEqual(len(lines), 100)
self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
# input > BUFSIZ (multiple iterations)
lines = self.watcher.tail(self.file.name, 5000)
self.assertEqual(len(lines), 5000)
self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
# input > file's total lines
lines = self.watcher.tail(self.file.name, MAX + 9999)
self.assertEqual(len(lines), MAX)
self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
#
self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
LogWatcher.tail(self.file.name, 10)
def test_ctx_manager(self):
with self.watcher:
pass
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(TestLogWatcher))
unittest.TextTestRunner(verbosity=2).run(test_suite)
Basic usage
same as: tail -F /var/log/*.log
def callback(filename, lines):
for line in lines:
print(line)
watcher = LogWatcher("/var/log/", callback)
watcher.loop()
Also read last N lines from files before start watching
same as: tail -F /var/log/*.log -n 20
def callback(filename, lines):
for line in lines:
print(line)
watcher = LogWatcher("/var/log/", callback, tail_lines=20)
watcher.loop()
Tail last N lines from a single file only
same as: tail -n 10 foo.log
LogWatcher.tail('foo.log', 10)
Non blocking
import time
def callback(filename, lines):
for line in lines:
print(line)
watcher = LogWatcher("/var/log/", callback)
while 1:
print("loop")
watcher.loop(blocking=False)
time.sleep(0.1)
Coloured logs
In case your Python application is using the logging module, you might want to monitor what it's doing in real time and have coloured output. Assuming your log format is configured like this:
import logging
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)1.1s %(asctime)s] %(message)s',)
...you'll have log lines looking like this:
[I 2011-11-29 19:26:44,774] info message
[D 2011-11-29 19:26:44,774] debug message
[E 2011-11-29 19:26:44,774] some error message
The code below is able to parse this syntax and add shell colors, including unhandled exception tracebacks which aren't logged via logging.error():
RED = "31m"
BLUE = "34m"
GREEN = "32m"
YELLOW = "33m"
MAGENTA = "35m"
def coloured(s, color):
return '\033[1;%s%s\033[1;m' % (color, s)
def callback(filename, lines):
while lines:
line = lines.pop(0).rstrip()
noheader = False
if line.startswith("[E ") or line.startswith("Traceback"):
color = RED
elif line.startswith("[D "):
color = BLUE
elif line.startswith("[I "):
color = GREEN
elif line.startswith("[W "):
color = YELLOW
else:
noheader = True
color = MAGENTA
if noheader:
print(line)
else:
endheader = line.find(']')
header = coloured(line[0:endheader + 1], color)
line = line[endheader + 1:]
print(header + line)
watcher = LogWatcher("/var/log/", callback, tail_lines=10)
watcher.loop()
Thanks a lot, this is what I need. I will get it and use it.
Good job!
But why not just accept a list of files? Using a folder and extensions is not as easy.
'extensions' parameter was added to avoid subclassing (passing a parameter is easier/quicker). If you need total control you can override listdir() method in your subclass and do whatever you want in there (e.g. add globbing support or watch multiple folders).
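A minimal sketch of what such an overridden listdir() body could look like with globbing support (the function name, the default pattern, and the standalone form are illustrative; in practice this would be the listdir() method of a LogWatcher subclass):

```python
import glob
import os

def listdir_glob(folder, pattern="*.log"):
    """Glob the watched folder and return basenames, which is the
    shape update_files() expects listdir() to produce."""
    return [os.path.basename(p)
            for p in sorted(glob.glob(os.path.join(folder, pattern)))]
```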
Thank you very much. I needed exactly this today. YMMD.
Many thanks, I was using this class with Python 3.3 and it was working well. Now I have to use Python 2.7, and it seems to monitor only one file per folder. Do you know if it can be made compatible with 2.7? Regards
Ciao Leonardo. Actually this is supposed to work with Python 2.x only. Not sure why it seems to monitor only one file per folder: have you tried to debug listdir() method?
Hi Giampaolo, thanks for the quick answer. I've tried to debug listdir() and the list is complete with all the files needed. Maybe I encounter the issue in update_files(): the names of the files seem to be OK, but they all have the same fid when I print the "ls".
Different files with the same id might mean they are symlinks pointing to the same file, which is therefore seen as a single entity. Try to look into why get_file_id() returns the same value for different files. Another possibility is that you're using some exotic filesystem on which get_file_id() is unreliable.
You're right, I'm using NTFS filesystems (quite exotic, I agree :)) and with the os.stat function it results in st_ino=0L, st_dev=0, which is normal behavior on Windows (after some research). I don't know why it works fine with Python 3.3. If I find a solution to use it on Windows, I will post another comment. Thanks for your help.
I've never tried this on Windows so it's entirely possible it doesn't work. And yes, st_ino and st_dev can't be used to identify a file uniquely on Windows. I think you can try st_ctime (file creation time) as in:
Please let me know if it fixes the issue in which case I'll update the recipe (and while I'm at it I'll also add python 3 support).
typo in the code above:
Yes, it works on Windows! Many thanks. For Python 3 support some changes must be made: print(something) instead of print something, and except ... as err instead of except ..., err.
While reading the code, I have some small questions about it in detail:
For seeking to the end of a file, you used 2 ways: file.seek(os.path.getsize(file.name)) and f.seek(0, os.SEEK_END). Is there any difference between these 2 methods?
Another one is in the tail() method: I see that you open the file to read some lines from the tail, but you don't close that file. Is that on purpose, or just because it doesn't matter whether the file is closed after the static method exits?
BTW, thanks for great work of this helpful recipe.
The two methods are equivalent. For what it's worth, I bet f.seek(0, os.SEEK_END) is a bit faster. I don't close the file in the tail() method just because I forgot to. The garbage collector will automatically do that sooner or later, though.
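The equivalence is easy to check directly; a minimal sketch (the temp file and its contents are illustrative):

```python
import os
import tempfile

# Verify that the two EOF-seeking idioms used in the recipe land on
# the same offset.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello world\n")
    path = tmp.name

with open(path, 'rb') as f:
    f.seek(os.path.getsize(f.name))  # idiom 1: seek to the file's size
    pos1 = f.tell()
    f.seek(0, os.SEEK_END)           # idiom 2: seek relative to EOF
    pos2 = f.tell()

os.remove(path)
assert pos1 == pos2 == 12  # both point just past the 12 bytes written
```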
Thanks! As a learner of python, I would really like to treat these small details carefully. :)
A. Thanks a lot for your code. But it has a flaw, as does my own one for watching rotating log files: as long as there is a sleep interval there is a (remote) chance that a few lines of the 'old' log file are lost while the 'new' log file is already being read in... And the problem is: the smaller the interval, the less likely the loss of lines, BUT the higher the CPU load... Do you know any workaround?
B. What do you think of my modification of your tail() method? Now, going through the while loop, not everything up to EOF (f.read()) has to be re-read again and again; rather the chunks f.read(BUFSIZ) are stitched together with newdata + data.
...Of course the line is not correct. The right version is here:
Sorry. (=
How so? When a file is rotated, unwatch() is called and one last attempt to read the file's last lines is made. See:
It looks better (faster). I will try to merge it.
I just merged your change in rev4. Also, I provided the following enhancements:
Just try to watch a rotating log file written by a simple program:
And you will see that whenever the file is unwatched and watched again, a few hundred i's are dropped. At least that's what happened when I tried it.
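The commenter's program isn't shown; a hypothetical reproducer for the race described above could look like this (the file name, rotation size, and line count are illustrative). Watching "rotate_test.log" with LogWatcher while it runs makes any dropped i's easy to spot as gaps in the sequence:

```python
import logging
import logging.handlers

# Write incrementing integers to a log file that rotates every ~1 KB.
# The small maxBytes forces frequent rotations, maximizing the chance
# of hitting the unwatch/watch window while lines are still arriving.
handler = logging.handlers.RotatingFileHandler(
    "rotate_test.log", maxBytes=1024, backupCount=3)
logger = logging.getLogger("rotate_test")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

for i in range(10000):
    logger.info("%d", i)
```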
I would like to have a try today.
Sometimes working with generators is useful. I wrote this subclass that may be useful for some. Enjoy!
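The commenter's actual subclass is not shown, but the generator idea can be illustrated with a standalone sketch that follows a single file (a simplified, hypothetical version; unlike LogWatcher it does not handle rotation or multiple files):

```python
import time

def follow(path, interval=0.1, from_start=False, max_idle=None):
    """Yield new lines appended to *path*, tail -f style.
    *max_idle* bounds the number of consecutive empty polls before
    the generator stops (handy for testing; pass None to run forever).
    """
    with open(path, 'rb') as f:
        if not from_start:
            f.seek(0, 2)  # start at EOF, like the recipe does
        idle = 0
        while max_idle is None or idle < max_idle:
            line = f.readline()
            if line:
                idle = 0
                yield line
            else:
                idle += 1
                time.sleep(interval)
```

Usage would then be a plain for-loop, e.g. `for line in follow("/var/log/syslog"): print(line)`, instead of registering a callback.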
I can't seem to get this working on OS X; tried on 2 different computers with the same result: the callback doesn't seem to get called when a log file is updated. Anyone having a similar issue?
Hi Nathanael,
I couldn't make it work on Mac, but it is working OK on Linux.
What about this?
with lines.py:
Seems to work on Linux and Mac, also supporting file rotation.
What problems do you see with this solution? I'd really appreciate any feedback, thanks.
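The lines.py script itself isn't shown; a guess at the idea (read lines from stdin as tail -F produces them and hand each one to a callback) might look like this, with `pump` as an illustrative name:

```python
import sys

def pump(src, callback):
    """Feed each line from the *src* iterable (e.g. sys.stdin) to
    *callback* as it arrives."""
    for line in src:
        callback(line)

if __name__ == "__main__":
    # usage: tail -F file.txt 2>/dev/null | python lines.py
    pump(sys.stdin, sys.stdout.write)
```

The trade-off versus LogWatcher is that tail does the watching (including rotation via -F), while the Python side stays trivially simple but is limited to whatever files tail was started with.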
Thanks that works nicely ! I just changed the tail arguments so it would output only the last line:
tail -n 1 -F file.txt 2>/dev/null | python lines.py
Thanks a lot. I have a comment: I am learning Python and came across your code for learning purposes. I have noticed that listdir() will not work as expected if "extensions" is passed as a string, because the "in" statement then performs a substring test instead of an "==" comparison:
for f in ls: print f, os.path.splitext(f)[1][1:], os.path.splitext(f)[1][1:] in "py"
If f has no extension, it will be considered True.
The output in a directory is the following :
rey-0.6.3-mac.dmg dmg False
sh.txt txt False
swift_client.py py True
swift_client2.py py True
swift_server.py py True
swift_server2.py py True
swift_server3.py py True
swift_system.py py True
tests  True
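The pitfall the comment describes only bites when extensions is given as a string: the in operator then performs a substring test, and the empty extension of a file like "tests" matches anything. A quick check:

```python
import os

name = "tests"                       # a file with no extension
ext = os.path.splitext(name)[1][1:]  # '' (empty string)

# Against a string, `in` is a substring test: '' is in every string.
assert ext in "py"
# Against a list (as with the recipe's default extensions=["log"]),
# `in` is element membership: '' is not an element.
assert ext not in ["py"]
```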
Hi, I am looking to get only newly added lines; any help with that?
@Uday, See https://pypi.python.org/pypi/pygtail