Welcome, guest | Sign In | My Account | Store | Cart

A DiskSpaceProctor is registered with a SyncManager to be shared by forked processes and threads. This object will check the available free space and add up filesz of concurrent writes. If there is not enough space for the requesting write, an exception is raised.

Python, 111 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import sys
import unittest
from multiprocessing.managers import SyncManager
from contextlib import contextmanager
__license__ = 'MIT'
__email__ = 'mat@miga.me'
#
#
#
class SharedDiskSpaceProctor(SyncManager):
   def __init__(self):
      SyncManager.__init__(self)
         self.register(	'DiskSpaceProctor',
               callable=DiskSpaceProctor,
               exposed=('_can_write', '_close_write', 'make_dirs'))
#
#
#
class DiskSpaceProctorException(Exception):
   def __init__(self,msg,**kwds):
      for k,v in kwds.items():
         setattr(self,k,v)
      self.msg = msg
   def __str__(self):
      return '%s - %s'%(self.msg, str(self.__dict__)) 

#
#
#
class DiskSpaceProctor(object):
   def __init__(self):
      self._wRLock = threading.RLock()
      self._currentWrites = {}
   #
   def _can_write(self, path, fz):
      '''
         returns True if there is available space and
         considers current write operations from other
         processes
      '''
      with self._wRLock:
         try:
            path = self.__chk_cwd(path) 
            s = os.statvfs(os.path.dirname(path))
            ca = s.f_frsize * s.f_bavail
            if self._currentWrites.has_key(path):
               raise DiskSpaceProctorException('Concurrent writes not allowed to same path%s'%path)
            wbt = 0
            for v in self._currentWrites.values():
               wbt += v
            if (fz + wbt) < ca:
               self._currentWrites[path]=fz
               return True
            raise DiskSpaceProctorException (
               'Raising Exception "%s" from %s.%s()'%(
                  'Not enough space for requested file size',
                  self.__class__.__name__, str(inspect.stack()[0][3]))
            )
         except DiskSpaceProctorException, de:
            raise de
         except Exception, e:
            raise Exception (
               'Raising Exception "%s" from %s.%s()'%(
                  e, self.__class__.__name__, str(inspect.stack()[0][3]))
            )
   #
   def _close_write(self, path):
      '''
         remove path and its filesz out of current writes addition
      '''
      with self._wRLock:
         path = self.__chk_cwd(path)
         self._currentWrites.pop(path)
   #
   def make_dirs(self, path, mode=0754, potFilesz=4096):
      '''
         create full path, consider potential file write sz
      '''
      if os.path.exists(path): return True
      if self._can_write(path, potFilesz):
         os.makedirs(path, mode=mode)
         self._close_write(path)
         return True
      return False
   #
   def __chk_cwd(self, path):
      return os.path.join(os.getcwd(),path) if os.path.dirname(path)=='' else os.path.dirname(path)
#
#
#  
class Test(unittest.TestCase):
   def test_diskspace_proctor(self):
      sdsp = SharedDiskSpaceProctor()
      sdsp.start()
      dsp = sdsp.DiskSpaceProctor()
      dspLock = sdsp.Lock()
      testFilename = 'diskspaceproctortest.dat'
      if os.path.exists(testFilename):
         os.remove(testFilename)
      doWrite = False
      with dspLock: 
         doWrite = dsp._can_write(testFilename, 4096):
      if doWrite:    
         f = open(testFilename, 'w')
         f.write('for testing only')
         f.close()
      self.assertTrue(os.path.exists(testFilename))   
#  
if __name__ == '__main__':
   unittest.main()

In automated large file creation and management, it can become necessary to constantly monitor incoming data size and the available space on the destination volume. This utility monitors concurrent writes in multiple processes and threads. It would be possible to send out some notification to an admin, on SNMP traps, when space is limited.

I haven't found what will replace os.statvfs(). I'm probably not looking in the right place. :/

Had to change this a bit.. yielding in a context managed block was making each write consecutive.