A DiskSpaceProctor is registered with a SyncManager so that it can be shared by forked processes and threads. The object checks the available free space and adds up the file sizes of concurrent writes. If there is not enough space for the requested write, an exception is raised.
import os
import sys
import threading
import unittest
from contextlib import contextmanager
from multiprocessing.managers import SyncManager
# Module metadata: license identifier and author contact address.
__license__ = 'MIT'
__email__ = 'mat@miga.me'
#
#
#
class SharedDiskSpaceProctor(SyncManager):
    """SyncManager that can serve a shared ``DiskSpaceProctor`` instance.

    After ``start()``, calling ``manager.DiskSpaceProctor()`` returns a proxy
    whose ``_can_write``, ``_close_write`` and ``make_dirs`` calls are
    forwarded to the single object living in the manager process.
    """

    def __init__(self):
        super(SharedDiskSpaceProctor, self).__init__()
        # The method names are listed explicitly: two of them start with an
        # underscore and are named here so that the proxy forwards them.
        proxied_methods = ('_can_write', '_close_write', 'make_dirs')
        self.register(
            'DiskSpaceProctor',
            callable=DiskSpaceProctor,
            exposed=proxied_methods,
        )
#
#
#
class DiskSpaceProctorException(Exception):
    """Raised when a write cannot be granted by the disk space proctor.

    Parameters:
        msg:   human-readable description of the failure.
        **kwds: arbitrary extra context; each keyword is stored as an
                attribute of the same name on the exception instance.
    """

    def __init__(self, msg, **kwds):
        # Hand ``msg`` to the base class so that ``args == (msg,)``.  Pickle
        # rebuilds an exception by calling ``cls(*args)``; with empty args the
        # required ``msg`` parameter would be missing and the exception could
        # not be reconstructed on the receiving side of a manager connection.
        super(DiskSpaceProctorException, self).__init__(msg)
        for k, v in kwds.items():
            setattr(self, k, v)
        # Assigned last so that ``msg`` always reflects the positional value.
        self.msg = msg

    def __str__(self):
        # Message followed by every attribute stored on the instance.
        return '%s - %s' % (self.msg, str(self.__dict__))
#
#
#
class DiskSpaceProctor(object):
    """Grant or refuse writes based on free disk space and pending writes.

    Every granted write is recorded (absolute path -> reserved byte count)
    until it is released with ``_close_write``.  A new request is granted only
    if its size plus the sum of all recorded reservations is smaller than the
    space reported as available by ``os.statvfs``.
    """

    def __init__(self):
        # Guards every access to ``_currentWrites``.
        self._wRLock = threading.RLock()
        # absolute path -> number of bytes reserved for that in-flight write
        self._currentWrites = {}

    def _can_write(self, path, fz):
        """Reserve ``fz`` bytes for a write to ``path``.

        Parameters:
            path: target file (or directory) path; relative paths are
                  resolved against the current working directory.
            fz:   number of bytes the caller intends to write.

        Returns:
            True when the reservation was recorded.

        Raises:
            DiskSpaceProctorException: a reservation for the same path is
                already open, or there is not enough free space.
            Exception: any other failure (e.g. ``os.statvfs`` errors),
                re-raised with the originating method named in the message.
        """
        # The method name used to be looked up with inspect.stack(); it is
        # always this method, so it is spelled out instead.
        method = '_can_write'
        with self._wRLock:
            try:
                path = self.__chk_cwd(path)
                if path in self._currentWrites:
                    raise DiskSpaceProctorException(
                        'Concurrent writes not allowed to same path: %s' % path)
                # Query the nearest directory that already exists so that
                # targets inside not-yet-created directories can be checked.
                s = os.statvfs(self.__nearest_existing_dir(path))
                ca = s.f_frsize * s.f_bavail
                # All open reservations are counted, whichever path they
                # belong to.
                wbt = sum(self._currentWrites.values())
                if (fz + wbt) < ca:
                    self._currentWrites[path] = fz
                    return True
                raise DiskSpaceProctorException(
                    'Raising Exception "%s" from %s.%s()' % (
                        'Not enough space for requested file size',
                        self.__class__.__name__, method))
            except DiskSpaceProctorException:
                # Already the right type: let it through untouched.
                raise
            except Exception as e:
                raise Exception(
                    'Raising Exception "%s" from %s.%s()' % (
                        e, self.__class__.__name__, method))

    def _close_write(self, path):
        """Release the reservation previously granted for ``path``.

        Raises:
            KeyError: no reservation is recorded for ``path``.
        """
        with self._wRLock:
            path = self.__chk_cwd(path)
            self._currentWrites.pop(path)

    def make_dirs(self, path, mode=0o754, potFilesz=4096):
        """Create ``path`` (including parents) if space allows.

        Parameters:
            path:      directory to create.
            mode:      permission bits passed to ``os.makedirs``.
            potFilesz: number of bytes to reserve while creating the
                       directory.

        Returns:
            True if the directory already exists or was created, False if
            the reservation was not granted.
        """
        if os.path.exists(path):
            return True
        if self._can_write(path, potFilesz):
            try:
                os.makedirs(path, mode=mode)
            finally:
                # Release the reservation even when makedirs fails, otherwise
                # the bytes would stay counted against every later request.
                self._close_write(path)
            return True
        return False

    def __chk_cwd(self, path):
        """Return ``path`` as a normalized absolute path.

        A bare file name is resolved against the current working directory;
        a path that already has a directory part keeps its file name, so two
        files in the same directory map to different keys.
        """
        return os.path.abspath(path)

    def __nearest_existing_dir(self, path):
        """Return the closest ancestor directory of ``path`` that exists."""
        d = os.path.dirname(path)
        while not os.path.isdir(d):
            parent = os.path.dirname(d)
            if parent == d:
                # Reached the filesystem root; nothing further to try.
                break
            d = parent
        return d
#
#
#
class Test(unittest.TestCase):
    """End-to-end check of a DiskSpaceProctor shared through a manager."""

    def test_diskspace_proctor(self):
        """Reserve space through the proxy, write the file, then release."""
        sdsp = SharedDiskSpaceProctor()
        sdsp.start()
        # Stop the manager process once the test is over.
        self.addCleanup(sdsp.shutdown)
        dsp = sdsp.DiskSpaceProctor()
        dspLock = sdsp.Lock()
        testFilename = 'diskspaceproctortest.dat'
        if os.path.exists(testFilename):
            os.remove(testFilename)
        with dspLock:
            doWrite = dsp._can_write(testFilename, 4096)
        self.assertTrue(doWrite)
        try:
            with open(testFilename, 'w') as f:
                f.write('for testing only')
            self.assertTrue(os.path.exists(testFilename))
        finally:
            # Release the reservation and remove the scratch file so that
            # repeated runs start from a clean state.
            dsp._close_write(testFilename)
            if os.path.exists(testFilename):
                os.remove(testFilename)
#
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|
In automated large-file creation and management, it can become necessary to constantly monitor the size of incoming data and the space available on the destination volume. This utility tracks concurrent writes across multiple processes and threads. It could also be extended to notify an administrator, for example via SNMP traps, when space is limited.
I haven't found what will replace os.statvfs(). I'm probably not looking in the right place. :/
Had to change this a bit: yielding inside a context-managed block was forcing the writes to run one after another.