Welcome, guest | Sign In | My Account | Store | Cart
2

A simple but useful progress meter for Python, rendered as text.

Python, 110 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
"""
Here is a silly example of its usage:

import progress
import time
import random

total = 1000
p = progress.ProgressMeter(total=total)

while total > 0:
    cnt = random.randint(1, 25)
    p.update(cnt)
    total -= cnt
    time.sleep(random.random())


Here is an example of its output:

[------------------------->                                   ] 41%  821.2/sec
"""
import time, sys, math

class ProgressMeter(object):
    """Text progress meter that redraws itself in place on stdout.

    The bar is repositioned with ANSI escape sequences (save cursor,
    restore cursor, clear line), so it needs a terminal that honours them.

    Keyword arguments accepted by the constructor:
        timestamp    -- start time of the tracked work (default: now);
                        stored only, this class never reads it back
        unit         -- label of the unit being tracked (default: '');
                        stored only, this class never reads it back
        total        -- number of units to process (default: 100)
        count        -- number of units already processed (default: 0)
        rate_refresh -- minimum number of seconds between redraws
                        (default: 0.5)
        ticks        -- width of the bar in characters (default: 60)
    """

    ESC = chr(27)

    def __init__(self, **kw):
        """Configure the meter from keyword options and save the cursor."""
        # What time do we start tracking our progress from?
        self.timestamp = kw.get('timestamp', time.time())
        # What kind of unit are we tracking?
        self.unit = str(kw.get('unit', ''))
        # Number of units to process
        self.total = int(kw.get('total', 100))
        # Number of units already processed
        self.count = int(kw.get('count', 0))
        # Refresh rate in seconds
        self.rate_refresh = float(kw.get('rate_refresh', .5))
        # Number of ticks in meter
        self.meter_ticks = int(kw.get('ticks', 60))
        # Units represented by one tick.  0.0 marks a degenerate meter
        # (total == 0 or ticks == 0) and keeps us away from a division by
        # zero; such a meter is always drawn as complete.
        if self.meter_ticks:
            self.meter_division = float(self.total) / self.meter_ticks
        else:
            self.meter_division = 0.0
        self.meter_value = self._ticks_filled()
        self.last_update = None
        # Circular buffer of the most recent per-update rates
        self.rate_history_idx = 0
        self.rate_history_len = 10
        self.rate_history = [None] * self.rate_history_len
        self.rate_current = 0.0
        self.last_refresh = 0
        self._cursor = False
        self.reset_cursor()

    def _ticks_filled(self):
        """Return how many ticks the current count fills."""
        if not self.meter_division:
            # Nothing to process (or a zero-width bar): the bar is full.
            return self.meter_ticks
        return int(self.count / self.meter_division)

    def reset_cursor(self, first=False):
        """Restore the saved cursor position (if any) and save it again.

        ``first`` is accepted for backward compatibility and is ignored.
        """
        if self._cursor:
            sys.stdout.write(self.ESC + '[u')
        self._cursor = True
        sys.stdout.write(self.ESC + '[s')

    def update(self, count, **kw):
        """Add ``count`` processed units and redraw the meter when due.

        The displayed rate is the mean of up to ``rate_history_len`` recent
        per-update rates.  The meter is redrawn on the first update, when
        more than ``rate_refresh`` seconds have passed since the last
        redraw, or when the total has been reached.
        """
        now = time.time()
        # Calculate rate of progress
        rate = 0.0
        # Add count to the running total, never exceeding self.total
        self.count = min(self.count + count, self.total)
        if self.last_update:
            delta = now - float(self.last_update)
            # Two updates on the same clock reading: use the raw count
            sample = count / delta if delta else count
            self.rate_history[self.rate_history_idx] = sample
            self.rate_history_idx = \
                (self.rate_history_idx + 1) % self.rate_history_len
            # Average rate history (unused slots are still None)
            samples = [r for r in self.rate_history if r is not None]
            rate = sum(samples, 0.0) / len(samples)
        self.rate_current = rate
        self.last_update = now
        # Divide count by meter division; the bar never shrinks
        value = self._ticks_filled()
        if value > self.meter_value:
            self.meter_value = value
        finished = self.count >= self.total
        overdue = (not self.last_refresh or
                   (now - self.last_refresh) > self.rate_refresh)
        if overdue or finished:
            self.refresh()

    def get_meter(self, **kw):
        """Return the meter line: bar, percentage and current rate."""
        bar = '-' * self.meter_value
        pad = ' ' * (self.meter_ticks - self.meter_value)
        if self.total:
            perc = (float(self.count) / self.total) * 100
        else:
            # An empty job has nothing left to do
            perc = 100.0
        return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)

    def refresh(self, **kw):
        """Clear the line, redraw the meter and record the redraw time."""
        # Clear line
        sys.stdout.write(self.ESC + '[2K')
        self.reset_cursor()
        sys.stdout.write(self.get_meter(**kw))
        # Are we finished?
        if self.count >= self.total:
            sys.stdout.write('\n')
        sys.stdout.flush()
        # Timestamp
        self.last_refresh = time.time()

This meter refreshes its output, creating a standing arrow that fills the width of its constraints as its internal counter is incremented.

10 comments

Nicholas 11 years, 4 months ago  # | flag

Does not work on OSX (as it is). Nice idea!

Unfortunately this does not work on OSX by default. The default terminal does not understand the save/restore cursor ANSI sequences. However, it does understand the GOTO COL 0 sequence (ESC[G). A way to fix this for OSX would be to change the following line:

sys.stdout.write(self.ESC + '[2K' )

to

sys.stdout.write(self.ESC + '[2K' + self.ESC+'[G')

I think this sequence would work on ANY terminal, meaning that you could zero out the function reset_cursor() and all function calls referring to it.

Denis Barmenkov 11 years, 4 months ago  # | flag

Avoiding use of ANSI codes. To avoid ANSI codes, you can use the Backspace character (0x08) to move the cursor one position backward, so the entire recipe becomes:

"""
Here is a silly example of its usage:

import progress
import time
import random

total = 1000
p = progress.ProgressMeter(total=total)

while total > 0:
    cnt = random.randint(1, 25)
    p.update(cnt)
    total -= cnt
    time.sleep(random.random())


Here is an example of its output:

[------------------------->                                   ] 41%  821.2/sec


2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
"""
import time, sys, math

class ProgressMeter(object):
    #ESC = chr(27)
    def __init__(self, **kw):
        # What time do we start tracking our progress from?
        self.timestamp = kw.get('timestamp', time.time())
        # What kind of unit are we tracking?
        self.unit = str(kw.get('unit', ''))
        # Number of units to process
        self.total = int(kw.get('total', 100))
        # Number of units already processed
        self.count = int(kw.get('count', 0))
        # Refresh rate in seconds
        self.rate_refresh = float(kw.get('rate_refresh', .5))
        # Number of ticks in meter
        self.meter_ticks = int(kw.get('ticks', 60))
        self.meter_division = float(self.total) / self.meter_ticks
        self.meter_value = int(self.count / self.meter_division)
        self.last_update = None
        self.rate_history_idx = 0
        self.rate_history_len = 10
        self.rate_history = [None] * self.rate_history_len
        self.rate_current = 0.0
        self.last_refresh = 0
        self.prev_meter_len = 0

    def update(self, count, **kw):
        # Advance the meter by `count` units, recompute the averaged rate and
        # redraw the bar when a refresh is due.
        now = time.time()
        # Calculate rate of progress
        rate = 0.0
        # Add count to the running total, clamped so it never exceeds self.total
        self.count += count
        self.count = min(self.count, self.total)
        # The very first call has no previous update to measure against, so
        # the rate stays at 0.0.
        if self.last_update:
            delta = now - float(self.last_update)
            if delta:
                rate = count / delta
            else:
                # No measurable time elapsed: use the raw count as the rate
                rate = count
            # Store the sample in the circular rate history
            self.rate_history[self.rate_history_idx] = rate
            self.rate_history_idx += 1
            self.rate_history_idx %= self.rate_history_len
            cnt = 0
            total = 0.0
            # Average rate history (slots not yet filled are None and skipped)
            for rate in self.rate_history:
                if rate == None:
                    continue
                cnt += 1
                total += rate
            rate = total / cnt
        self.rate_current = rate
        self.last_update = now
        # Divide count by meter division to get the number of filled ticks

(comment continued...)

Denis Barmenkov 11 years, 4 months ago  # | flag

(...continued from previous comment)

        value = int(self.count / self.meter_division)
        # The bar only ever grows
        if value > self.meter_value:
            self.meter_value = value
        # Redraw on the first update, when more than rate_refresh seconds have
        # passed since the last redraw, or when the total has been reached.
        if self.last_refresh:
            if (now - self.last_refresh) > self.rate_refresh or \
                (self.count >= self.total):
                    self.refresh()
        else:
            self.refresh()

    def get_meter(self, **kw):
        bar = '-' * self.meter_value
        pad = ' ' * (self.meter_ticks - self.meter_value)
        perc = (float(self.count) / self.total) * 100
        return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)

    def refresh(self, **kw):
        # Clear line and return cursor to start-of-line
        sys.stdout.write(' ' * self.prev_meter_len + '\x08' * self.prev_meter_len)
        # Get meter text
        meter_text = self.get_meter(**kw)
        # Write meter and return cursor to start-of-line
        sys.stdout.write(meter_text + '\x08'*len(meter_text))
        self.prev_meter_len = len(meter_text)

        # Are we finished?
        if self.count >= self.total:
            sys.stdout.write('\n')
        sys.stdout.flush()
        # Timestamp
        self.last_refresh = time.time()
Pádraig Brady 11 years, 4 months ago  # | flag

spinner. Cool. On a related note, I created a "spinner" to use when you don't know how long it will take.

http://www.pixelbeat.org/talks/python/spinner.py

Alexis Lopez 6 years, 10 months ago  # | flag

added time-to-completion estimation and refinements to my liking.

'''
A class that creates a text-based progress bar instance.

example progress bar object: 
pb_ex = ProgressMeter(total=6000,unit='operations',ticks=25)
examples progress bar format:
[=========|---------------] 17%  22.5 operations/sec. 03 min 13 sec remaining
[=========================] 100% completed in 4 hours 42 min 17 sec

Licensed under the PSF License
source: http://code.activestate.com/recipes/473899-progress-meter/
credits:
2006-02-16 vishnubob: original code
2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
2010-05-17 Alexis Lopez: total time taken and time-to-completion estimation
------------------------------------------------------------------------------
Usage: from progressmeter import ProgressMeter
'''

import sys
import time

class ProgressMeter(object):
    def __init__(self, *args, **kw):
        '''Initiates the progress bar object.'''
        # What time do we start tracking our progress from?
        self.timestamp = kw.get('timestamp', None)
        # What kind of unit are we tracking?
        self.unit = str(kw.get('unit', ''))
        # Number of units to process
        self.total = int(kw.get('total', 100))
        # Number of units already processed
        self.count = int(kw.get('count', 0))
        # Refresh rate in seconds
        self.rate_refresh = float(kw.get('rate_refresh', .5))
        # Number of ticks in meter
        self.meter_ticks = int(kw.get('ticks', 60))
        self.meter_division = float(self.total) / self.meter_ticks
        self.meter_value = int(self.count / self.meter_division)
        self.last_update = None
        self.rate_history_idx = 0
        self.rate_history_len = 10
        self.rate_history = [None] * self.rate_history_len
        self.rate_current = 0.0
        self.last_refresh = 0
        self.prev_meter_len = 0
        self.switch_off = False
        self.estimated_duration = []

(comment continued...)

Alexis Lopez 6 years, 10 months ago  # | flag

(...continued from previous comment)

    def update(self, count, *args, **kw):
        '''Updates the progress bar internal counter by adding the value of the 
           "count" variable to the internal counter.'''
        if not self.timestamp: self.timestamp = time.time()
        now = time.time()
        # Calculate rate of progress
        rate = 0.0
        # Add count to Total
        self.count += count
        self.count = min(self.count, self.total)
        if self.last_update:
            delta = now - float(self.last_update)
            if delta:
                rate = count / delta
            else:
                rate = count
            self.rate_history[self.rate_history_idx] = rate
            self.rate_history_idx += 1
            self.rate_history_idx %= self.rate_history_len
            cnt = 0
            total = 0.0
            # Average rate history
            for rate in self.rate_history:
                if rate == None:
                    continue
                cnt += 1
                total += rate
            rate = total / cnt
            self.estimated_duration.append((self.total - self.count) / rate)
        self.rate_current = rate
        self.last_update = now
        # Device Total by meter division
        value = int(self.count / self.meter_division)
        if value > self.meter_value:
            self.meter_value = value
        if self.last_refresh:
            if (now - self.last_refresh) > self.rate_refresh or (self.count >= self.total):
                    self._refresh()
        else:
            self._refresh()

    def set(self, percentage, *args, **kw):
        '''Sets the progress bar internal counter to a value such that the
           percentage progress is close to the specified value of the
           "percentage" variable.'''
        if percentage < 100:
            count = int((float(percentage) / 100) * self.total)
            count = count - self.count
        else:
            self.count = self.total
            count = 0
        self.update(count)

    def start(self, *args, **kw):
        '''Sets the progress bar timestamp.
           If this method is not called then ProgressMeter.update() or
           ProgressMeter.set() automatically sets the progress bar timestamp on
           its first usage. Does NOT override user specified __init__ settings
           same as with ProgressMeter.update() or ProgressMeter.set()''' 
        if not self.timestamp: self.timestamp = time.time()

(comment continued...)

Alexis Lopez 6 years, 10 months ago  # | flag

(...continued from previous comment)

    def reset(self, *args, **kw):
        '''Resets the progress bar to the initial settings with self.count
           set to 0'''
        # What time do we start tracking our progress from?
        timestamp = kw.get('timestamp', None)
        # What kind of unit are we tracking?
        unit = str(kw.get('unit', self.unit))
        # Number of units to process
        total = int(kw.get('total', self.total))
        # Number of units already processed
        count = int(kw.get('count', 0))
        # Refresh rate in seconds
        rate_refresh = float(kw.get('rate_refresh', self.rate_refresh))
        # Number of ticks in meter
        meter_ticks = int(kw.get('ticks', self.meter_ticks))
        self.__init__(timestamp=timestamp, unit=unit, total=total, count=count, rate_refres=rate_refresh,
                  ticks=meter_ticks)

    def _get_meter(self, *args, **kw):
        # Build the text of the meter line.  A finished meter reports the
        # time elapsed since self.timestamp; an unfinished one reports the
        # current rate and, once estimates exist, the estimated time left.
        perc = (float(self.count) / self.total) * 100
        bar = '=' * self.meter_value
        if self.count >= self.total:
            # Finished: fill whatever is left of the bar
            pad = '=' * (self.meter_ticks - self.meter_value)
            # Time taken to completion
            dur = time.time() - self.timestamp
            # Converting into hours, minutes and seconds
            hours, remainder = divmod(dur,3600)
            minutes, seconds = divmod(remainder,60)
            # Only mention the units that are needed for this duration
            if dur < 60:
                est = ' completed in %.f sec' % seconds
            elif dur >= 60 and dur < 3600:
                est = ' completed in %.f min %.f sec' % (minutes, seconds)
            else:
                # Singular "hour" for exactly one hour, plural otherwise
                if hours == 1:
                    est = ' completed in %.f hour %.f min %.f sec' % (hours, minutes, seconds)
                else:
                    est = ' completed in %.f hours %.f min %.f sec' % (hours, minutes, seconds)
            return '>[%s=%s] %d%%%s' % (bar, pad, perc, est)

(comment continued...)

Alexis Lopez 6 years, 10 months ago  # | flag

(...continued from previous comment)

        else:
            pad = '-' * (self.meter_ticks - self.meter_value)
            if len(self.estimated_duration) >= 1:
                # parameters to refine time-remaining estimation:
                # "dat" is how many of the most recent estimates are averaged
                # and "exp" is the exponent of their weights; both are chosen
                # from the size of the latest estimate (in seconds)
                if self.estimated_duration[-1] < 15: exp = 1.75; dat = 10
                elif self.estimated_duration[-1] >= 15 and self.estimated_duration[-1] < 30: exp = 1.5; dat = 15
                elif self.estimated_duration[-1] >= 30 and self.estimated_duration[-1] < 90: exp = 1.25; dat = 50 
                else: exp = 1.00; dat = 50
                # Calculation of time-remaining estimation: weighted mean of
                # the last "dat" estimates with weight (i+1)**exp, so the
                # most recent estimates count the most
                wght_num, wght_den = (0 , 0)
                for i in range(0,min(len(self.estimated_duration),dat)):
                    wght_num += self.estimated_duration[-dat:][i] * ((i+1)**exp)
                    wght_den += (i+1)**exp
                est_dur = int(wght_num / wght_den)
                # Converting into hours, minutes and seconds
                hours,  remainder = divmod(est_dur,3600)
                minutes, seconds = divmod(remainder,60)
                # Only mention the units that are needed for this duration
                if est_dur < 60:
                    est = ' %02.f seconds remaining' % seconds
                elif est_dur >= 60 and est_dur < 3600:
                    est = ' %02.f min %02.f sec remaining' % (minutes, seconds)
                else:
                    # Singular "hour" for exactly one hour, plural otherwise
                    if hours == 1:
                        est = ' %.f hour %02.f min %02.f sec remaining' % (hours, minutes, seconds)
                    else:
                        est = ' %.f hours %02.f min %02.f sec remaining' % (hours, minutes, seconds)
                return '>[%s|%s] %d%%  %.1f %s/sec.%s' % (bar, pad, perc,
                                                      self.rate_current,
                                                      self.unit, est)
            else:
                # No estimate recorded yet
                return '>[%s|%s] %d%%  %.1f %s/sec. Calculating time remaining...' % (bar, pad, perc,
                                                                                      self.rate_current,
                                                                                      self.unit)

(comment continued...)

Alexis Lopez 6 years, 10 months ago  # | flag

(...continued from previous comment)

    def _refresh(self, *args, **kw):
        if self.switch_off:
            return
        else:
            # Clear line and return cursor to start-of-line
            sys.stdout.write(' ' * self.prev_meter_len + '\x08' * self.prev_meter_len)
            # Get meter text
            meter_text = self._get_meter(*args, **kw)
            # Write meter and return cursor to start-of-line
            sys.stdout.write(meter_text + '\x08'*len(meter_text))
            self.prev_meter_len = len(meter_text)

            # Are we finished?
            if self.count >= self.total:
                sys.stdout.write('\n')
                # Switches off method refresh (equivalent to killing the meter)
                # this is a safety measure in case of wrong usage where loops
                # continue beyond ProgressMeter.set(100) or ProgressMeter.update(total)
                self.switch_off = True
            sys.stdout.flush()

            # Timestamp
            self.last_refresh = time.time()

if __name__ == '__main__':
    # Demo / manual test of ProgressMeter.  Each message is printed with a
    # single parenthesised argument, which behaves the same as a Python 2
    # print statement and as the Python 3 print function.

    import random

    print("this is a test of method ProgressMeter.update")

    # Feed the meter random-sized steps until the whole total is consumed
    total=5000
    pba = ProgressMeter(total=total,unit='apples')
    while total > 0:
        cnt = random.randint(20, 30)
        pba.update(cnt)
        total -= cnt
        time.sleep(random.uniform(.25,0.75))

    print("this is a test of method ProgressMeter.set")

    # Drive a second meter by percentage, deliberately overshooting 100
    pct = 0
    pbb = ProgressMeter(total=997,unit='oranges',ticks=80)
    while pct < 105:
        pct += random.randint(2, 5)
        pbb.set(pct)
        time.sleep(random.uniform(.25,0.75))

    print("this is a test of method ProgressMeter.reset and ProgressMeter.start and ProgressMeter.set(100)")

    # Reuse the second meter with a new unit and width
    pct = 0
    pbb.reset(unit='bananas',ticks=40)
    pbb.start()
    time.sleep(2)
    pbb.set(0)
    while pct < 94:
        pct += random.randint(2, 5)
        pbb.set(pct)
        time.sleep(random.uniform(.25,0.75))
    pbb.set(100)

    # Checks switch off in case of wrong usage (no console output expected)
    pbb.set(110)
    pbb.update(100000)

    print("this is a test message")
    print("the program will shutdown automatically")
    time.sleep(5)
    print("shutting down")
    time.sleep(0.15)
    sys.exit()
Antonio Martin 6 years, 7 months ago  # | flag

Thanks! It was exactly what I was looking for.

Add a comment

Sign in to comment

Created by vishnubob on Thu, 16 Feb 2006 (PSF)
Python recipes (4587)
vishnubob's recipes (2)

Required Modules

Other Information and Tasks