Search nth smallest float in really big file (more, more and more bigger than available RAM) in a single pass through the file
if your file more than 150GB - you should use a more appropriate sampling params to the data more - you can use tempfile to store data(interval) returned from func fill_interval data: file with only one float number per line, good shuffled
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | import os
import random
from array import array
def get_sampling_params(filename):
'''redefine to fit data'''
filesize = os.path.getsize(filename)
num_of_chunks = 500
size_of_chunk = filesize/(100*num_of_chunks)
seek_positions = [random.randint(0,(filesize-size_of_chunk)) for i
in xrange(num_of_chunks)]
epsilon = 2
return seek_positions, size_of_chunk, epsilon
def sample_chunks_gen(filename, seek_positions, size_of_chunk):
'''yields array of floats'''
def _array_from_file_chunk(input_file, seek_position, size_of_chunk):
input_file.seek(seek_position)
buf_str = input_file.read(size_of_chunk)
first_endl = buf_str.find('\n')
last_endl = buf_str.rfind('\n')
return array('f', map(float,
buf_str[(first_endl+1):][:(last_endl+1)].splitlines()))
with open(filename, 'rb') as input_file:
for seek_position in seek_positions:
yield _array_from_file_chunk(input_file, seek_position,
size_of_chunk)
def select(data, positions, start=0, end=None):
'''For every n in *positions* find nth rank ordered element in *data*
inplace select'''
if not end: end = len(data) - 1
if end < start:
return []
if end == start:
return [data[start]]
pivot_rand_i = random.randrange(start,end)
pivot_rand = data[pivot_rand_i] # get random pivot
data[end], data[pivot_rand_i] = data[pivot_rand_i], data[end]
pivot_i = start
for i in xrange(start, end): # partitioning about the pivot
if data[i] < pivot_rand:
data[pivot_i], data[i] = data[i], data[pivot_i]
pivot_i += 1
data[end], data[pivot_i] = data[pivot_i], data[end]
under_positions, over_positions, mid_positions = [],[],[]
for position in positions:
if position == pivot_i:
mid_positions.append(position)
elif position < pivot_i:
under_positions.append(position)
else:
over_positions.append(position)
result = []
if len(under_positions) > 0:
result.extend(select(data, under_positions, start, pivot_i-1))
if len(mid_positions) > 0:
result.extend([data[position] for position in mid_positions])
if len(over_positions) > 0:
result.extend(select(data, over_positions, pivot_i+1, end))
return result
def get_interval_endpoints(data, position, epsilon):
'''return interval endpoints'''
values = select(data, [position-epsilon, position+epsilon])
return min(values), max(values)
def reduce_endpoints(endpoints):
'''return min and max of all endpoints'''
first_endpoint = min(endpoints, key=lambda x: x[0])
last_endpoint = max(endpoints, key=lambda x: x[1])
return first_endpoint[0], last_endpoint[1]
def fill_interval(filename, endpoints, size_of_chunk=64*1024):
'''return count of values before interval, interval, data len'''
with open(filename, 'rb') as input_file:
first_endpoint = endpoints[0]
last_endpoint = endpoints[1]
seek_position = 0
interval = array('f')
iappend = interval.append
pre_count = 0
count = 0
input_file.seek(seek_position)
buf_str = input_file.read(size_of_chunk)
while buf_str:
last_endl = buf_str.rfind('\n')
buf_arr = array('f', map(float,
buf_str[:(last_endl+1)].splitlines()))
count = count + len(buf_arr)
for value in buf_arr:
if value <= first_endpoint:
pre_count = pre_count + 1
elif last_endpoint <= value:
pass
else:
iappend(value)
seek_position = seek_position + (last_endl+1)
input_file.seek(seek_position)
buf_str = input_file.read(size_of_chunk)
return interval, count, pre_count
def find_value(filename, position, sampling_func=get_sampling_params):
'''find value in file'''
seek_positions, size_of_chunk, epsilon = sampling_func(filename)
s_chunks_iter = sample_chunks_gen(filename, seek_positions, size_of_chunk)
s_endpoints = map(lambda x:
get_interval_endpoints(x, position, epsilon),
s_chunks_iter)
endpoints = reduce_endpoints(s_endpoints)
interval, count, pre_count = fill_interval(filename, endpoints)
print len(interval)
results = select(interval, [position - pre_count])
return results[0]
if __name__ == '__main__':
import optparse
parser = optparse.OptionParser()
parser.add_option(
'-f','--file',
dest='filename',
)
parser.add_option(
'-p','--position',
dest='position',
type='int'
)
options, args = parser.parse_args()
print repr(find_value(options.filename, options.position))
|