Welcome, guest | Sign In | My Account | Store | Cart
import random
import copy
import time

def supersequence(x, y):
    """ True if and only if all elements in y occur in x in order (x is a supersequence of y)"""
    idx = 0
    try:
        for i in y:
            idx = x.index(i, idx)+1
    except ValueError, e:
        return False
    return True


def subsequence(x, y):
    """ x is a subsequence of y  <==>  y is a supersequence of x """
    return supersequence(y, x)


def is_supersequence_of_sequences(sequence, sequences):
    """ True if sequence is a supersequence of every s in sequences """
    result = True
    for s in sequences:
        if not supersequence(sequence, s):
            return False
    return result


def remove_redundant_sequences(redundant_sequences, debug=False):
    """ Removes every doublure sequence and every sequence that is a subsequence of another """
    if debug:
        print "Removing redundant sequences:"
    sequences = []
    for i,s in enumerate(redundant_sequences):
        remove = False
        for j in range(i+1, len(redundant_sequences)):
            if s == redundant_sequences[j]:
                remove = True
                if debug:
                    print "  Found doublure     ", s, "==", redundant_sequences[j]
        if not remove:
            sequences.append(s)
    sequences_p = []
    for i,s1 in enumerate(sequences):
        found = False
        for j,s2 in enumerate(sequences):
            if ((i != j) and (supersequence(s2, s1))):
                found = True
                if debug:
                    print "  Found supersequence", s2, "for", s1
                break
        if not found:
            sequences_p.append(s1)
    if debug:
        print "Removed %s redundant sequence(s). Reduced set:"%(len(redundant_sequences)-len(sequences_p))
        for i,s in enumerate(sequences_p):
            print " s%s:"%i, s
    return sequences_p


def lower_bound(scs):
    """ Finds a lower bound based on some fast algorithms."""
    def scs_length(s1, s2):
        """ shortest common supersequence length for two single sequences
            can be computed using dynamic programming. Based on algorithm for
            computing Longest Common Subsequence (== len(s1)+len(s2)-SCS )"""
        S = []
        for i in range(0, len(s1)+1):
            S.append([-1]*(len(s2)+1))
        for i in range(len(s1)+1):
            S[i][0] = i
        for j in range(len(s2)+1):
            S[0][j] = j
        for i in range(1, len(s1)+1):
            for j in range(1, len(s2)+1):
                if s1[i-1] == s2[j-1]:
                    S[i][j] = min(S[i-1][j]+1, S[i][j-1]+1, S[i-1][j-1]+1)
                else:
                    S[i][j] = min(S[i-1][j]+1, S[i][j-1]+1)
        return S[len(s1)][len(s2)]

    def find_max_scs_length_of_2_combinations(sequences):
        """ Returns maximum scs_length(s1,s2) for any s1, s2 in sequences. Note that this does not
            guarantee the length of the actual SCS of the entire set of sequences. """
        result = -1
        for i in range(len(sequences)):
            for j in range(i+1, len(sequences)):
                length = scs_length(sequences[i], sequences[j])
                if length > result:
                    result = length
        return result

    def max_count_occurrences(sequences):
        """ for every value v in the alphabet, for all s in sequences, count occurrences of v in s.
            The length of the SCS has be least the sum of maximum occurrences."""
        lower_bound = 0
        for v in set([s for seq in sequences for s in seq]):
            count_v = -1
            for s in sequences:
                count = s.count(v)
                if count > count_v:
                    count_v = count
            lower_bound += count_v
        return lower_bound

    return max(max_count_occurrences(scs), find_max_scs_length_of_2_combinations(scs))


def upper_bound(sequences, max_run_time_random_seconds = 1):
    """ Finds an upper bound based on some fast approximation algorithms."""

    def alphabet_leftmost(sequences):
        """ Approximation algorithm by looping through a random permutation on the alphabet """
        seqs = copy.deepcopy(sequences)
        common_supersequence = []
        permutation = list(set([x for s in seqs for x in s]))
        random.shuffle(permutation)
        i = 0
        while any(seqs):
            found = False
            for s in seqs:
                if s and s[0] == permutation[i]:
                    found = True
                    break
            if found:
                for s in seqs:
                    if s and s[0] == permutation[i]:
                        s.remove(permutation[i])
                common_supersequence.append(permutation[i])
            i = (i+1) % len(permutation)
        return common_supersequence


    def alphabet_leftmost_rand(sequences):
        """ Approximation algorithm using random selection on alphabet values. """
        seqs = copy.deepcopy(sequences)
        common_supersequence = []
        while any(seqs):
            firsts = list(set([s[0] for s in seqs if s]))
            next = firsts[random.randint(0, len(firsts)-1)]
            for s in seqs:
                if s and s[0] == next:
                    s.remove(next)
            common_supersequence.append(next)
        return common_supersequence

    def majority_merge(seqs):
        """ Quite fast approximation algorithm """
        sequences = copy.deepcopy(seqs)
        common_supersequence = []
        while any(sequences):
            firsts = [(s[0], len(s)) for s in sequences if s]
            counts = {}
            for x in firsts:
                if x[0] in counts:
                    counts[x[0]] += x[1]    #use +1 if not weighted MM
                else:
                    counts[x[0]] = x[1]     #use 1 if not weighted MM
            most_common = sorted(counts, key=lambda x: counts[x], reverse=True)[0]
            common_supersequence.append(most_common)
            for s in sequences:
                if s and s[0] == most_common:
                    s.remove(most_common)
                    if s == []:
                        sequences.remove([])
        return common_supersequence

    trivial = sum(len(s) for s in sequences)
    mm = len(majority_merge(sequences))
    alm, almr = 10000000, 10000000
    start = time.time()
    while time.time() < start + max_run_time_random_seconds:
        alm_test = alphabet_leftmost(sequences)
        if len(alm_test) < alm:
            alm = len(alm_test)
        almr_test = alphabet_leftmost_rand(sequences)
        if len(almr_test) < almr:
            almr = len(almr_test)
    upperbounds = [trivial, mm, alm, almr]
    solutions = bfs_genetic(sequences, keep_best = 1000, best_so_far = min(upperbounds))
    if solutions:
        upperbounds.append(sorted(solutions)[0])
    return min(upperbounds)


def backtrack_shortest_common_supersequences(sequences, upperbound=None):
    """ backtrack with pruning, then filter out the shortest SCSs (they are not unique!)"""
    if not upperbound:
        upperbound = 10000000
    result = backtrack_scs(sequences, [], upperbound)
    return [x for x in result if len(x) == min([len(y) for y in result])]

def backtrack_all_common_supersequences(sequences):
    """ backtrack all supersequences (not just shortest) """
    upperbound = 10000000
    return backtrack_scs(sequences, [], upperbound, False)

def backtrack_scs(sequences, choices, best_so_far, prune=True):
    """ Recursive DFS algorithm, potentially risky with large #sequences or long s in sequences"""
    solutions = []
    if any(sequences):
        # no need to evaluate subsolution choices if it will become longer than best_so_far.
        if not prune or len(choices) + max([len(s) for s in sequences]) <= best_so_far:
            firsts = set([s[0] for s in sequences if s])
            for c in firsts:
                indices = []
                for i,s in enumerate(sequences):
                    if s and s[0] == c:
                        s.remove(c)
                        indices.append(i)
                for solution in backtrack_scs(sequences, choices+[c], best_so_far):
                    solutions.append(solution)
                for i in indices:
                    sequences[i] = [c]+sequences[i]
    else:
        solutions = [choices]
        if len(choices) < best_so_far:
            best_so_far = len(choices)
    return solutions


def bfs_genetic(sequences, keep_best = 100000000, best_so_far = 100000000):
    """ Breath first search algorithm. Prunes away up to keep_best subsolutions according to metric() """
    def metric((choices, remaining_sequences)):
        # TODO: find good metric, given current choices and remaining sequences to solve.
        # return sum(len(s) for s in remaining_sequences)
        return len(remove_redundant_sequences(copy.deepcopy(remaining_sequences)))

    Q = []
    solutions = {}
    firsts = set([s[0] for s in sequences if s])
    for f in firsts:
        seqs = copy.deepcopy(sequences)
        for s in seqs:
            if s and s[0] == f:
                s.remove(f)
        Q.append(([f], seqs))
    depth = 1
    while Q:
        if depth < len(Q[0][0]):
            if len(Q) > keep_best:
                # print "Throwing away", len(Q) - keep_best, "out of", len(Q), "at depth", depth
                Q = sorted(Q, key=metric)[:keep_best]
            depth += 1
        else:
            choices, seqs = Q.pop(0)
            if len(choices) + max([len(s) for s in seqs]) <= best_so_far:
                if any(seqs):
                    firsts = set([s[0] for s in seqs if s])
                    for f in firsts:
                        sequences = copy.deepcopy(seqs)
                        for s in sequences:
                            if s and s[0] == f:
                                s.remove(f)
                        Q.append((choices[:]+[f], sequences))
                else:
                    if depth in solutions:
                        solutions[depth].append(choices[:])
                    else:
                        solutions[depth] = [choices[:]]
    return solutions


def pretty_print_scs_with_sequences(scs, sequences):
    print "SCS: " + "".join([str(s) for s in scs])
    for i,aseq in enumerate(sequences):
        seq = [-1]*len(scs)
        idx = 0
        for v in aseq:
            while scs.index(v, idx) > idx:
                idx += 1
            seq[idx] = v
            idx += 1
        print " s%s: "%i + "".join([str(s) for s in seq]).replace("-1", ".")


if __name__=="__main__":
    def run_SCS_algorithms(sequences):
        sequences = remove_redundant_sequences(sequences, debug=True)
        lowerbound, upperbound = lower_bound(sequences), upper_bound(sequences)
        print "Lowerbound on Shortest Common Supersequence:", lowerbound
        print "Upperbound on Shortest Common Supersequence:", upperbound
        solutions = backtrack_shortest_common_supersequences(sequences, upperbound)
        print "Backtrack found %s SCSs of optimal length %s."%(len(solutions), len(solutions[0]))
        print "E.g.", solutions[0], "is valid:", is_supersequence_of_sequences(solutions[0], sequences)
        pretty_print_scs_with_sequences(solutions[0], sequences)

    sequences_1 = [ [1,2,3],
                    [1,2,5],
                    [3,1,5,4],
                    [1,2,1,5],
                    [1,2,5]]

    sequences_2 = [ [1,2,1,2,3],    # "acacg"
                    [1,4,1,3,1],    # "ataga"
                    [2,1,2,3,4],    # "cacgt"
                    [3,4,1,1,4]]    # "ctaat"

    sequences_3 = [ [5,2,1,6,3,6,3,1],
                    [1,4,1,3,1,5,1,2],
                    [2,1,6,3,4,4,2,3],
                    [3,4,5,1,4,6,1,2]]

    sequences_4 = [ [5,1,3,5,2,6],
                    [1,5,2,1,3,2,3,1],
                    [5,1,3,5,2,1,6,2],
                    [3,5,1,6,2,4,6,1,2]]

    run_SCS_algorithms(sequences_4)

History