After several attempts to use third-party modules I wrote my own console progress meter.
Bonus list:
- calculation of ETA based on the most recent update points. This is more accurate than calculating ETA from the process start time (the process can survive hibernation, but a start-time-based ETA loses its accuracy afterwards)
- ability to write progress meter to sys.stderr
- update_left() method for multithreaded programs :)
'''
pmeter.py -- Precise console progress meter with ETA calculation
Some code inherited from CFV sources (cfv.sf.net)
2010-01-11 20:23
'''
import sys
import time
import threading
# Recipe author and contact address.
__author__ = 'Denis Barmenkov <denis.barmenkov@gmail.com>'
# URL recorded as the origin of this module.
__source__ = 'http://code.activestate.com/recipes/577002-precise-console-progress-meter-with-eta-calculatio/?in=user-57155'
def format_sec(sec):
    '''
    Format a duration given in seconds as 'H:MM:SS'.

    sec -- number of seconds (int or float); the fractional part is
           truncated toward zero.

    Hours are not wrapped into days (90000 -> '25:00:00').
    Negative durations are rendered as the magnitude with a leading
    minus sign (-5 -> '-0:00:05'); applying divmod() directly to a
    negative value would floor and produce '-1:59:55'.
    '''
    sec = int(sec)
    sign = ''
    if sec < 0:
        sign = '-'
        sec = -sec
    min_, sec = divmod(sec, 60)
    hour, min_ = divmod(min_, 60)
    return '%s%d:%02d:%02d' % (sign, hour, min_, sec)
class ETA(object):
    '''
    Calculate ETA (Estimated Time of Arrival) for a job of known size.

    Only a sliding window of recent (time, size) update points is kept,
    so the estimate follows the current speed instead of the average
    since start; this keeps it usable after a long pause such as
    hibernation.

    wanted_size -- amount of work that means "done"
    max_point   -- a point is dropped only while more than this many
                   points are stored ...
    max_seconds -- ... and the window also spans more than this many
                   seconds
    clock       -- optional zero-argument callable returning the current
                   time in seconds; defaults to time.monotonic when
                   available, else time.time.  time.clock() is not used:
                   it was removed in Python 3.8 and measured CPU time on
                   Unix, so a mostly-sleeping process saw no time pass.
    '''

    def __init__(self, wanted_size, max_point=20, max_seconds=30, clock=None):
        if clock is None:
            clock = getattr(time, 'monotonic', time.time)
        self._clock = clock
        self.wanted_size = wanted_size
        self.points = list()
        self.max_point = max_point
        self.max_seconds = max_seconds
        # Starting point: no work done at construction time.
        self.points.append([self._clock(), 0])
        self.eta = 'N/A'

    def _cleanup(self):
        '''
        Drop outdated points from the front of the window.

        Returns 0 when fewer than two points are stored (no speed can be
        computed), 1 otherwise.  At least two points are always kept.
        '''
        if len(self.points) < 2:
            return 0
        last_point_time = self.points[-1][0]
        while len(self.points) > 2:
            too_old = last_point_time - self.points[0][0] > self.max_seconds
            too_many = len(self.points) > self.max_point
            if not (too_old and too_many):
                break
            self.points.pop(0)
        return 1

    def update(self, cursize):
        '''
        Record that `cursize` units are done now and refresh the ETA.

        The previous ETA text is kept when no speed can be derived:
        no time elapsed across the window, or no forward progress.
        '''
        self.points.append([self._clock(), cursize])
        if not self._cleanup():
            return
        oldest_time, oldest_size = self.points[0]
        newest_time, newest_size = self.points[-1]
        delta_time = newest_time - oldest_time
        # Work done *inside the window*; using cursize alone would
        # overestimate the speed once old points have been dropped.
        delta_work = newest_size - oldest_size
        if delta_time <= 0 or delta_work <= 0:
            return
        speed = float(delta_work) / float(delta_time)
        # Clamp so that overshooting wanted_size reports zero time left.
        remaining = max(float(self.wanted_size) - float(cursize), 0.0)
        self.eta = format_sec(remaining / speed)

    def getstatus(self):
        '''Return the latest ETA as 'H:MM:SS', or 'N/A' if unknown yet.'''
        return self.eta
class ProgressMeter(object):
    '''
    Single-line console progress bar with percentage and ETA.

    Typical use:
        pm = ProgressMeter(30, outstream=sys.stderr)
        pm.init('Label', total)
        pm.update(done)       # any number of times, from any thread
        pm.set_complete()     # keep the finished bar on screen
        # or pm.cleanup()     # erase the bar instead

    steps            -- width of the bar in characters
    min_update_delta -- minimum number of seconds between two redraws
                        (values below 0.04 are raised to 0.04)
    outstream        -- file-like object the bar is written to
    '''

    def __init__(self, steps=20, min_update_delta=0.1, outstream=sys.stdout):
        self.wantsteps = steps
        self.prev_message = ''
        # "Never drawn": guarantees the throttle lets the next update through.
        self.last_update_time = float('-inf')
        self.needrefresh = 1
        self.times = list()
        self.done_char = '#'
        self.left_char = '.'
        self.eta_calculator = None
        self.min_update_delta = max(min_update_delta, 0.04)  # max 25 fps on redraw :)
        self.outstream = outstream
        self.work_mutex = threading.Lock()
        self.size = None
        self.label = None
        self.steps = None

    @staticmethod
    def _now():
        '''
        Current time in seconds for redraw throttling.

        Uses time.monotonic when available, else time.time.  time.clock()
        was removed in Python 3.8 and measured CPU time on Unix.
        '''
        return getattr(time, 'monotonic', time.time)()

    def init(self, label, size, cursize=0):
        '''
        Start a new job and draw the initial bar.

        label   -- text shown in front of the bar
        size    -- amount of work that corresponds to 100%
        cursize -- amount of work already done
        '''
        self.size = size
        self.label = label
        self.steps = self.wantsteps
        self.needrefresh = 1
        self.eta_calculator = ETA(size)
        self.update(cursize)
        # Let the first real update redraw without waiting for the throttle.
        self.last_update_time = float('-inf')

    def set_complete(self):
        '''Draw the bar at 100% and end the line so it stays on screen.'''
        with self.work_mutex:
            self._rawupdate(self.size)
            self.needrefresh = 0
            # Written to outstream (not stdout) so the newline follows the bar.
            self.outstream.write('\n')
            self.outstream.flush()
            # The cursor is on a fresh line: nothing left to overwrite.
            self.prev_message = ''

    def _rawupdate(self, cursize):
        '''Redraw the bar unconditionally; the caller must hold work_mutex.'''
        self.eta_calculator.update(cursize)
        if self.size:
            # Floor division: the result is used as a string repeat count.
            donesteps = (cursize * self.steps) // self.size
            percent = 100.0 * float(cursize) / float(self.size)
        else:
            # An empty job is complete by definition.
            donesteps = self.steps
            percent = 100.0
        # Keep the bar exactly self.steps wide even if cursize is out of range.
        donesteps = max(0, min(self.steps, int(donesteps)))
        stepsleft = self.steps - donesteps
        # Fixed width so the bar does not shift as the percentage grows.
        percent_str = '%6.2f%%' % percent
        eta = self.eta_calculator.getstatus()
        message = '%s:%s %s%s ETA: %s' % (self.label, percent_str,
                                          self.done_char * donesteps,
                                          self.left_char * stepsleft, eta)
        # When the new text is shorter, blank the leftover tail of the old
        # one and step back so the cursor ends right after `message`.
        shrink = max(len(self.prev_message) - len(message), 0)
        self.outstream.write('\b' * len(self.prev_message) + message +
                             ' ' * shrink + '\b' * shrink)
        self.outstream.flush()
        self.prev_message = message
        self.last_update_time = self._now()

    def update_left(self, left):
        '''
        Update the bar from the amount of work still left.

        Useful in a multithreaded environment when processing a job pool:
        Main:
            pm.init(label, len(joblist))
        In thread.run():
            job = joblist.pop(0)
            pm.update_left(len(joblist))
        '''
        self.update(self.size - left)

    def update(self, cursize):
        '''
        Report that `cursize` units are done.

        The bar is redrawn when a refresh is pending or when at least
        min_update_delta seconds have passed since the last redraw;
        otherwise the call is ignored.
        '''
        # "with" releases the lock even if drawing raises.
        with self.work_mutex:
            if self.needrefresh:
                self._rawupdate(cursize)
                self.needrefresh = 0
            elif self._now() - self.last_update_time >= self.min_update_delta:
                self._rawupdate(cursize)

    def cleanup(self):
        '''Erase the bar from the current line and request a full redraw.'''
        with self.work_mutex:
            if not self.needrefresh:
                self.outstream.write('\r' + ' ' * len(self.prev_message) + '\r')
                self.outstream.flush()
                # The line is blank now, so there is nothing to backspace over.
                self.prev_message = ''
                self.needrefresh = 1
if __name__ == '__main__':
    # Demo: walk a 500-unit job in 10 ms ticks, drawing a 30-character
    # bar on stderr, then erase the bar.
    total = 500
    meter = ProgressMeter(30, outstream=sys.stderr)
    meter.init('Progress Label', total)
    done = 0
    while done <= total:
        time.sleep(0.01)
        meter.update(done)
        done += 1
    meter.cleanup()
|
Thanks...works great. Better than a lot of the others on here.
If I run this as is, it says:
Traceback (most recent call last): File "eta.py", line 160, in <module> progress.init('Progress Label', 500) File "eta.py", line 95, in init self.update(cursize) File "eta.py", line 140, in update self._rawupdate(cursize) File "eta.py", line 105, in _rawupdate self.eta_calculator.update(cursize) File "eta.py", line 59, in update speed = float(delta_work) / float(delta_time) ZeroDivisionError: float division
(it qualifies it by saying zero, in 2.7)