Welcome, guest | Sign In | My Account | Store | Cart

A common task, especially in text processing, is to break some input sequence into chunks (lines, paragraphs, records, etc.) and process them one by one. The iterators of builtin strings and files can be considered such chunkers, breaking the object into characters and lines respectively. This recipe is a generic way to break any iterable into consecutive blocks, specified by a delimiter or a pair of (start,end) delimiters.

Python, 69 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from operator import itemgetter
from itertools import groupby,imap

_undefined = object()

def iterblocks(iterable, start, end=_undefined, skip_delim=True):
    '''Create an iterator over consecutive items (I{blocks}) of the given iterable.

    @param start: The delimiter denoting the start of a block. It can be:
        1. a predicate C{p(item)} that returns true if the item is a
        delimiter and false otherwise, or
        2. a non-callable object C{obj}: equivalent to C{lambda item: item==start}
    @param end: If not None, the delimiter denoting the end of a block. Items
        after an end delimiter but before the next start delimiter are skipped.
        Takes the same values as C{start}.

    @param skip_delim: True if the delimiter(s) are to be skipped, false otherwise.
    @type skip_delim: C{bool}
    '''
    def get_predicate(arg):
        return arg if callable(arg) else (
               arg.__eq__ if hasattr(arg,'__eq__') else
               lambda item: item == arg)
    def stamped_items(items):
        count = 0
        startsblock = get_predicate(start)
        if end is _undefined:
            for item in items:
                if startsblock(item):
                    count += 1
                    if skip_delim: continue
                yield count,item
        else:
            endsblock = get_predicate(end)
            inblock = False
            for item in items:
                if inblock:
                    if endsblock(item):
                        inblock = False
                        if skip_delim: continue
                elif startsblock(item):
                    count += 1
                    inblock = True
                    if skip_delim: continue
                else: continue
                yield count,item
    get2nd = itemgetter(1)
    for count, block in groupby(stamped_items(iterable), itemgetter(0)):
        yield imap(get2nd, block)

if __name__ == '__main__':
    import re
    # a slow version of str.split
    for chars in iterblocks('Hello World', re.compile(r'\s').match):
        print ''.join(chars)
    source = """\
> name1....

line_11
line_12
line_13
...
> name2 ...

line_21
line_22
...""".splitlines()
    for lines in iterblocks(source, start=re.compile('>').match, end='...'):
        print list(lines)

v1.1: Fixed bug for consecutive delimiters v1.2: Changed "end=None" with "end=_undefined" since None can be a valid delimiter v1.3: Added forgotten imports

3 comments

Raymond Hettinger 16 years, 10 months ago  # | flag

Variant with itertools.groupby(). FWIW, with a well crafted key= function, groupby() makes short work of data partitioning problems:

def blocks(s, start, end):
    def classify(c, ingroup=[0]):
        klass = c==start and 2 or c==end and 3 or ingroup[0]
        ingroup[0] = klass==1 or klass==2
        return klass
    return [tuple(g) for k, g in groupby(s, classify) if k == 1]

print blocks('the {quick} brown {fox} jumped', start='{', end='}')
George Sakkis (author) 16 years, 10 months ago  # | flag

Neat! It's also trivial to retain the delimiters by changing "return result" to "return result if skip_delim else result>=1". groupby is a real swiss knife!

Jim Pryor 16 years, 1 month ago  # | flag

Incorporating comment #1: Here's a rewrite, using the method described in comment #1, and retaining the optional end and skip_delim logic of the original post:

from itertools import groupby
def chunked(s, start='', end='', skip_delim=True):
    def classify1(c, ingroup=[1]):
        key = 3*ingroup[0] if c==end else ingroup[0]
        if key==3 or key==-3:
            ingroup[0] = -ingroup[0]
        return key if skip_delim else -ingroup[0] if (key==3 or key==-3) else ingroup[0]
    def classify2(c, ingroup=[1,False]):
        key = 2 if c==start else 3 if (c==end and ingroup[1]) else ingroup[0] if ingroup[1] else 0
        if key==3:
            ingroup[0] = -ingroup[0]
        ingroup[1] = key==2 or key==1 or key==-1
        return key if skip_delim else 0 if key==0 else -ingroup[0] if key==3 else ingroup[0]
    classify = classify2
    if end=='':
        if start=='':
            yield ''.join(s)
            return
        else:
            end = start
            classify = classify1
    elif start=='':
        classify = classify1
    cur={}
    for k,g in groupby(s,classify):
        if (k==1 or k==-1):
            if 1 in cur:
                yield ''.join(cur[1])
            cur[1] = tuple(g)
        elif k==3 or k==-3:
            yield ''.join(cur.setdefault(1,()))
            del cur[1]
    if classify==classify1 or 1 in cur:
        yield ''.join(cur.get(1,()))