import os import sys import unittest from multiprocessing.managers import SyncManager from contextlib import contextmanager __license__ = 'MIT' __email__ = '' # # # class SharedDiskSpaceProctor(SyncManager): def __init__(self): SyncManager.__init__(self) self.register( 'DiskSpaceProctor', callable=DiskSpaceProctor, exposed=('_can_write', '_close_write', 'make_dirs')) # # # class DiskSpaceProctorException(Exception): def __init__(self,msg,**kwds): for k,v in kwds.items(): setattr(self,k,v) self.msg = msg def __str__(self): return '%s - %s'%(self.msg, str(self.__dict__)) # # # class DiskSpaceProctor(object): def __init__(self): self._wRLock = threading.RLock() self._currentWrites = {} # def _can_write(self, path, fz): ''' returns True if there is available space and considers current write operations from other processes ''' with self._wRLock: try: path = self.__chk_cwd(path) s = os.statvfs(os.path.dirname(path)) ca = s.f_frsize * s.f_bavail if self._currentWrites.has_key(path): raise DiskSpaceProctorException('Concurrent writes not allowed to same path%s'%path) wbt = 0 for v in self._currentWrites.values(): wbt += v if (fz + wbt) < ca: self._currentWrites[path]=fz return True raise DiskSpaceProctorException ( 'Raising Exception "%s" from %s.%s()'%( 'Not enough space for requested file size', self.__class__.__name__, str(inspect.stack()[0][3])) ) except DiskSpaceProctorException, de: raise de except Exception, e: raise Exception ( 'Raising Exception "%s" from %s.%s()'%( e, self.__class__.__name__, str(inspect.stack()[0][3])) ) # def _close_write(self, path): ''' remove path and its filesz out of current writes addition ''' with self._wRLock: path = self.__chk_cwd(path) self._currentWrites.pop(path) # def make_dirs(self, path, mode=0754, potFilesz=4096): ''' create full path, consider potential file write sz ''' if os.path.exists(path): return True if self._can_write(path, potFilesz): os.makedirs(path, mode=mode) self._close_write(path) return True return False # def __chk_cwd(self, path): return os.path.join(os.getcwd(),path) if os.path.dirname(path)=='' else os.path.dirname(path) # # # class Test(unittest.TestCase): def test_diskspace_proctor(self): sdsp = SharedDiskSpaceProctor() sdsp.start() dsp = sdsp.DiskSpaceProctor() dspLock = sdsp.Lock() testFilename = 'diskspaceproctortest.dat' if os.path.exists(testFilename): os.remove(testFilename) doWrite = False with dspLock: doWrite = dsp._can_write(testFilename, 4096): if doWrite: f = open(testFilename, 'w') f.write('for testing only') f.close() self.assertTrue(os.path.exists(testFilename)) # if __name__ == '__main__': unittest.main()