# Author: Anand Patil # License: MIT License import matplotlib.pyplot as pl import numpy as np def symmetric(sorted_streams, stream_bounds): """Symmetric baseline""" lb, ub = np.min(stream_bounds[:,0,:],axis=0), np.max(stream_bounds[:,1,:],axis=0) return .5*(lb+ub) def pos_only(sorted_streams, stream_bounds): """Lumps will only be positive""" lb, ub = np.min(stream_bounds[:,0,:],axis=0), np.max(stream_bounds[:,1,:],axis=0) return lb def zero(sorted_streams, stream_bounds): """Zero baseline""" return np.zeros(stream_bounds.shape[2]) def min_weighted_wiggles(sorted_streams, stream_bounds): """Baseline recommended by Byron and Wattenberg""" lb, ub = np.min(stream_bounds[:,0,:],axis=0), np.max(stream_bounds[:,1,:],axis=0) weight = ub-lb sorted_streams = np.abs(sorted_streams) for i in xrange(len(sorted_streams)): sorted_streams[i,:] *= (-1)**i cusum_f = np.vstack((np.zeros(sorted_streams.shape[1]), np.cumsum(sorted_streams[:-1,:], axis=0))) f_prime = np.diff(sorted_streams, axis=1) cusum_f_prime = np.diff(cusum_f, axis=1) g_prime = np.hstack(([0],-np.sum((f_prime*.5 + cusum_f_prime)*sorted_streams[:,1:],axis=0) / weight[1:])) g_prime[np.where(weight==0)] = 0 g = np.cumsum(g_prime) return g def stacked_graph(streams, cmap=pl.cm.bone, color_seq='linear', baseline_fn=min_weighted_wiggles): """ Produces stacked graphs using matplotlib. Reference: 'Stacked graphs- geometry & aesthetics' by Byron and Wattenberg http://www.leebyron.com/else/streamgraph/download.php?file=stackedgraphs_byron_wattenberg.pdf Parameters: - streams: A list of time-series of positive values. Each element must be of the same length. - cmap: A matplotlib color map. Defaults to 'bone'. - colo_seq: 'linear' or 'random'. - baseline_fn: Current options are symmetric, pos_only, zero and min_weighted_wiggles. """ # Sort by onset times onset_times = [np.where(np.abs(stream)>0)[0][0] for stream in streams] order = np.argsort(onset_times) streams = np.asarray(streams) sorted_streams = streams[order] t = np.arange(streams.shape[1]) # Establish bounds stream_bounds = [np.vstack((np.zeros(streams.shape[1]), sorted_streams[0])), np.vstack((-sorted_streams[1], (np.zeros(streams.shape[1]))))] side = -1 for stream in sorted_streams[2:]: side *= -1 if side==1: stream_bounds.append(np.vstack((stream_bounds[-2][1], stream_bounds[-2][1]+stream))) else: stream_bounds.append(np.vstack((stream_bounds[-2][0]-stream, stream_bounds[-2][0]))) stream_bounds = np.array(stream_bounds) # Compute baseline baseline = baseline_fn(sorted_streams, stream_bounds) # Choose colors t_poly = np.hstack((t,t[::-1])) if color_seq=='linear': colors = np.linspace(0,1,streams.shape[1]) elif color_seq=='random': colors = np.random.random(size=streams.shape[1]) else: raise ValueError, 'Color sequence %s unrecognized'%color_seq # Plot pl.axis('off') for i in xrange(len(stream_bounds)): bound = stream_bounds[i] color = cmap(colors[i]) pl.fill(t_poly, np.hstack((bound[0]-baseline,(bound[1]-baseline)[::-1])), facecolor=color, linewidth=0.,edgecolor='none') # Demo if __name__ == '__main__': pl.clf() N_dsets = 50 T = 100 amp = 1 fade = .15 dsets = [] for i in xrange(N_dsets): this_dset = np.zeros(T) t_onset = np.random.randint(.9*T)-T/3 if t_onset >= 0: remaining_t = np.arange(T-t_onset) else: remaining_t = np.arange(T)-t_onset this_dset[max(t_onset,0):]=np.exp(-.15*np.random.gamma(10,.1)*remaining_t)\ * remaining_t * np.random.gamma(6,.2)# * np.cos(-fade*remaining_t*np.random.gamma(10,.1))**2 dsets.append(this_dset) stacked_graph(dsets, baseline_fn = min_weighted_wiggles, color_seq='random')