Welcome, guest | Sign In | My Account | Store | Cart

After several attempts to use third-party modules I wrote my own console progress meter.

Bonus list:

  1. ETA is calculated from the most recent update points. This is more accurate than calculating ETA from the process start time (a process can survive a hibernate, but an ETA based on the start time then loses its accuracy)
  2. ability to write progress meter to sys.stderr
  3. update_left() method for multithreaded programs :)
Python, 163 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
'''
pmeter.py -- Precise console progress meter with ETA calculation

Some code inherited from CFV sources (cfv.sf.net)

2010-01-11 20:23
'''
import sys
import time
import threading

__author__ = 'Denis Barmenkov <denis.barmenkov@gmail.com>'
__source__ = 'http://code.activestate.com/recipes/577002-precise-console-progress-meter-with-eta-calculatio/?in=user-57155'

def format_sec(sec):
    '''
    Render a duration given in seconds as an ``H:MM:SS`` string.

    The value is truncated to an integer first; hours are not capped,
    so a day comes out as ``24:00:00``.
    '''
    total = int(sec)
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)
    return '%d:%02d:%02d' % (hours, minutes, seconds)

class ETA(object):
    '''
    Calculate ETA (Estimated Time of Arrival) for a job of known size.

    A sliding window of recent ``[timestamp, done]`` points is kept and
    the speed is derived from that window instead of from the process
    start time, so a long pause (e.g. hibernate) stops skewing the
    estimate once the old points have been dropped.

    wanted_size -- amount of work that means "finished"
    max_point   -- number of points the window may hold before old
                   points become eligible for removal
    max_seconds -- time span the window may cover before old points
                   become eligible for removal
    '''

    # time.clock() was removed in Python 3.8; prefer the monotonic clock
    # and fall back to time.time() on interpreters that lack it.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, wanted_size, max_point=20, max_seconds=30):
        self.wanted_size = wanted_size
        self.max_point = max_point
        self.max_seconds = max_seconds
        # The window always starts with "nothing done yet, right now".
        self.points = [[self._clock(), 0]]
        self.eta = 'N/A'

    def _cleanup(self):
        '''
        Drop points from the old end of the window.

        A point is dropped only while the window is BOTH longer than
        max_seconds and holds more than max_point points; at least two
        points are always kept.  Returns 0 if there are too few points
        to compute a speed, 1 otherwise.
        '''
        if len(self.points) < 2:
            return 0
        last_point_time = self.points[-1][0]
        while (len(self.points) > 2
               and last_point_time - self.points[0][0] > self.max_seconds
               and len(self.points) > self.max_point):
            self.points.pop(0)
        return 1

    def update(self, cursize):
        '''
        Record that `cursize` units are done now and refresh the ETA.

        The previous ETA string is left untouched whenever a speed
        cannot be computed (no elapsed time, or no forward progress
        inside the window).
        '''
        self.points.append([self._clock(), cursize])
        if not self._cleanup():
            return

        first_time, first_size = self.points[0]
        delta_time = self.points[-1][0] - first_time
        if delta_time <= 0:
            # Two updates within one clock tick: avoid ZeroDivisionError.
            return

        # Work done inside the window, not since the very beginning;
        # otherwise the speed is overstated once old points are dropped.
        delta_work = cursize - first_size
        speed = float(delta_work) / float(delta_time)
        if speed <= 0.0:
            return

        eta = (float(self.wanted_size) - float(cursize)) / speed
        # Overshooting wanted_size must not yield a negative duration.
        self.eta = format_sec(max(eta, 0.0))

    def getstatus(self):
        '''Return the last computed ETA string ('N/A' until one exists).'''
        return self.eta


class ProgressMeter(object):
    '''
    Single-line console progress bar with percentage and ETA.

    Typical use: create the meter, call init(label, size) once, call
    update(cursize) (or update_left(left)) as work proceeds, and finish
    with set_complete() to keep the bar on screen or cleanup() to erase
    it.  Redraws are throttled to one per `min_update_delta` seconds.

    steps            -- width of the bar in characters
    min_update_delta -- minimal delay between redraws, in seconds
                        (never less than 0.04)
    outstream        -- file-like object the bar is written to
    '''

    # time.clock() was removed in Python 3.8; prefer the monotonic clock
    # and fall back to time.time() on interpreters that lack it.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, steps=20, min_update_delta=0.1, outstream=sys.stdout):
        self.wantsteps = steps
        self.prev_message = ''
        # -inf guarantees that the first throttled update is drawn.
        self.last_update_time = float('-inf')
        self.needrefresh = 1
        self.times = list()  # not used by this class; kept for compatibility
        self.done_char = '#'
        self.left_char = '.'
        self.eta_calculator = None
        self.min_update_delta = max(min_update_delta, 0.04) # max 25 fps on redraw :)
        self.outstream = outstream
        self.work_mutex = threading.Lock()
        self.size = None
        self.label = None
        self.steps = None

    def init(self, label, size, cursize=0):
        '''Start a new job of `size` units titled `label` and draw it.'''
        self.size = size
        self.label = label
        self.steps = self.wantsteps
        self.needrefresh = 1

        self.eta_calculator = ETA(size)
        self.update(cursize)

        # Let the very next update() through regardless of the throttle.
        self.last_update_time = float('-inf')

    def set_complete(self):
        '''Draw the bar as finished and leave it on screen.'''
        self.needrefresh = 1
        self.update(self.size)
        # Terminate the line on the stream the bar was drawn on.
        self.outstream.write('\n')
        self.outstream.flush()

    def _rawupdate(self, cursize):
        '''Redraw the bar unconditionally; callers must hold work_mutex.'''
        self.eta_calculator.update(cursize)

        if self.size:
            fraction = float(cursize) / float(self.size)
            # Floor division: true division would make the repeat count
            # a float on Python 3 and str * float raises TypeError.
            donesteps = int((cursize * self.steps) // self.size)
        else:
            # An empty job has nothing left to do.
            fraction = 1.0
            donesteps = self.steps
        # Keep the bar exactly self.steps wide for out-of-range input.
        donesteps = max(0, min(self.steps, donesteps))
        stepsleft = self.steps - donesteps

        percent_str = ('%.2f%%' % (100.0 * fraction)).rjust(7)[-7:]
        eta = self.eta_calculator.getstatus()

        message = '%s:%s %s%s ETA: %s' % (self.label, percent_str,
                                          self.done_char * donesteps,
                                          self.left_char * stepsleft, eta)
        # If the new text is shorter, blank out the stale tail and step
        # back so the cursor ends right after the message.
        pad = max(0, len(self.prev_message) - len(message))
        self.outstream.write('\b' * len(self.prev_message) + message +
                             ' ' * pad + '\b' * pad)
        self.outstream.flush()
        self.prev_message = message

        self.last_update_time = self._clock()

    def update_left(self, left):
        '''
        Update the meter from the amount of work still LEFT.

        Useful in a multithreaded environment when processing a job pool:

        Main:
            pm.init('Jobs', len(joblist))

        In thread.run():
            job = joblist.pop(0)
            pm.update_left(len(joblist))
        '''
        self.update(self.size - left)

    def update(self, cursize):
        '''
        Report that `cursize` units are done; redraw unless throttled.

        A redraw is forced when needrefresh is set (after init(),
        set_complete() or cleanup()).
        '''
        # `with` releases the lock even if drawing raises.
        with self.work_mutex:
            if self.needrefresh:
                self._rawupdate(cursize)
                self.needrefresh = 0
            elif self._clock() - self.last_update_time >= self.min_update_delta:
                self._rawupdate(cursize)

    def cleanup(self):
        '''Erase the bar from the console line, if one is drawn.'''
        with self.work_mutex:
            if not self.needrefresh:
                self.outstream.write('\r' + ' ' * len(self.prev_message) + '\r')
                self.needrefresh = 1


if __name__ == '__main__':
    # Demo: drive a 30-character bar on stderr through 0..500 with a
    # short sleep per step, then erase it from the console.
    total = 500
    meter = ProgressMeter(30, outstream=sys.stderr)
    meter.init('Progress Label', total)
    step = 0
    while step <= total:
        time.sleep(0.01)
        meter.update(step)
        step += 1
    meter.cleanup()

2 comments

Stephen Benjamin 14 years ago  # | flag

Thanks...works great. Better than a lot of the others on here.

Luke Stanley 11 years, 8 months ago  # | flag

If I run this as is, it says:

Traceback (most recent call last): File "eta.py", line 160, in <module> progress.init('Progress Label', 500) File "eta.py", line 95, in init self.update(cursize) File "eta.py", line 140, in update self._rawupdate(cursize) File "eta.py", line 105, in _rawupdate self.eta_calculator.update(cursize) File "eta.py", line 59, in update speed = float(delta_work) / float(delta_time) ZeroDivisionError: float division

(it qualifies it by saying zero, in 2.7)