NOTE: Recipes have moved! Please visit GitHub.com/activestate/code for the current versions.

A simple but useful progress meter for python, rendered in text.

Python, 110 lines
 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110``` ```""" Here is a silly example of its usage: import progress import time import random total = 1000 p = progress.ProgressMeter(total=total) while total > 0: cnt = random.randint(1, 25) p.update(cnt) total -= cnt time.sleep(random.random()) Here is an example of its output: [-------------------------> ] 41% 821.2/sec """ import time, sys, math class ProgressMeter(object): ESC = chr(27) def __init__(self, **kw): # What time do we start tracking our progress from? self.timestamp = kw.get('timestamp', time.time()) # What kind of unit are we tracking? self.unit = str(kw.get('unit', '')) # Number of units to process self.total = int(kw.get('total', 100)) # Number of units already processed self.count = int(kw.get('count', 0)) # Refresh rate in seconds self.rate_refresh = float(kw.get('rate_refresh', .5)) # Number of ticks in meter self.meter_ticks = int(kw.get('ticks', 60)) self.meter_division = float(self.total) / self.meter_ticks self.meter_value = int(self.count / self.meter_division) self.last_update = None self.rate_history_idx = 0 self.rate_history_len = 10 self.rate_history = [None] * self.rate_history_len self.rate_current = 0.0 self.last_refresh = 0 self._cursor = False self.reset_cursor() def reset_cursor(self, first=False): if self._cursor: sys.stdout.write(self.ESC + '[u') self._cursor = True sys.stdout.write(self.ESC + '[s') def update(self, count, **kw): now = time.time() # Caclulate rate of progress rate = 0.0 # Add count to Total self.count += count self.count = min(self.count, self.total) if self.last_update: delta = now - float(self.last_update) if delta: rate = count / delta else: rate = count self.rate_history[self.rate_history_idx] = rate self.rate_history_idx += 1 self.rate_history_idx %= self.rate_history_len cnt = 0 total = 0.0 # Average rate history for rate in self.rate_history: if rate == None: continue cnt += 1 total += rate rate = total / cnt self.rate_current = rate self.last_update = now # Device Total by meter division value = int(self.count / self.meter_division) if value > self.meter_value: self.meter_value = value if self.last_refresh: if (now - self.last_refresh) > self.rate_refresh or \ (self.count >= self.total): self.refresh() else: self.refresh() def get_meter(self, **kw): bar = '-' * self.meter_value pad = ' ' * (self.meter_ticks - self.meter_value) perc = (float(self.count) / self.total) * 100 return '[%s>%s] %d%% %.1f/sec' % (bar, pad, perc, self.rate_current) def refresh(self, **kw): # Clear line sys.stdout.write(self.ESC + '[2K') self.reset_cursor() sys.stdout.write(self.get_meter(**kw)) # Are we finished? if self.count >= self.total: sys.stdout.write('\n') sys.stdout.flush() # Timestamp self.last_refresh = time.time() ```

This meter refreshes its output, creating a standing arrow that fills the width of its constraints as its internal counter is incremented.

Nicholas 12 years, 2 months ago

Does not work on OSX (as it is). Nice idea!

Unfortunately this does not work on OSX by default. The default terminal does not understand the save/load cursor ANSI sequence. However it does understand the GOTO COL 0 sequence ( ESC[G ). A way to fix this for OSX would be to change the following line:

``````sys.stdout.write(self.ESC + '[2K' )
``````

to

``````sys.stdout.write(self.ESC + '[2K' + self.ESC+'[G')
``````

I think this sequence would work on ANY terminal, meaning that you could zero out the function reset_cursor() and all function calls referring to it.

Denis Barmenkov 12 years, 2 months ago

Avoiding use of ANSI codes. To avoid ANSI, you can use character Backspace (0x08) to move cursor one position backward, so entire recipe becomes:

``````"""
Here is a silly example of its usage:

import progress
import time
import random

total = 1000
p = progress.ProgressMeter(total=total)

while total > 0:
cnt = random.randint(1, 25)
p.update(cnt)
total -= cnt
time.sleep(random.random())

Here is an example of its output:

[------------------------->                                   ] 41%  821.2/sec

2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
"""
import time, sys, math

class ProgressMeter(object):
#ESC = chr(27)
def __init__(self, **kw):
# What time do we start tracking our progress from?
self.timestamp = kw.get('timestamp', time.time())
# What kind of unit are we tracking?
self.unit = str(kw.get('unit', ''))
# Number of units to process
self.total = int(kw.get('total', 100))
# Number of units already processed
self.count = int(kw.get('count', 0))
# Refresh rate in seconds
self.rate_refresh = float(kw.get('rate_refresh', .5))
# Number of ticks in meter
self.meter_ticks = int(kw.get('ticks', 60))
self.meter_division = float(self.total) / self.meter_ticks
self.meter_value = int(self.count / self.meter_division)
self.last_update = None
self.rate_history_idx = 0
self.rate_history_len = 10
self.rate_history = [None] * self.rate_history_len
self.rate_current = 0.0
self.last_refresh = 0
self.prev_meter_len = 0

def update(self, count, **kw):
now = time.time()
# Caclulate rate of progress
rate = 0.0
self.count += count
self.count = min(self.count, self.total)
if self.last_update:
delta = now - float(self.last_update)
if delta:
rate = count / delta
else:
rate = count
self.rate_history[self.rate_history_idx] = rate
self.rate_history_idx += 1
self.rate_history_idx %= self.rate_history_len
cnt = 0
total = 0.0
# Average rate history
for rate in self.rate_history:
if rate == None:
continue
cnt += 1
total += rate
rate = total / cnt
self.rate_current = rate
self.last_update = now
# Device Total by meter division
``````

(comment continued...)

Denis Barmenkov 12 years, 2 months ago

(...continued from previous comment)

``````        value = int(self.count / self.meter_division)
if value > self.meter_value:
self.meter_value = value
if self.last_refresh:
if (now - self.last_refresh) > self.rate_refresh or \
(self.count >= self.total):
self.refresh()
else:
self.refresh()

def get_meter(self, **kw):
bar = '-' * self.meter_value
pad = ' ' * (self.meter_ticks - self.meter_value)
perc = (float(self.count) / self.total) * 100
return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)

def refresh(self, **kw):
# Clear line and return cursor to start-of-line
sys.stdout.write(' ' * self.prev_meter_len + '\x08' * self.prev_meter_len)
# Get meter text
meter_text = self.get_meter(**kw)
# Write meter and return cursor to start-of-line
sys.stdout.write(meter_text + '\x08'*len(meter_text))
self.prev_meter_len = len(meter_text)

# Are we finished?
if self.count >= self.total:
sys.stdout.write('\n')
sys.stdout.flush()
# Timestamp
self.last_refresh = time.time()
``````
PĂˇdraig Brady 12 years, 2 months ago

spinner. Cool. On a related note, I created a "spinner" to use when you don't know how long it will take.

http://www.pixelbeat.org/talks/python/spinner.py

Alexis Lopez 7 years, 8 months ago

added time-to-completion estimation and refinements to my liking.

``````'''
A class that creates a text-based progress bar instance.

example progress bar object:
pb_ex = ProgressMeter(total=6000,unit='operations',ticks=25)
examples progress bar format:
[=========|---------------] 17%  22.5 operations/sec. 03 min 13 sec remaining
[=========================] 100% completed in 4 hours 42 min 17 sec

source: http://code.activestate.com/recipes/473899-progress-meter/
credits:
2006-02-16 vishnubob: original code
2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
2010-05-17 Alexis Lopez: total time taken and time-to-completion estimation
------------------------------------------------------------------------------
Usage: from progressmeter import ProgressMeter
'''

import sys
import time

class ProgressMeter(object):
def __init__(self, *args, **kw):
'''Initiates the progress bar object.'''
# What time do we start tracking our progress from?
self.timestamp = kw.get('timestamp', None)
# What kind of unit are we tracking?
self.unit = str(kw.get('unit', ''))
# Number of units to process
self.total = int(kw.get('total', 100))
# Number of units already processed
self.count = int(kw.get('count', 0))
# Refresh rate in seconds
self.rate_refresh = float(kw.get('rate_refresh', .5))
# Number of ticks in meter
self.meter_ticks = int(kw.get('ticks', 60))
self.meter_division = float(self.total) / self.meter_ticks
self.meter_value = int(self.count / self.meter_division)
self.last_update = None
self.rate_history_idx = 0
self.rate_history_len = 10
self.rate_history = [None] * self.rate_history_len
self.rate_current = 0.0
self.last_refresh = 0
self.prev_meter_len = 0
self.switch_off = False
self.estimated_duration = []
``````

(comment continued...)

Alexis Lopez 7 years, 8 months ago

(...continued from previous comment)

``````    def update(self, count, *args, **kw):
'''Updates the progress bar internal counter by adding the value of the
"count" variable to the internal counter.'''
if not self.timestamp: self.timestamp = time.time()
now = time.time()
# Calculate rate of progress
rate = 0.0
self.count += count
self.count = min(self.count, self.total)
if self.last_update:
delta = now - float(self.last_update)
if delta:
rate = count / delta
else:
rate = count
self.rate_history[self.rate_history_idx] = rate
self.rate_history_idx += 1
self.rate_history_idx %= self.rate_history_len
cnt = 0
total = 0.0
# Average rate history
for rate in self.rate_history:
if rate == None:
continue
cnt += 1
total += rate
rate = total / cnt
self.estimated_duration.append((self.total - self.count) / rate)
self.rate_current = rate
self.last_update = now
# Device Total by meter division
value = int(self.count / self.meter_division)
if value > self.meter_value:
self.meter_value = value
if self.last_refresh:
if (now - self.last_refresh) > self.rate_refresh or (self.count >= self.total):
self._refresh()
else:
self._refresh()

def set(self, percentage, *args, **kw):
'''Sets the progress bar internal counter to a value such that the
percentage progress is close to the specified value of the
"percentage" variable.'''
if percentage < 100:
count = int((float(percentage) / 100) * self.total)
count = count - self.count
else:
self.count = self.total
count = 0
self.update(count)

def start(self, *args, **kw):
'''Sets the progress bar timestamp.
If this method is not called then ProgressMeter.update() or
ProgressMeter.set() automatically sets the progress bar timestamp on
its first usage. Does NOT override user specified __init__ settings
same as with ProgressMeter.update() or ProgressMeter.set()'''
if not self.timestamp: self.timestamp = time.time()
``````

(comment continued...)

Alexis Lopez 7 years, 8 months ago

(...continued from previous comment)

``````    def reset(self, *args, **kw):
'''Resets the progress bar to the initial settings with self.count
set to 0'''
# What time do we start tracking our progress from?
timestamp = kw.get('timestamp', None)
# What kind of unit are we tracking?
unit = str(kw.get('unit', self.unit))
# Number of units to process
total = int(kw.get('total', self.total))
# Number of units already processed
count = int(kw.get('count', 0))
# Refresh rate in seconds
rate_refresh = float(kw.get('rate_refresh', self.rate_refresh))
# Number of ticks in meter
meter_ticks = int(kw.get('ticks', self.meter_ticks))
self.__init__(timestamp=timestamp, unit=unit, total=total, count=count, rate_refres=rate_refresh,
ticks=meter_ticks)

def _get_meter(self, *args, **kw):
perc = (float(self.count) / self.total) * 100
bar = '=' * self.meter_value
if self.count >= self.total:
pad = '=' * (self.meter_ticks - self.meter_value)
# Time taken to completion
dur = time.time() - self.timestamp
# Converting into hours, minutes and seconds
hours, remainder = divmod(dur,3600)
minutes, seconds = divmod(remainder,60)
if dur < 60:
est = ' completed in %.f sec' % seconds
elif dur >= 60 and dur < 3600:
est = ' completed in %.f min %.f sec' % (minutes, seconds)
else:
if hours == 1:
est = ' completed in %.f hour %.f min %.f sec' % (hours, minutes, seconds)
else:
est = ' completed in %.f hours %.f min %.f sec' % (hours, minutes, seconds)
return '>[%s=%s] %d%%%s' % (bar, pad, perc, est)
``````

(comment continued...)

Alexis Lopez 7 years, 8 months ago

(...continued from previous comment)

``````        else:
pad = '-' * (self.meter_ticks - self.meter_value)
if len(self.estimated_duration) >= 1:
# parameters to refine time-remaining estimation
if self.estimated_duration[-1] < 15: exp = 1.75; dat = 10
elif self.estimated_duration[-1] >= 15 and self.estimated_duration[-1] < 30: exp = 1.5; dat = 15
elif self.estimated_duration[-1] >= 30 and self.estimated_duration[-1] < 90: exp = 1.25; dat = 50
else: exp = 1.00; dat = 50
# Calculation of time-remaining estimation
wght_num, wght_den = (0 , 0)
for i in range(0,min(len(self.estimated_duration),dat)):
wght_num += self.estimated_duration[-dat:][i] * ((i+1)**exp)
wght_den += (i+1)**exp
est_dur = int(wght_num / wght_den)
# Converting into hours, minutes and seconds
hours,  remainder = divmod(est_dur,3600)
minutes, seconds = divmod(remainder,60)
if est_dur < 60:
est = ' %02.f seconds remaining' % seconds
elif est_dur >= 60 and est_dur < 3600:
est = ' %02.f min %02.f sec remaining' % (minutes, seconds)
else:
if hours == 1:
est = ' %.f hour %02.f min %02.f sec remaining' % (hours, minutes, seconds)
else:
est = ' %.f hours %02.f min %02.f sec remaining' % (hours, minutes, seconds)
return '>[%s|%s] %d%%  %.1f %s/sec.%s' % (bar, pad, perc,
self.rate_current,
self.unit, est)
else:
return '>[%s|%s] %d%%  %.1f %s/sec. Calculating time remaining...' % (bar, pad, perc,
self.rate_current,
self.unit)
``````

(comment continued...)

Alexis Lopez 7 years, 8 months ago

(...continued from previous comment)

``````    def _refresh(self, *args, **kw):
if self.switch_off:
return
else:
# Clear line and return cursor to start-of-line
sys.stdout.write(' ' * self.prev_meter_len + '\x08' * self.prev_meter_len)
# Get meter text
meter_text = self._get_meter(*args, **kw)
# Write meter and return cursor to start-of-line
sys.stdout.write(meter_text + '\x08'*len(meter_text))
self.prev_meter_len = len(meter_text)

# Are we finished?
if self.count >= self.total:
sys.stdout.write('\n')
# Switches off method refresh (equivalent to killing the meter)
# this is a safety measure in case of wrong usage where loops
# continue beyond ProgressMeter.set(100) or ProgressMeter.update(total)
self.switch_off = True
sys.stdout.flush()

# Timestamp
self.last_refresh = time.time()

if __name__ == '__main__':

import random

print "this is a test of method ProgressMeter.update"

total=5000
pba = ProgressMeter(total=total,unit='apples')
while total > 0:
cnt = random.randint(20, 30)
pba.update(cnt)
total -= cnt
time.sleep(random.uniform(.25,0.75))

print "this is a test of method ProgressMeter.set"

pct = 0
pbb = ProgressMeter(total=997,unit='oranges',ticks=80)
while pct < 105:
pct += random.randint(2, 5)
pbb.set(pct)
time.sleep(random.uniform(.25,0.75))

print "this is a test of method ProgressMeter.reset and ProgressMeter.start and ProgressMeter.set(100)"

pct = 0
pbb.reset(unit='bananas',ticks=40)
pbb.start()
time.sleep(2)
pbb.set(0)
while pct < 94:
pct += random.randint(2, 5)
pbb.set(pct)
time.sleep(random.uniform(.25,0.75))
pbb.set(100)

# Checks switch off in case of wrong usage (no console output expected)
pbb.set(110)
pbb.update(100000)

print "this is a test message"
print "the program will shutdown automatically"
time.sleep(5)
print "shutting down"
time.sleep(0.15)
sys.exit()
``````
Antonio Martin 7 years, 5 months ago

Thanks! It was exactly what I was looking for.

