ActiveState Code

Recipe 473899: Progress Meter


A simple but useful text-mode progress meter for Python.

Python
"""
Here is a silly example of its usage:

import progress
import time
import random

total = 1000
p = progress.ProgressMeter(total=total)

while total > 0:
    cnt = random.randint(1, 25)
    p.update(cnt)
    total -= cnt
    time.sleep(random.random())


Here is an example of its output:

[------------------------->                                   ] 41%  821.2/sec
"""
import sys
import time

class ProgressMeter(object):
    ESC = chr(27)
    def __init__(self, **kw):
        # What time do we start tracking our progress from?
        self.timestamp = kw.get('timestamp', time.time())
        # What kind of unit are we tracking?
        self.unit = str(kw.get('unit', ''))
        # Number of units to process
        self.total = int(kw.get('total', 100))
        # Number of units already processed
        self.count = int(kw.get('count', 0))
        # Refresh rate in seconds
        self.rate_refresh = float(kw.get('rate_refresh', .5))
        # Number of ticks in meter
        self.meter_ticks = int(kw.get('ticks', 60))
        self.meter_division = float(self.total) / self.meter_ticks
        self.meter_value = int(self.count / self.meter_division)
        self.last_update = None
        self.rate_history_idx = 0
        self.rate_history_len = 10
        self.rate_history = [None] * self.rate_history_len
        self.rate_current = 0.0
        self.last_refresh = 0
        self._cursor = False
        self.reset_cursor()

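    def reset_cursor(self, first=False):
        # Restore the previously saved cursor position (ESC[u), if any,
        # then save the current position (ESC[s) for the next refresh.
        if self._cursor:
            sys.stdout.write(self.ESC + '[u')
        self._cursor = True
        sys.stdout.write(self.ESC + '[s')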

    def update(self, count, **kw):
        now = time.time()
        # Calculate rate of progress
        rate = 0.0
        # Add count to Total
        self.count += count
        self.count = min(self.count, self.total)
        if self.last_update:
            delta = now - float(self.last_update)
            if delta:
                rate = count / delta
            else:
                rate = count
            self.rate_history[self.rate_history_idx] = rate
            self.rate_history_idx += 1
            self.rate_history_idx %= self.rate_history_len
            cnt = 0
            total = 0.0
            # Average rate history
            for rate in self.rate_history:
                if rate is None:
                    continue
                cnt += 1
                total += rate
            rate = total / cnt
        self.rate_current = rate
        self.last_update = now
        # Divide count by meter division
        value = int(self.count / self.meter_division)
        if value > self.meter_value:
            self.meter_value = value
        if self.last_refresh:
            if (now - self.last_refresh) > self.rate_refresh or \
                (self.count >= self.total):
                    self.refresh()
        else:
            self.refresh()

    def get_meter(self, **kw):
        bar = '-' * self.meter_value
        pad = ' ' * (self.meter_ticks - self.meter_value)
        perc = (float(self.count) / self.total) * 100
        return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)

    def refresh(self, **kw):
        # Clear line
        sys.stdout.write(self.ESC + '[2K')
        self.reset_cursor()
        sys.stdout.write(self.get_meter(**kw))
        # Are we finished?
        if self.count >= self.total:
            sys.stdout.write('\n')
        sys.stdout.flush()
        # Timestamp
        self.last_refresh = time.time()

Discussion

The meter redraws itself in place on a single line, so the arrow appears to advance across a fixed-width bar as the internal counter is incremented.
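
As a rough illustration of how the bar text is derived from the counter, here is a minimal standalone sketch. The function name render_meter is hypothetical and is not part of the recipe; it only mirrors the arithmetic in get_meter() and the clamping in update(), under the added assumption that total is positive.

```python
def render_meter(count, total, ticks=60, rate=0.0):
    """Return one meter line for `count` of `total` units processed.

    Hypothetical helper for illustration; it is not part of the recipe.
    """
    if total <= 0:
        raise ValueError("total must be positive")
    # Clamp the counter into [0, total], as update() does with min().
    count = max(0, min(count, total))
    # Number of completed ticks, rounded down.
    filled = int(float(count) * ticks / total)
    bar = '-' * filled
    pad = ' ' * (ticks - filled)
    percent = int(100.0 * count / total)
    # Same format string as get_meter() in the recipe.
    return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, percent, rate)


if __name__ == '__main__':
    print(render_meter(410, 1000, rate=821.2))
```

Keeping the rendering in a pure function like this makes it easy to check the output without a terminal; the recipe itself keeps the equivalent state on the ProgressMeter instance.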

Comments

  1. At 4:03 a.m. on 18 Feb 2006, Nicholas said:

    Does not work on OS X (as it is). Nice idea!

    Unfortunately, this does not work on OS X by default: the default terminal does not understand the ANSI save/restore cursor sequences (ESC[s and ESC[u). It does, however, understand the go-to-column-0 sequence (ESC[G). One way to fix this on OS X is to change the following line:

    sys.stdout.write(self.ESC + '[2K' )
    

    to

    sys.stdout.write(self.ESC + '[2K' + self.ESC+'[G')
    

    I think this sequence would work on any terminal, in which case you could empty out the reset_cursor() function and remove every call to it.

  2. At 1:35 a.m. on 20 Feb 2006, Denis Barmenkov said:

    Avoiding use of ANSI codes. To avoid ANSI sequences entirely, you can use the Backspace character (0x08) to move the cursor back one position, so the entire recipe becomes:

    """
    Here is a silly example of its usage:
    
    import progress
    import time
    import random
    
    total = 1000
    p = progress.ProgressMeter(total=total)
    
    while total > 0:
        cnt = random.randint(1, 25)
        p.update(cnt)
        total -= cnt
        time.sleep(random.random())
    
    
    Here is an example of its output:
    
    [------------------------->                                   ] 41%  821.2/sec
    
    
    2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
    """
    import sys
    import time
    
    class ProgressMeter(object):
        #ESC = chr(27)
        def __init__(self, **kw):
            # What time do we start tracking our progress from?
            self.timestamp = kw.get('timestamp', time.time())
            # What kind of unit are we tracking?
            self.unit = str(kw.get('unit', ''))
            # Number of units to process
            self.total = int(kw.get('total', 100))
            # Number of units already processed
            self.count = int(kw.get('count', 0))
            # Refresh rate in seconds
            self.rate_refresh = float(kw.get('rate_refresh', .5))
            # Number of ticks in meter
            self.meter_ticks = int(kw.get('ticks', 60))
            self.meter_division = float(self.total) / self.meter_ticks
            self.meter_value = int(self.count / self.meter_division)
            self.last_update = None
            self.rate_history_idx = 0
            self.rate_history_len = 10
            self.rate_history = [None] * self.rate_history_len
            self.rate_current = 0.0
            self.last_refresh = 0
            self.prev_meter_len = 0
    
        def update(self, count, **kw):
            now = time.time()
            # Calculate rate of progress
            rate = 0.0
            # Add count to Total
            self.count += count
            self.count = min(self.count, self.total)
            if self.last_update:
                delta = now - float(self.last_update)
                if delta:
                    rate = count / delta
                else:
                    rate = count
                self.rate_history[self.rate_history_idx] = rate
                self.rate_history_idx += 1
                self.rate_history_idx %= self.rate_history_len
                cnt = 0
                total = 0.0
                # Average rate history
                for rate in self.rate_history:
                    if rate is None:
                        continue
                    cnt += 1
                    total += rate
                rate = total / cnt
            self.rate_current = rate
            self.last_update = now
            # Divide count by meter division
    

    (comment continued...)

  3. At 1:35 a.m. on 20 Feb 2006, Denis Barmenkov said:

    (...continued from previous comment)

            value = int(self.count / self.meter_division)
            if value > self.meter_value:
                self.meter_value = value
            if self.last_refresh:
                if (now - self.last_refresh) > self.rate_refresh or \
                    (self.count >= self.total):
                        self.refresh()
            else:
                self.refresh()
    
        def get_meter(self, **kw):
            bar = '-' * self.meter_value
            pad = ' ' * (self.meter_ticks - self.meter_value)
            perc = (float(self.count) / self.total) * 100
            return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)
    
        def refresh(self, **kw):
            # Clear line and return cursor to start-of-line
            sys.stdout.write(' ' * self.prev_meter_len + '\x08' * self.prev_meter_len)
            # Get meter text
            meter_text = self.get_meter(**kw)
            # Write meter and return cursor to start-of-line
            sys.stdout.write(meter_text + '\x08'*len(meter_text))
            self.prev_meter_len = len(meter_text)
    
            # Are we finished?
            if self.count >= self.total:
                sys.stdout.write('\n')
            sys.stdout.flush()
            # Timestamp
            self.last_refresh = time.time()
    
  4. At 1:59 a.m. on 20 Feb 2006, Pádraig Brady said:

    Spinner. Cool. On a related note, I created a "spinner" to use when you don't know how long the task will take.

    http://www.pixelbeat.org/talks/python/spinner.py
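
The portability concerns raised in the first two comments (the save/restore cursor sequences are not understood everywhere) can also be sidestepped with a plain carriage return, which moves the cursor back to column 0. The sketch below is one possible approach, not something taken from the recipe or from any commenter's code; the class name LineRedrawer and its methods are hypothetical. It pads with spaces so that a shorter line fully overwrites a longer previous one.

```python
import sys


class LineRedrawer(object):
    """Redraw a single status line in place using a carriage return.

    Hypothetical helper for illustration; it is not part of the recipe.
    """

    def __init__(self, stream=None):
        # Default to standard output; any object with write() and
        # flush() works, which also makes the class easy to test.
        self.stream = stream if stream is not None else sys.stdout
        self.prev_len = 0

    def draw(self, text):
        # Pad with spaces so leftovers of a longer previous line are erased.
        padding = ' ' * max(0, self.prev_len - len(text))
        self.stream.write('\r' + text + padding)
        self.stream.flush()
        self.prev_len = len(text)

    def finish(self):
        # Move to the next line once the task is complete.
        self.stream.write('\n')
        self.stream.flush()
        self.prev_len = 0
```

In the recipe, refresh() could call something like draw(self.get_meter()) instead of writing escape sequences, and finish() once count reaches total. This assumes the meter fits on one terminal line; a line that wraps cannot be rewound with a carriage return alone.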
