#!/usr/bin/env python
"""This module provides a versioned output file.
When you write to such a file, it saves a versioned backup of any
existing file contents.
For usage examples see main()."""
import sys, os, glob, string, marshal
class VersionedOutputFile:
"""This is like a file object opened for output, but it makes
versioned backups of anything it might otherwise overwrite."""
def __init__(self, pathname, numSavedVersions=3):
"""Create a new output file.
`pathname' is the name of the file to [over]write.
`numSavedVersions' tells how many of the most recent versions
of `pathname' to save."""
self._pathname = pathname
self._tmpPathname = "%s.~new~" % self._pathname
self._numSavedVersions = numSavedVersions
self._outf = open(self._tmpPathname, "wb")
def __del__(self):
self.close()
def close(self):
if self._outf:
self._outf.close()
self._replaceCurrentFile()
self._outf = None
def asFile(self):
"""Return self's shadowed file object, since marshal is
pretty insistent on working w. pure file objects."""
return self._outf
def __getattr__(self, attr):
"""Delegate most operations to self's open file object."""
return getattr(self.__dict__['_outf'], attr)
def _replaceCurrentFile(self):
"""Replace the current contents of self's named file."""
self._backupCurrentFile()
os.rename(self._tmpPathname, self._pathname)
def _backupCurrentFile(self):
"""Save a numbered backup of self's named file."""
# If the file doesn't already exist, there's nothing to do.
if os.path.isfile(self._pathname):
newName = self._versionedName(self._currentRevision() + 1)
os.rename(self._pathname, newName)
# Maybe get rid of old versions.
if ((self._numSavedVersions is not None) and
(self._numSavedVersions > 0)):
self._deleteOldRevisions()
def _versionedName(self, revision):
"""Get self's pathname with a revision number appended."""
return "%s.~%s~" % (self._pathname, revision)
def _currentRevision(self):
"""Get the revision number of self's largest existing backup."""
revisions = [0] + self._revisions()
return max(revisions)
def _revisions(self):
"""Get the revision numbers of all of self's backups."""
revisions = []
backupNames = glob.glob("%s.~[0-9]*~" % (self._pathname))
for name in backupNames:
try:
revision = int(string.split(name, "~")[-2])
revisions.append(revision)
except ValueError:
# Some ~[0-9]*~ extensions may not be wholly numeric.
pass
revisions.sort()
return revisions
def _deleteOldRevisions(self):
"""Delete old versions of self's file, so that at most
self._numSavedVersions versions are retained."""
revisions = self._revisions()
revisionsToDelete = revisions[:-self._numSavedVersions]
for revision in revisionsToDelete:
pathname = self._versionedName(revision)
if os.path.isfile(pathname):
os.remove(pathname)
def main():
"""Module mainline (for isolation testing)"""
basename = "TestFile.txt"
if os.path.exists(basename):
os.remove(basename)
for i in range(10):
outf = VersionedOutputFile(basename)
outf.write("This is version %s.\n" % i)
outf.close()
# Now there should be just four versions of TestFile.txt:
expectedSuffixes = ["", ".~7~", ".~8~", ".~9~"]
expectedVersions = []
for suffix in expectedSuffixes:
expectedVersions.append("%s%s" % (basename, suffix))
matchingFiles = glob.glob("%s*" % basename)
for filename in matchingFiles:
if filename not in expectedVersions:
sys.stderr.write("Found unexpected file %s.\n" % filename)
else:
# Unit tests should clean up after themselves...
os.remove(filename)
# Finally, here's an example of how to use versioned
# output files in concert with marshal.
import marshal
outf = VersionedOutputFile("marshal.dat")
# Marshal out a sequence:
marshal.dump([1, 2, 3], outf.asFile())
outf.close()
os.remove("marshal.dat")
if __name__ == "__main__":
main()