TD9 - Heapsorting

1. Basics

The idea of heapsorting is as follows:

  1. set a correct heap out of the list L to sort (all fathers are greater than their sons)
  2. successively switch the root L[0] with the last entry L[-1] of L (the root is then correctly placed)
  3. drive the new root down until correctly placed in the heap (this is called sifting: we let the "small grain" go down the sift)
  4. iteratively proceed like this over i on the list L[:-i] without last i indices until reaching the root.

If performed "in place", the memory cost is $n$ for $n$ the size of L. Otherwise, one needs to duplicate the list and the cost in memory is $2n$. Since the heap has depth $\log_2 n$, the $n$ sifting operations cost at most $O(\log_2 n)$, so a total cost of $$O(n\log n).$$ We will see below that the "heapification" step 1) has cost $O(n)$, so the total algorithm cost is $O(n\log n)$.

2. Details of the algorithm in place

Heapify (1st step)

Having defined a heap constructor H, that is a complete binary tree (up to size $n$), the heapify() procedure consists in:

  • building up the heap with the values of L in sequence
  • either working with the binary tree or with the vector of indices [1,...,n] representing the heap, we operate recursively (or iteratively):

    • starting from the root, heapify the left child and the right child: hence the children are heapified
    • if the root value is smaller than one of its children, switch the root with its largest child and heapify the corresponding child

    The recursive call of the heapify function ends when meeting a tree (or list) of size 2 or 3 (which we switch accordingly).

Being recursive, this operation costs: $$C(n) = 3C(n/2)$$ where the factor $3$ unfolds from the heapification of both children, plus the possible second heapification of a child. This would lead to a final cost of $$C(n) = O(n^{\log_23}) \simeq O(n^{1.58})$$

This can be improved by replacing the second possible heapification by "sifting down" the root of the corresponding child in its own subtree (classical classification of order $\log n$ in complexity). The resulting cost is then: $$C(n) = 2C(n/2) + \log n $$ and so the final cost of heapify() is $$O(n).$$

Combination with fusion sort

Fusion sorting has cost $O(n\log n)$, i.e., the same as heapsorting. Combining both maintains the total cost to $O(n\log n)$, but with the advantage of heapsorting to be in place (so can be performed in cache on small size lists).

In [293]:
import numpy as np
import sys
Function: divide_left_right
INPUT : indices list
OUTPUT : the division of indices in the two 'children' subsets indices[1, 3,4, 7,8,9,10, ...] and indices[2, 5,6, 11,12,13,14, ...]
In [294]:
## Divides an array of indices in left_child and right_child
## Iterative version on vectors
def divide_left_right(indices):
    n = len(indices)
    left_child = []
    right_child = []
    depth=0
    i=0
    while (2*i+1<n):
        left_child += list(range(2*i+1,min([n,2*i+1+2**depth])))
        if 2*i+1+2**depth<n:
            right_child += list(range(2*i+1+2**depth,min([n,2*i+1+2**(1+depth)])))
            i=2*i+1
            depth+=1
        else:
            break
    return [indices[i] for i in left_child],[indices[i] for i in right_child]
Function: switch_three
INPUT : list of 2 or 3 values [a,b,(c)]
OUTPUT : (the same values sorted with maximal entry in first position , an indicator of the largest index)
In [297]:
## Switches the 2 or 3 values of "father / left_child / [right_child]" by setting max on top
def switch_three(values):
    max_child = -1     # father is already largest
    l = len(values)
    if (l == 3 and values[2]>max([values[0],values[1]])):
        values[0],values[2] = values[2],values[0]
        max_child = 1  # right_child is largest
        
    if (values[1]>values[0]):
        values[0],values[1]=values[1],values[0]
        max_child = 0  # left_child is largest 
        
    return values,max_child
In [295]:
class H:
    def __init__(self,elements):
        H.values=elements
        H.size = len(elements)
                
    def heapify(self,indices=[]):
        if indices == []:
            indices = list(range(self.size))
            
        if 2<=len(indices)<=3:
            new_values = switch_three(H.values[indices])[0]
            for i,r in enumerate(indices):
                H.values[r] = new_values[i]
                
        if len(indices)>3:
            left_child,right_child=divide_left_right(indices)
            
            self.heapify(left_child)
            self.heapify(right_child)                       

            new_values,max_child = switch_three(H.values[indices[:3]])
            for i,r in enumerate(indices[:3]):
                H.values[r] = new_values[i]                
            
            if max_child == 0:
                #self.heapify(left_child)   # this is an option, but this is more costly
                self.sift_down(indices[1],self.size)
            elif max_child == 1:
                #self.heapify(right_child)  # this is an option, but this is more costly
                self.sift_down(indices[2],self.size)
    
    ## Sift down the root in array of length index
    def sift_down(self,j,index):
        max_child = 0
        while (2*j+1<index and max_child>=0):
            local_indices = [i for i in [j,2*j+1,2*j+2] if i<index]
            new_values,max_child = switch_three(H.values[local_indices])
            for i,r in enumerate(local_indices):
                H.values[r] = new_values[i]                            
            j=2*j+1+max_child
    
    ## Switch the root with last unsorted element i, iteratively over decreasing i
    def sort(self):
        self.heapify()
        for i in reversed(range(self.size)):
            self.values[0],self.values[i] = self.values[i],self.values[0]
            self.sift_down(0,i)
    
    def print(self):
        print(H.values)
        
    def treeView(self):
        i=0
        level=0
        while True :
            print(" |---" * level,self.values[i])
            if 2*i+1<=self.size-1:
                i=2*i+1
                level+=1
            else:
                if (i+1 %2 == 1 and i+1<=self.size-1):
                    i+=1
                else:
                    while (i-1)//2 % 2 == 0 and i>0:
                        i=(i-1)//2
                        level-=1
                    i=(i-1)//2+1
                    level-=1
            if i==0:
                return
In [296]:
heapSize = 20
L = np.random.randint(0,100,heapSize)

myHeap = H(L)

print("Before heapifying:\n")
print("L=")
myHeap.print()

myHeap.treeView()

print("\nAfter heapifying:\n")
myHeap.heapify()

print("L=")
myHeap.print()

myHeap.treeView()

print("\nAfter sorting:\n")
myHeap.sort()

print("L=")
myHeap.print()

myHeap.treeView()
Before heapifying:

L=
[38 25 56 18  7 95 27 88  3 74 10 89 45 90 75 40 43 60 96 14]
 38
 |--- 25
 |--- |--- 18
 |--- |--- |--- 88
 |--- |--- |--- |--- 40
 |--- |--- |--- 3
 |--- |--- |--- |--- 60
 |--- |--- 7
 |--- |--- |--- 74
 |--- |--- |--- |--- 14
 |--- |--- |--- 10
 |--- 56
 |--- |--- 95
 |--- |--- |--- 89
 |--- |--- 27
 |--- |--- |--- 90

After heapifying:

L=
[96 88 95 60 74 89 90 43 38 14 10 56 45 27 75 40 25 18  3  7]
 96
 |--- 88
 |--- |--- 60
 |--- |--- |--- 43
 |--- |--- |--- |--- 40
 |--- |--- |--- 38
 |--- |--- |--- |--- 18
 |--- |--- 74
 |--- |--- |--- 14
 |--- |--- |--- |--- 7
 |--- |--- |--- 10
 |--- 95
 |--- |--- 89
 |--- |--- |--- 56
 |--- |--- 90
 |--- |--- |--- 27

After sorting:

L=
[ 3  7 10 14 18 25 27 38 40 43 45 56 60 74 75 88 89 90 95 96]
 3
 |--- 7
 |--- |--- 14
 |--- |--- |--- 38
 |--- |--- |--- |--- 88
 |--- |--- |--- 40
 |--- |--- |--- |--- 90
 |--- |--- 18
 |--- |--- |--- 43
 |--- |--- |--- |--- 96
 |--- |--- |--- 45
 |--- 10
 |--- |--- 25
 |--- |--- |--- 56
 |--- |--- 27
 |--- |--- |--- 74

EXTRAS

In [207]:
from functools import wraps

def trace(func):
    func_name = func.__name__
    separator = '|  '

    trace.recursion_depth = 0

    @wraps(func)
    def traced_func(*args, **kwargs):

        # repeat separator N times (where N is recursion depth)
        # `map(str, args)` prepares the iterable with str representation of positional arguments
        # `", ".join(map(str, args))` will generate comma-separated list of positional arguments
        # `"x"*5` will print `"xxxxx"` - so we can use multiplication operator to repeat separator
        print(f'{separator * trace.recursion_depth}|-- {func_name}({", ".join(map(str, args))})')
        # we're diving in
        trace.recursion_depth += 1
        result = func(*args, **kwargs)
        # going out of that level of recursion
        trace.recursion_depth -= 1
        # result is printed on the next level
        print(f'{separator * (trace.recursion_depth + 1)}|-- return {result}')

        return result

    return traced_func
In [ ]: