Welcome, guest | Sign In | My Account | Store | Cart

Given a large one-dimensional array, break it into blocks of contstant length and compute min and max for each block, the so-called "peaks data".

Python, 21 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from Numeric import *

def peaks(data, step):
	n = len(data) - len(data)%step # ignore tail
	slices = [ data[i:n:step] for i in range(step) ]
	peak_max = reduce(maximum, slices)
	peak_min = reduce(minimum, slices)
	return transpose(array([peak_max, peak_min]))

"""example of use:

>>> x = sin(arrayrange(0, 3.14, 1e-5))
>>> len(x)
314000
>>> peaks(x,10000)
array([[ 0.09982347,  0.        ],
       [ 0.19865953,  0.09983342],
         ...
       [ 0.23924933,  0.14112991],
       [ 0.14112001,  0.04159065]])
"""

In a typical waveform display widget one needs to display a sampled sound of length ~1e6 in a GUI window of 800-1000 pixels width. Thus there are around 1024 samples for pixel, and it seems reasonable to draw both min and max values (there are alternatives, such as RMS energy).

Assuming the data is stored in a Numeric.array (numarray.array would work as well) of length n=m*step, the peaks() function returns (m,2)-shaped array containing min and max for each block of length step in data.

Note that the arrays created by slices = ... assignment do not copy the contents of data, so there is little (or no) memory allocation overhead.

The main disadvantage of the recipe is that it requires (steps-1) passes over the array and the CPU cache usage is non-optimal. The obvious one-pass algorithm should perform better, but i can't figure out how to write the one-pass algorithm in Numeric.

2 comments

Alexander Ross 18 years, 1 month ago  # | flag

This is quite a bit faster. The following is quite a bit faster. And should be directly translatable to Numeric code.

import numpy as np
def peaks(data, step):
    data = data.ravel()
    length = len(data)
    if length % step == 0:
        data.shape = (length/step, step)
    else:
        data.resize((length/step, step))
    max_data = np.maximum.reduce(data,1)
    min_data = np.minimum.reduce(data,1)
    return np.concatenate((max_data[:,np.NewAxis], min_data[:,np.NewAxis]), 1)

Profiling this function on my machine gives 0.015 seconds per call:

>>> x = np.sin(np.arrayrange(0, 3.14, 1e-5))
>>> profiler.run("run_n_times('peaks(x,1000)', 1000)").print_stats()
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      999    0.190    0.000   14.880    0.015 :1(peaks)

Compare with 0.104 seconds per call:

>>> x = sin(arrayrange(0, 3.14, 1e-5))
>>> profiler.run("run_n_times('peaksN(x,1000)', 1000)").print_stats()
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      999    2.160    0.002  104.190    0.104 :1(peaksN)

I don't think it is possilbe to combine the two operations (min and max) with functions that numpy provides, but you could probably write an algorithm in C and use weave to inline the code.

Maxim Krikun (author) 18 years, 1 month ago  # | flag

so what i missed was ufunc's own .reduce() method

thanks a lot for pointing out!