# Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
"""This module provides a versioned output file.

When you write to such a file, it saves a versioned backup of any
existing file contents.

For usage examples see main()."""

import sys, os, glob, string, marshal

class VersionedOutputFile:
    """A file-like object for output that keeps numbered backups.

    Data is written to a temporary file named ``<pathname>.~new~``
    (opened in binary mode).  When the object is closed, any existing
    file at ``pathname`` is renamed to ``<pathname>.~N~``, where N is one
    more than the largest existing backup number, and the temporary file
    is then renamed to ``pathname``.

    Attributes not defined here (``write``, ``flush``, ...) are delegated
    to the underlying open file object.
    """

    def __init__(self, pathname, numSavedVersions=3):
        """Create a new output file.

        `pathname' is the name of the file to [over]write.
        `numSavedVersions' tells how many of the most recent backups
        of `pathname' to keep; None or a value <= 0 disables pruning,
        so every backup is kept.

        Raises whatever open() raises if the temporary file cannot be
        created."""
        # Set first so close(), __del__ and __getattr__ stay well-behaved
        # even if open() below raises.
        self._outf = None
        self._pathname = pathname
        self._tmpPathname = "%s.~new~" % self._pathname
        self._numSavedVersions = numSavedVersions
        self._outf = open(self._tmpPathname, "wb")

    def __del__(self):
        self.close()

    def close(self):
        """Close the temporary file and move it into place.

        Any existing file at the target pathname is first saved as a
        numbered backup.  Calling close() more than once is harmless."""
        # Go through __dict__ so a partially constructed instance does not
        # trigger __getattr__ delegation.
        outf = self.__dict__.get('_outf')
        if outf is None:
            return
        # Mark as closed up front: if the rename below fails, __del__
        # must not attempt the replacement a second time.
        self._outf = None
        outf.close()
        self._replaceCurrentFile()

    def asFile(self):
        """Return self's shadowed file object, since marshal is
        pretty insistent on working w. pure file objects.

        Returns None once self has been closed."""
        return self.__dict__.get('_outf')

    def __getattr__(self, attr):
        """Delegate most operations to self's open file object.

        Raises AttributeError if there is no open file object (self was
        closed, or construction failed)."""
        outf = self.__dict__.get('_outf')
        if outf is None:
            raise AttributeError(attr)
        return getattr(outf, attr)

    def _replaceCurrentFile(self):
        """Replace the current contents of self's named file."""
        self._backupCurrentFile()
        os.rename(self._tmpPathname, self._pathname)

    def _backupCurrentFile(self):
        """Save a numbered backup of self's named file."""
        # If the file doesn't already exist, there's nothing to do.
        if not os.path.isfile(self._pathname):
            return
        newName = self._versionedName(self._currentRevision() + 1)
        os.rename(self._pathname, newName)
        # Maybe get rid of old versions.
        self._deleteOldRevisions()

    def _versionedName(self, revision):
        """Get self's pathname with a revision number appended."""
        return "%s.~%s~" % (self._pathname, revision)

    def _currentRevision(self):
        """Get the revision number of self's largest existing backup.

        Returns 0 when no backup exists."""
        return max([0] + self._revisions())

    def _revisions(self):
        """Get the sorted revision numbers of all of self's backups."""
        # Escape glob metacharacters in the pathname itself (where the
        # running Python provides glob.escape) so that names such as
        # "data[1].txt" still match their own backups.
        escape = getattr(glob, "escape", None)
        if escape is not None:
            base = escape(self._pathname)
        else:
            base = self._pathname

        revisions = []
        for name in glob.glob("%s.~[0-9]*~" % base):
            try:
                # A backup name ends in "~<rev>~", so <rev> is the
                # second-to-last "~"-separated field.
                revisions.append(int(name.split("~")[-2]))
            except ValueError:
                # Some ~[0-9]*~ extensions may not be wholly numeric.
                pass
        revisions.sort()
        return revisions

    def _deleteOldRevisions(self):
        """Delete old versions of self's file, so that at most
        self._numSavedVersions versions are retained.

        Does nothing when self._numSavedVersions is None or <= 0."""
        keep = self._numSavedVersions
        if keep is None or keep <= 0:
            return
        for revision in self._revisions()[:-keep]:
            pathname = self._versionedName(revision)
            if os.path.isfile(pathname):
                os.remove(pathname)
                
def main():
    """Module mainline (for isolation testing).

    Writes ten successive versions of TestFile.txt in the current
    directory, checks that only the current file and its three most
    recent backups remain, and then shows how to combine a versioned
    output file with marshal.  Unexpected leftover files are reported
    on stderr; expected ones are removed."""
    basename = "TestFile.txt"
    if os.path.exists(basename):
        os.remove(basename)
    for i in range(10):
        outf = VersionedOutputFile(basename)
        # The underlying file is opened in binary mode, so write bytes
        # rather than text.
        outf.write(("This is version %s.\n" % i).encode("ascii"))
        outf.close()

    # Now there should be just four versions of TestFile.txt:
    expectedSuffixes = ["", ".~7~", ".~8~", ".~9~"]
    expectedVersions = ["%s%s" % (basename, suffix)
                        for suffix in expectedSuffixes]
    for filename in glob.glob("%s*" % basename):
        if filename not in expectedVersions:
            sys.stderr.write("Found unexpected file %s.\n" % filename)
        else:
            # Unit tests should clean up after themselves...
            os.remove(filename)

    # Finally, here's an example of how to use versioned
    # output files in concert with marshal.
    outf = VersionedOutputFile("marshal.dat")
    # Marshal out a sequence; marshal needs the real file object.
    marshal.dump([1, 2, 3], outf.asFile())
    outf.close()
    os.remove("marshal.dat")

# Run the self-test in main() only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()

# History
#
#   - revision 2 (22 years ago)
#   - previous revisions are not available