The class DAL2 provides a way to label blocks on a hard drive while storing such information on the hard drive itself. The information is kept in the BIT (Block Information Table) and is written out to disk when appropriate. Otherwise, the BIT is kept in memory for efficiency. The hard disk is again presented as a collection of blocks but completely hides disk IO as a result of Disk Abstraction Layer 1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | from dal_1 import DAL1
from math import ceil
################################################################################
class DAL2:
# CONSTANTS
MAX_BLOCKS = (2 ** 16) - 1
MAX_SIZE = (2 ** 16)
# Disk Abstraction Layer
def __init__(self, blocks, size):
assert type(blocks) is int and 1 <= blocks <= self.MAX_BLOCKS
assert type(size) is int and 1 <= size <= self.MAX_SIZE
blocks = int(ceil(float(blocks * (size + 1)) / size))
assert blocks <= self.MAX_SIZE
DAL1.BLOCKS = blocks
DAL1.SIZE = size
self.__disk = DAL1()
self.__blocks = (blocks * size) / (size + 1)
self.__index = (blocks - self.__blocks) * size
self.__BIT = [0 for index in range(self.__blocks)]
# Open A Block
def open(self, status):
assert type(status) is int and 1 <= status <= 255
index = self.__BIT.index(0)
self.__BIT[index] = status
return index
# Read A Block
def read(self, block):
assert type(block) is int and 0 <= block < self.__blocks
assert self.__BIT[block] != 0
return self.__disk.read(self.__index + block * self.__disk.SIZE, \
self.__disk.SIZE)
# Write A Block
def write(self, block, data):
assert type(block) is int and 0 <= block < self.__blocks
assert type(data) is str and len(data) == self.__disk.SIZE
assert self.__BIT[block] != 0
self.__disk.write(self.__index + block * self.__disk.SIZE, data)
# Close A Block
def close(self, block):
assert type(block) is int and 0 <= block < self.__blocks
assert self.__BIT[block] != 0
self.__disk.erase(self.__index + block * self.__disk.SIZE, \
self.__disk.SIZE)
self.__BIT[block] = 0
# Get Status Information
def status(self, block):
assert type(block) is int and 0 <= block < self.__blocks
return self.__BIT[block]
# Probability Of Failure
def fail(self, probability):
self.__disk.fail(probability)
# Dump To File
def dump(self, name):
self.__disk.write(0, ''.join([chr(i) for i in self.__BIT]))
self.__disk.dump(name)
# Load From File
def load(self, name, abstract):
assert type(abstract) is bool
self.__disk.load(name)
self.__blocks = (self.__disk.BLOCKS * self.__disk.SIZE) / \
(self.__disk.SIZE + 1)
self.__index = (self.__disk.BLOCKS - self.__blocks) * self.__disk.SIZE
self.__BIT = [ord(c) for c in self.__disk.read(0, self.__blocks)]
if abstract:
self.__soft()
else:
self.__hard()
# Fix All Errors
def __soft(self):
for block, status in enumerate(self.__BIT):
if status == 0:
self.__disk.erase(self.__index + block * self.__disk.SIZE, \
self.__disk.SIZE)
# Find Any Error
def __hard(self):
data = chr(0) * self.__disk.SIZE
for block, status in enumerate(self.__BIT):
if status == 0:
assert data == self.__disk.read(self.__index + block * \
self.__disk.SIZE, \
self.__disk.SIZE)
################################################################################
def test():
from os import remove, urandom
from random import randint
test = DAL2(1024, 1024)
memo = [None for temp in range(1024)]
for temp in range(1024):
status = randint(1, 255)
block = test.open(status)
data = urandom(1024)
test.write(block, data)
memo[block] = status, data
for temp in range(1024):
block = randint(0, 1023)
if test.status(block):
test.close(block)
memo[block] = None
test.dump('temp')
other = DAL2(1, 1)
other.load('temp', False)
remove('temp')
try:
for index, information in enumerate(memo):
if information is None:
assert other.status(index) == 0
else:
assert other.status(index) == information[0]
assert other.read(index) == information[1]
valid = False
try:
other.status(1024)
except:
valid = True
assert valid
raw_input('Tested To True')
except:
raw_input('Tested To False')
################################################################################
if __name__ == '__main__':
test()
|
This recipe is part of a series of files having the purpose of designing and implementing a file system. dal_1.py can be found at http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/492209