Lets walk through an implementation of huffman encoding and decoding in Python. For the purposes of demonstrating key ideas, I’m going to just treat bits as plaintext strings just to keep things simple. While this means that the output isn’t truly compressed as bytes, you will hopefully take away a deeper understanding of how it works under the hood.
Theoretical Overview
Huffman encoding works by exploiting the unequal distribution of character occurrences in text. Rather than encoding every single character with the same number of bites, it encodes characters that occur more frequently with smaller number of bits and those that occur less frequently with greater number of bits.
For example, lets say we have the text abc
.
Without compression, each character in abc
will take up a fixed number of bits – lets say a byte (8 bits) per character using an ASCII character set. That’s 3 bytes for 3 characters or 24 bits total.
If we used a variable length encoding, we can instead use as many bits as we need to identify a character. To illustrate this concept, lets map each character to a variable number of bits like so:
a = 0
b = 10
c = 110
Then abc
will be 010110
which is only 6 bits. That’s 18 bits (75%) less compared to the uncompressed version!
But here’s the catch: we need to make sure that these codes are prefix free, meaning that no code in our set is a prefix of another code. Why? This is best understood with an example. Lets add another character, d
, to our previous set.
a = 0
b = 01
c = 110
d = 10
Now consider if we wanted to encode acb
, we would have 011001
. But upon decoding it, it can be misinterpreted as bdb
(01 – 10 – 01). That’s because the bits for b
contains the prefix of a
– so if you read from left to right, you can either read 0 and stop (which gives you a
) or read both 0 and 1 (which gives you b
). When do you stop reading?
Unless we introduce a delimiter into our output bit stream, we can’t tell where the bits of one character ends and another starts. The only way to tell without a delimiter is to have codes that introduce no ambiguity, and you can accomplish that by ensuring that the codes are prefix free.
This presents two additional challenges that the creator of the huffman encoding solved:
- How do we generate these prefix free codes?
- How do we generate optimal prefix free codes such that we are able to assign shorter codes to higher frequency characters?
The prefix free codes are created by constructing a binary trie. The edges in the tree represent 0’s and 1’s and the leaf nodes of this binary tree represent a unique character in a text. Therefore, the paths represent the code for the character at the leaf. Since the characters are at the leaf nodes, all the paths to those nodes are unique and non-overlapping, making the codes prefix free. To attain optimatily, the trie is constructed bottom up, starting with characters that occur the least often so that the eventual codes (made up of paths from the root of the trie to leaf nodes) are shortest for those that occur the most often.
Implementation Overview
Here’s an overview of both compression and decompression steps:
Compression
- read the text and figure out character frequency
- use frequency to build trie – this generates the bit sequence in the form of trie paths for each character
- generate a code table using trie – this lets us find a corresponding bit sequence code for a character
- encode our text using table to produce a bit stream
- encode our trie as bits. this will be used by decoder
- write both to a file
Decompression
- read the trie bits portion (header) to re-construct the trie. we’ll need this to decode the encoded text
- read the body / text bits portion (this is the encoded form of the actual value we’re trying to get)
The Trie Node
We’ll be using this to construct our trie. this will be referenced throughout the implementation.
class Node:
def __init__(self, char="", left=None, right=None, freq=0):
self.char = char
self.left = left
self.right = right
self.freq = freq
def to_binary(self):
return "{:08b}".format(ord(self.char))
def is_leaf(self):
return (self.left is None and self.right is None)
# necessary for heapq comparisons
# heapq falls back on object comparison when priority keys are equal
def __lt__(a, b):
return a.char < b.char
def __eq__(self, other):
if isinstance(other, Node):
return (
(self.char == other.char) and
(self.left == other.left) and
(self.right == other.right)
)
return False
def __repr__(self):
return "None" if self.char is None else self.char
Code language: HTML, XML (xml)
Compression Process
This is the main method for compression – we’re encoding the text and then we’re including some header metadata for the decoder. The essense of the header metadata is the serialized trie that we constructed for the purposes of encoding.
def compress(text):
trie_tree = build_trie(text)
table = build_code_table(trie_tree)
trie_bits = serialize_trie_to_binary(trie_tree)
header = "{:016b}{}".format(len(trie_bits), trie_bits)
body = encode_text(table, text)
return header + body
Code language: JavaScript (javascript)
The following method uses a min heap to ensure that the most frequently occuring characters (via the freq
attribute) are included in our trie structure last.
def build_trie(text):
from collections import Counter
from heapq import heappush, heappop
char_count = Counter(text)
queue = []
for char, freq in char_count.items():
node = Node(char=char, freq=freq)
heappush(queue, (node.freq, node))
while len(queue) > 1:
freq1, node1 = heappop(queue)
freq2, node2 = heappop(queue)
parent_node = Node(
left=node1,
right=node2,
freq=freq1 + freq2
)
heappush(queue, (parent_node.freq, parent_node))
freq, root_node = heappop(queue)
return root_node
Code language: JavaScript (javascript)
This method constructs our character to code hash table. Our trie lets using decode an encoded stream by allowing us to follow the binary node paths to the characters using bit values in a stream. However, we need to create a character to code mapping in order for our constructed trie to be useful in the encoding process. Otherwise, we would need to scan our entire trie using either DFS or BFS searching for a target character (for every character we want to encode).
def build_code_table(node):
table = {}
def map_char_to_code(node, value):
if node.is_leaf():
table[node.char] = value
return
map_char_to_code(node.left, value + "0")
map_char_to_code(node.right, value + "1")
map_char_to_code(node, "")
return table
Code language: JavaScript (javascript)
In order for a decoder to decode our encoded text, it needs to know the character-to-code mapping we used so this method serializes the trie used in the encoding into bits. It uses a pre-order traversal to encode our trie. If it’s a non-leaf node, we prefix the output with a zero. Otherwise, we prefix it with a 1 followed by the actual bits representing the character.
def serialize_trie_to_binary(node):
if not node:
return ""
if node.is_leaf():
return "1" + node.to_binary()
return "0" + serialize_trie_to_binary(node.left) + serialize_trie_to_binary(node.right)
Code language: JavaScript (javascript)
This method makes use of our character-to-code table to convert characters into bits. This represents our compressed text!
def encode_text(table, text):
output = ""
for x in text:
output += table[x]
return output
Code language: JavaScript (javascript)
Decompression Process
Here’s the main method for decompression. It essentially re-constructs the trie in memory used the bits in the input representing the trie. Then it uses that in-memory trie to decode the bits of the input that represent our encoded text.
def decompress(bit_string):
trie_size = int(f"{bit_string[0:16]}", 2)
trie_range_end = 16 + trie_size
trie = deserialize_binary_trie(bit_string[16:trie_range_end])
body = bit_string[trie_range_end:]
return decode_text(trie, body)
Code language: JavaScript (javascript)
This function does the reverse of serialize_trie_to_binary
. The recursion here takes advantage of the fact that 1
bits are leafs of our trie, therefore it can be used as a base case to continue de-serializing the next trie path. The curr_pos
is used in this function to act as a pointer into our current read position so we know when to start and stop reading input.
def deserialize_binary_trie(bits):
def read_bits(curr_pos):
if curr_pos >= len(bits):
return None, curr_pos
bit = bits[curr_pos]
if bit == "1":
char_range_left = curr_pos+1
char_range_right = char_range_left + 8
char_bits = bits[char_range_left:char_range_right]
return Node(
char=chr(int(char_bits, 2))
), char_range_right
left_node, pos = read_bits(curr_pos + 1)
right_node, pos = read_bits(pos)
return Node(
left = left_node,
right = right_node
), pos
node, pos = read_bits(0)
return node
Code language: JavaScript (javascript)
Finally, with our trie object on hand, this function follows the bits of the encoded text using the trie to find the characters.
def decode_text(node, data):
out = ""
root = node
curr_node = root
for bit in data:
if bit == "0":
curr_node = curr_node.left
else:
curr_node = curr_node.right
if curr_node.is_leaf():
out += curr_node.char
curr_node = root
return out
Code language: PHP (php)
That completes the overview of this basic python representation of the huffman algorithm. In practice, some implementations may used pre-existing code tables rather than generating them on the fly as we did here. For example, if you need fast encoding and know about average frequencies of the text you’re encoding, you may not want to be constructing a new trie on every encode operation.
References
Here’s a couple of resources I used in writing this implementation – I highly encourage you to check them out to understand huffman in even greater depth.
Leave a Reply