After several attempts to use third-party modules I wrote my own console progress meter.
Bonus list:
- ETA is calculated from the most recent update points. This is more accurate than an ETA based on the process start time (a process can survive a Hibernate, but a start-time-based ETA loses its accuracy afterwards)
- ability to write progress meter to sys.stderr
- update_left() method for multithreaded programs :)
'''
pmeter.py -- Precise console progress meter with ETA calculation
Some code inherited from CFV sources (cfv.sf.net)
2010-01-11 20:23
'''
import sys
import time
import threading
__author__ = 'Denis Barmenkov <denis.barmenkov@gmail.com>'
__source__ = 'http://code.activestate.com/recipes/577002-precise-console-progress-meter-with-eta-calculatio/?in=user-57155'
def format_sec(sec):
    '''
    Render a duration given in seconds as an 'H:MM:SS' string.

    The value is truncated to an integer first; hours are not wrapped,
    so durations of a day or more show as 24 hours and up.
    '''
    total = int(sec)
    hours = total // 3600
    minutes = (total // 60) % 60
    seconds = total % 60
    return '%d:%02d:%02d' % (hours, minutes, seconds)
class ETA(object):
    '''
    Calculate an ETA (Estimated Time of Arrival :) for a job.

    A sliding window of [timestamp, completed_size] points is kept and the
    speed is measured across that window only, so a long pause (for
    example a hibernate) stops distorting the estimate once it has
    scrolled out of the window.
    '''

    def __init__(self, wanted_size, max_point=20, max_seconds=30):
        '''
        wanted_size -- amount of work that counts as finished
        max_point   -- the window is never trimmed below this many points
        max_seconds -- the window is never trimmed below this many seconds
        '''
        self.wanted_size = wanted_size
        self.points = list()
        self.max_point = max_point
        self.max_seconds = max_seconds
        # time.time() instead of time.clock(): clock() was removed in
        # Python 3.8 and returned CPU time on Unix, which does not advance
        # while the process sleeps or waits for I/O.
        self.points.append([time.time(), 0])
        self.eta = 'N/A'

    def _cleanup(self):
        '''
        Trim the oldest points off the window.

        A point is dropped only while the window is both longer than
        max_seconds and holds more than max_point points; two points are
        always kept.  Return False when there are fewer than two points
        (no speed can be measured), True otherwise.
        '''
        if len(self.points) < 2:
            return False
        newest_time = self.points[-1][0]
        while (len(self.points) > 2 and
               len(self.points) > self.max_point and
               newest_time - self.points[0][0] > self.max_seconds):
            self.points.pop(0)
        return True

    def update(self, cursize):
        '''
        Record that cursize units are done now and refresh the estimate.

        The previous estimate is kept when no meaningful speed can be
        measured (no time elapsed across the window, or no forward
        progress inside it).
        '''
        self.points.append([time.time(), cursize])
        if not self._cleanup():
            return
        oldest_time, oldest_size = self.points[0]
        delta_time = self.points[-1][0] - oldest_time
        if delta_time <= 0:
            # Same timestamp (coarse timer) or the clock stepped back:
            # dividing would raise ZeroDivisionError or give nonsense.
            return
        # Work done inside the window, not since the start of the job.
        delta_work = cursize - oldest_size
        speed = float(delta_work) / float(delta_time)
        if speed <= 0.0:
            return
        # Never report a negative time when cursize overshoots the target.
        remaining = max(0.0, float(self.wanted_size) - float(cursize))
        self.eta = format_sec(remaining / speed)

    def getstatus(self):
        '''Return the latest estimate as 'H:MM:SS', or 'N/A' if none yet.'''
        return self.eta
class ProgressMeter(object):
    '''
    Console progress bar with percentage and ETA, redrawn in place.

    Typical use: create the meter, call init() with a label and the total
    size, call update() (or update_left()) as work completes, then call
    set_complete() to leave the finished bar on screen or cleanup() to
    erase it.  Redraws are throttled to one per min_update_delta seconds.
    '''

    def __init__(self, steps=20, min_update_delta=0.1, outstream=sys.stdout):
        '''
        steps            -- width of the bar in characters
        min_update_delta -- minimum seconds between redraws (at least 0.04)
        outstream        -- file-like object the bar is written to
        '''
        self.wantsteps = steps
        self.prev_message = ''
        # Far in the past, so the first update is never throttled.
        self.last_update_time = -100
        self.needrefresh = 1
        self.times = list()
        self.done_char = '#'
        self.left_char = '.'
        self.eta_calculator = None
        self.min_update_delta = max(min_update_delta, 0.04)  # max 25 fps on redraw :)
        self.outstream = outstream
        self.work_mutex = threading.Lock()
        self.size = None
        self.label = None
        self.steps = None

    def init(self, label, size, cursize=0):
        '''Start a new bar titled label for size units, cursize already done.'''
        self.size = size
        self.label = label
        self.steps = self.wantsteps
        self.needrefresh = 1
        self.eta_calculator = ETA(size)
        self.update(cursize)
        # Let the first real update through without throttling.
        self.last_update_time = -100

    def set_complete(self):
        '''Draw the bar at 100% and end the line so it stays on screen.'''
        with self.work_mutex:
            self._rawupdate(self.size)
            self.needrefresh = 0
            # Written to outstream (not via print) so the newline lands on
            # the same stream as the bar.
            self.outstream.write('\n')
            self.outstream.flush()
            # The cursor is on a fresh line; nothing is left to overwrite.
            self.prev_message = ''

    def _rawupdate(self, cursize):
        '''Redraw the bar for cursize unconditionally; caller holds the lock.'''
        self.eta_calculator.update(cursize)
        if self.size:
            # Floor division: str * float raises TypeError on Python 3.
            donesteps = int(cursize * self.steps // self.size)
            percent = 100.0 * float(cursize) / float(self.size)
        else:
            # Nothing to do counts as finished; avoids dividing by zero.
            donesteps = self.steps
            percent = 100.0
        donesteps = max(0, min(self.steps, donesteps))
        stepsleft = self.steps - donesteps
        # Right-align the percentage in a fixed 7-character field.
        percent_str = ('         %.2f%%' % percent)[-7:]
        eta = self.eta_calculator.getstatus()
        message = '%s:%s %s%s ETA: %s' % (
            self.label, percent_str,
            self.done_char * donesteps, self.left_char * stepsleft, eta)
        # Backspace over the previous bar, then draw the new one.
        self.outstream.write('\b' * len(self.prev_message) + message)
        self.outstream.flush()
        self.prev_message = message
        # time.time() instead of time.clock(), which was removed in
        # Python 3.8 and returned CPU time on Unix.
        self.last_update_time = time.time()

    def update_left(self, left):
        '''
        Update the bar from the amount of work still remaining.

        Useful in a multithreaded environment when processing a job pool:

        Main:
            pm.init('Jobs', len(joblist))
        In thread.run():
            job = joblist.pop(0)
            pm.update_left(len(joblist))
        '''
        self.update(self.size - left)

    def update(self, cursize):
        '''
        Report that cursize units are done.

        The bar is redrawn when a refresh is pending or when at least
        min_update_delta seconds have passed since the last redraw;
        otherwise the call is ignored.
        '''
        # 'with' releases the lock even if writing to outstream raises.
        with self.work_mutex:
            if self.needrefresh:
                self._rawupdate(cursize)
                self.needrefresh = 0
            elif time.time() - self.last_update_time >= self.min_update_delta:
                self._rawupdate(cursize)

    def cleanup(self):
        '''Erase the bar from the current line if one is displayed.'''
        with self.work_mutex:
            if not self.needrefresh:
                self.outstream.write(
                    '\r' + ' ' * len(self.prev_message) + '\r')
                self.outstream.flush()
                # The line is blank now; the next draw starts from scratch.
                self.prev_message = ''
                self.needrefresh = 1
if __name__ == '__main__':
    # Demo: drive a 30-character bar on stderr through 0..500 inclusive,
    # pausing 10 ms per step, then erase the bar.
    total = 500
    meter = ProgressMeter(30, outstream=sys.stderr)
    meter.init('Progress Label', total)
    done = 0
    while done <= total:
        time.sleep(0.01)
        meter.update(done)
        done += 1
    meter.cleanup()
 | 

 Download
Download Copy to clipboard
Copy to clipboard
Thanks... works great. Better than a lot of the others on here.
If I run this as is, it says:
Traceback (most recent call last):
  File "eta.py", line 160, in <module>
    progress.init('Progress Label', 500)
  File "eta.py", line 95, in init
    self.update(cursize)
  File "eta.py", line 140, in update
    self._rawupdate(cursize)
  File "eta.py", line 105, in _rawupdate
    self.eta_calculator.update(cursize)
  File "eta.py", line 59, in update
    speed = float(delta_work) / float(delta_time)
ZeroDivisionError: float division
(it qualifies it by saying zero, in 2.7)