The class DAL2 provides a way to label blocks on a hard drive while storing that labelling information on the hard drive itself. The information is kept in the BIT (Block Information Table) and is written out to disk when appropriate; otherwise, the BIT is kept in memory for efficiency. The hard disk is again presented as a collection of blocks, and all disk I/O is completely hidden because it is handled by Disk Abstraction Layer 1.
from dal_1 import DAL1
from math import ceil
################################################################################
class DAL2:
    """Disk Abstraction Layer 2: a block store with a per-block status byte.

    The underlying ``DAL1`` disk is split into two regions:

    * bytes ``0 .. __index - 1`` are reserved for the Block Information
      Table (BIT), which ``dump`` writes at offset 0 and ``load`` reads back
      from offset 0, one character per block;
    * the data blocks follow, block ``n`` starting at byte
      ``__index + n * SIZE``.

    A BIT entry of 0 marks a free block; any value from 1 to 255 is the
    caller-chosen status of an open block.  The BIT lives in memory and only
    reaches the disk image when ``dump`` is called.

    Argument validation is done with ``assert`` throughout, so invalid
    arguments raise ``AssertionError`` (and are not checked under ``-O``).
    """

    # CONSTANTS
    MAX_BLOCKS = (2 ** 16) - 1
    MAX_SIZE = (2 ** 16)

    # Compute Disk Layout
    @staticmethod
    def _layout(total_blocks, size):
        """Return ``(usable_blocks, data_offset)`` for a disk of this shape.

        ``total_blocks`` is the number of blocks on the underlying disk and
        ``size`` the number of bytes per block.  Every usable block costs
        ``size`` data bytes plus one BIT byte, hence the ``size + 1`` divisor.

        Floor division (``//``) is used on purpose: the results are used as
        list lengths and byte offsets and must stay integers even when true
        division is in effect.
        """
        usable = (total_blocks * size) // (size + 1)
        offset = (total_blocks - usable) * size
        return usable, offset

    # Disk Abstraction Layer
    def __init__(self, blocks, size):
        """Create a disk able to hold ``blocks`` data blocks of ``size`` bytes.

        blocks -- int, 1 <= blocks <= MAX_BLOCKS
        size   -- int, 1 <= size <= MAX_SIZE

        The block count handed to DAL1 is enlarged so there is room for one
        BIT byte per data block in addition to the data itself.

        NOTE: the geometry is passed to DAL1 through its *class* attributes
        ``BLOCKS`` and ``SIZE``, so constructing a DAL2 changes them for the
        whole process.
        """
        assert type(blocks) is int and 1 <= blocks <= self.MAX_BLOCKS
        assert type(size) is int and 1 <= size <= self.MAX_SIZE
        # Total bytes needed is blocks * (size + 1); round up to whole blocks.
        blocks = int(ceil(float(blocks * (size + 1)) / size))
        assert blocks <= self.MAX_SIZE
        DAL1.BLOCKS = blocks
        DAL1.SIZE = size
        self.__disk = DAL1()
        self.__blocks, self.__index = self._layout(blocks, size)
        self.__BIT = [0] * self.__blocks

    # Locate A Block
    def __offset(self, block):
        """Return the byte offset on the underlying disk of data block ``block``."""
        return self.__index + block * self.__disk.SIZE

    # Open A Block
    def open(self, status):
        """Claim the lowest-numbered free block and return its index.

        status -- int, 1 <= status <= 255, stored as the block's BIT entry.

        Raises ``ValueError`` (from ``list.index``) when no block is free.
        """
        assert type(status) is int and 1 <= status <= 255
        index = self.__BIT.index(0)
        self.__BIT[index] = status
        return index

    # Read A Block
    def read(self, block):
        """Return the ``SIZE`` bytes stored in the open block ``block``."""
        assert type(block) is int and 0 <= block < self.__blocks
        assert self.__BIT[block] != 0
        return self.__disk.read(self.__offset(block), self.__disk.SIZE)

    # Write A Block
    def write(self, block, data):
        """Store ``data`` in the open block ``block``.

        data -- a ``str`` of exactly ``SIZE`` characters.
        """
        assert type(block) is int and 0 <= block < self.__blocks
        assert type(data) is str and len(data) == self.__disk.SIZE
        assert self.__BIT[block] != 0
        self.__disk.write(self.__offset(block), data)

    # Close A Block
    def close(self, block):
        """Erase the open block ``block`` on disk and mark it free."""
        assert type(block) is int and 0 <= block < self.__blocks
        assert self.__BIT[block] != 0
        self.__disk.erase(self.__offset(block), self.__disk.SIZE)
        self.__BIT[block] = 0

    # Get Status Information
    def status(self, block):
        """Return the BIT entry of ``block`` (0 means the block is free)."""
        assert type(block) is int and 0 <= block < self.__blocks
        return self.__BIT[block]

    # Probability Of Failure
    def fail(self, probability):
        """Forward ``probability`` unchanged to the underlying disk's ``fail``."""
        self.__disk.fail(probability)

    # Dump To File
    def dump(self, name):
        """Write the in-memory BIT to disk offset 0, then dump the disk to ``name``."""
        self.__disk.write(0, ''.join([chr(i) for i in self.__BIT]))
        self.__disk.dump(name)

    # Load From File
    def load(self, name, abstract):
        """Load a disk image from ``name`` and rebuild the BIT from it.

        abstract -- bool.  When True, the data area of every free block is
                    erased; when False, every free block is asserted to
                    already be all ``chr(0)``.

        The layout is recomputed from the loaded disk's ``BLOCKS`` and
        ``SIZE``, so the geometry given to ``__init__`` is discarded.
        """
        assert type(abstract) is bool
        self.__disk.load(name)
        self.__blocks, self.__index = self._layout(self.__disk.BLOCKS,
                                                   self.__disk.SIZE)
        # NOTE(review): assumes DAL1.read returns a str, one char per BIT
        # entry -- confirm against dal_1.
        self.__BIT = [ord(c) for c in self.__disk.read(0, self.__blocks)]
        if abstract:
            self.__soft()
        else:
            self.__hard()

    # Fix All Errors
    def __soft(self):
        """Erase the data area of every block the BIT marks as free."""
        for block, status in enumerate(self.__BIT):
            if status == 0:
                self.__disk.erase(self.__offset(block), self.__disk.SIZE)

    # Find Any Error
    def __hard(self):
        """Assert that every block the BIT marks as free contains only chr(0)."""
        blank = chr(0) * self.__disk.SIZE
        for block, status in enumerate(self.__BIT):
            if status == 0:
                assert blank == self.__disk.read(self.__offset(block),
                                                 self.__disk.SIZE)
################################################################################
def test():
    """Round-trip self-test: fill a disk, dump it, reload it and compare.

    Builds a 1024 x 1024 DAL2, opens 1024 blocks with random statuses and
    random data, closes a random selection of them, dumps the disk to the
    file 'temp', loads that file into a fresh DAL2 with the strict check
    (``abstract=False``) and verifies every status and every open block's
    data.  It also verifies that an out-of-range block index is rejected.

    The outcome is reported through a ``raw_input`` prompt ('Tested To True'
    or 'Tested To False'), which waits for the user to press Enter.
    """
    from os import remove, urandom
    from random import randint
    disk = DAL2(1024, 1024)
    memo = [None] * 1024
    for _ in range(1024):
        status = randint(1, 255)
        block = disk.open(status)
        data = urandom(1024)
        disk.write(block, data)
        memo[block] = status, data
    for _ in range(1024):
        block = randint(0, 1023)
        if disk.status(block):
            disk.close(block)
            memo[block] = None
    disk.dump('temp')
    other = DAL2(1, 1)
    try:
        other.load('temp', False)
    finally:
        # Always clean up the scratch file, even when loading fails.
        remove('temp')
    try:
        for index, information in enumerate(memo):
            if information is None:
                assert other.status(index) == 0
            else:
                assert other.status(index) == information[0]
                assert other.read(index) == information[1]
        # Block 1024 is one past the end and must be rejected.
        try:
            other.status(1024)
        except (AssertionError, IndexError):
            valid = True
        else:
            valid = False
        assert valid
    except Exception:
        passed = False
    else:
        passed = True
    # Prompt outside the try block so that Ctrl-C / EOF at the prompt is not
    # misreported as a failed test.
    if passed:
        raw_input('Tested To True')
    else:
        raw_input('Tested To False')
################################################################################
# Run the self-test only when this file is executed as a script.
if __name__ == '__main__':
    test()
 | 
This recipe is part of a series of files that together design and implement a file system. dal_1.py can be found at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/492209
Download
Copy to clipboard