Algorithms, Models, Data Structures, and Systems from Scratch

From-scratch designs and implementations of important and influential algorithms, ML models, data structures, and systems.

Algorithms

Boyer-Moore Majority Vote Algorithm
from numbers import Real


def majority_element(nums: list[Real]) -> Real | None:
    """Boyer-Moore Majority Vote Algorithm for finding the majority element."""

    if not nums:
        return None

    candidate = None
    count = 0
    for num in nums:
        # When the count drops to zero, adopt the current number as the new candidate
        candidate = candidate if count else num
        count += 1 if candidate == num else -1
    return candidate
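A quick usage sketch (the input values are illustrative). Boyer-Moore only guarantees a correct answer when a majority element actually exists, so a second verification pass is common:

nums = [2, 2, 1, 1, 2]
candidate = majority_element(nums)
assert candidate == 2
# Verification pass: the candidate must occupy strictly more than half the array
assert nums.count(candidate) > len(nums) // 2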
Bresenham's Circle Drawing Algorithm
import matplotlib.pyplot as plt


def bresenham(radius: int) -> None:
    """Draws a circle using Bresenham's Circle Drawing Algorithm."""

    # Initial parameters: start at the top-most point with the integer decision variable
    x = 0
    y = radius
    d = 3 - 2 * radius

    # Consider the upper half of the first quadrant
    while x <= y:
        # Put eight points via eight-way symmetry
        plt.scatter([x, x, -x, -x, y, y, -y, -y], [y, -y, y, -y, x, -x, x, -x])

        # Make decisions and update parameters
        if d <= 0:
            d += 4 * x + 6
        else:
            d += 4 * (x - y) + 10
            y -= 1
        x += 1

    # Show the drawing
    plt.title("Circle Drawing Using Bresenham's Circle Drawing Algorithm")
    plt.show()
Floyd's Cycle-Finding Algorithm
from __future__ import annotations
from dataclasses import dataclass


@dataclass
class LinkedListNode:
    """Singly linked list node."""

    val: int = 0
    next: LinkedListNode | None = None


def floyd(head: LinkedListNode | None) -> bool:
    """Checks for a cycle using Floyd's cycle-finding algorithm (tortoise and hare)."""

    slow = head
    fast = head
    while fast and fast.next:
        slow = slow.next
        fast = fast.next.next

        if slow == fast:
            return True

    return False
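A small usage sketch, using the LinkedListNode defined above with illustrative values:

a, b, c = LinkedListNode(1), LinkedListNode(2), LinkedListNode(3)
a.next, b.next, c.next = b, c, a  # c points back to a, closing the cycle
assert floyd(a)

c.next = None  # Break the cycle
assert not floyd(a)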
Kadane's Algorithm
from numbers import Real


def max_subarray_sum(nums: list[Real]) -> Real | None:
    """Finds the maximum subarray sum using Kadane's algorithm."""

    if not nums:
        return None

    cur = nums[0]
    res = nums[0]
    for idx in range(1, len(nums)):
        cur = max(cur + nums[idx], nums[idx])
        res = max(res, cur)
    return res
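The classic test case: the maximum subarray of [-2, 1, -3, 4, -1, 2, 1, -5, 4] is [4, -1, 2, 1], which sums to 6:

assert max_subarray_sum([-2, 1, -3, 4, -1, 2, 1, -5, 4]) == 6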
Levenshtein Distance
def levenshtein_distance(word1: str, word2: str) -> int:
    """Computes the Levenshtein distance between two words."""

    m, n = len(word1), len(word2)

    dp = [[0] * (n + 1) for _ in range(m + 1)]

    for i in range(m + 1):
        for j in range(n + 1):
            if i * j == 0:
                dp[i][j] = i + j
            elif word1[i - 1] == word2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])

    return dp[m][n]
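A classic sanity check: turning "kitten" into "sitting" takes three edits (substitute k→s, substitute e→i, insert g):

assert levenshtein_distance("kitten", "sitting") == 3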
Longest Common Subsequence
def longest_common_subsequence(word1: str, word2: str) -> int:
    """Computes the Longest Common Subsequence (LCS) between two words."""

    m, n = len(word1), len(word2)

    dp = [[0] * (n + 1) for _ in range(m + 1)]

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if word1[i - 1] == word2[j - 1]:
                dp[i][j] = 1 + dp[i - 1][j - 1]
            else:
                dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

    return dp[m][n]
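For example, the LCS of "abcde" and "ace" is "ace", so the returned length is 3:

assert longest_common_subsequence("abcde", "ace") == 3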
Fair Coin from Unfair Coin
def unfair_flip() -> int:
    """An unfair flip: assume it is provided."""

    ...


def fair_flip() -> int:
    """Simulates a fair coin from an unfair coin (0 is heads and 1 is tails)."""

    flip1, flip2 = unfair_flip(), unfair_flip()

    # Von Neumann's trick: if the two flips differ, the first flip is unbiased;
    # otherwise, discard both and flip again
    return flip1 if flip1 ^ flip2 else fair_flip()
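To try this out, unfair_flip can be stubbed with a biased Bernoulli draw; the 0.9 bias below is purely illustrative:

import random


def unfair_flip() -> int:
    # Hypothetical biased coin: heads (0) with probability 0.9
    return 0 if random.random() < 0.9 else 1


print(sum(fair_flip() for _ in range(10_000)) / 10_000)  # Should hover around 0.5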

System Design

Cache (LFU)
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Node:
    """Doubly linked list node."""

    key: int | None = None
    val: int | None = None

    next: Node | None = None
    prev: Node | None = None

    freq: int = 1


class List:
    """Doubly linked list."""

    def __init__(self) -> None:
        """Initializes the doubly linked list."""

        self.head = Node()
        self.tail = Node()

        self.head.next = self.tail
        self.tail.prev = self.head

    def is_empty(self) -> bool:
        """Checks whether a doubly linked list is empty."""

        return self.head.next == self.tail

    def push_front(self, node: Node) -> None:
        """Moves a node to the front."""

        head_next = self.head.next

        self.head.next = node
        head_next.prev = node

        node.next = head_next
        node.prev = self.head

    def pop(self, node: Node | None = None) -> Node | None:
        """Removes and returns the last node."""

        if self.is_empty():
            return None

        if node is None:
            node = self.tail.prev

        node.next.prev = node.prev
        node.prev.next = node.next

        return node


class LFUCache:
    """List Frequently Used (LFU) cache."""

    def __init__(self, capacity: int) -> None:
        """Initializes the cache with the given capacity."""

        self.capacity = capacity
        self.key_to_node = {}
        self.freqmap = defaultdict(List)
        self.min_freq = 1

    def _update(self, node: Node) -> None:
        """Performs operations needed to comply with the LFU cache requirements."""

        self.freqmap[node.freq].pop(node)

        if self.min_freq == node.freq and self.freqmap[node.freq].is_empty():
            self.min_freq += 1
        node.freq += 1

        self.freqmap[node.freq].push_front(node)

    def put(self, key: int, val: int) -> None:
        """Puts a key-value pair into the cache."""

        if self.capacity <= 0:
            return

        if key in self.key_to_node:
            node = self.key_to_node[key]
            self._update(node)
            node.val = val
        else:
            if len(self.key_to_node) == self.capacity:
                node = self.freqmap[self.min_freq].pop()
                del self.key_to_node[node.key]

            new_node = Node(key, val)
            self.key_to_node[key] = new_node
            self.freqmap[new_node.freq].push_front(new_node)
            self.min_freq = new_node.freq

    def get(self, key: int) -> int | None:
        """Gets a value by key from the cache."""

        if key in self.key_to_node:
            node = self.key_to_node[key]
            self._update(node)
            return node.val

        return None
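A brief usage sketch with illustrative keys and values:

cache = LFUCache(capacity=2)
cache.put(1, 1)
cache.put(2, 2)
assert cache.get(1) == 1  # Key 1 now has frequency 2
cache.put(3, 3)  # Evicts key 2, the least frequently used
assert cache.get(2) is None
assert cache.get(3) == 3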
Cache (LRU)
from __future__ import annotations
from dataclasses import dataclass


@dataclass
class Node:
    """Doubly linked list node."""

    key: int | None = None
    val: int | None = None

    next: Node | None = None
    prev: Node | None = None


class List:
    """Doubly linked list."""

    def __init__(self) -> None:
        """Initializes the doubly linked list."""

        self.head = Node()
        self.tail = Node()

        self.head.next = self.tail
        self.tail.prev = self.head

    def is_empty(self) -> bool:
        """Checks whether a doubly linked list is empty."""

        return self.head.next == self.tail

    def push_front(self, node: Node) -> None:
        """Moves a node to the front."""

        head_next = self.head.next

        self.head.next = node
        head_next.prev = node

        node.next = head_next
        node.prev = self.head

    def pop(self, node: Node | None = None) -> Node | None:
        """Removes and returns the last node."""

        if self.is_empty():
            return None

        if node is None:
            node = self.tail.prev

        node.next.prev = node.prev
        node.prev.next = node.next

        return node


class LRUCache:
    """Least Recently Used (LRU) cache."""

    def __init__(self, capacity: int) -> None:
        """Initializes the cache with the given capacity."""

        self.capacity = capacity
        self.container = List()
        self.key_to_node = {}

    def _update(self, node: Node) -> None:
        """Performs operations needed to comply with the LRU cache requirements."""

        self.container.pop(node)
        self.container.push_front(node)

    def put(self, key: int, val: int) -> None:
        """Puts a key-value pair into the cache."""

        if self.capacity <= 0:
            return

        if key in self.key_to_node:
            node = self.key_to_node[key]
            self._update(node)
            node.val = val
        else:
            if len(self.key_to_node) == self.capacity:
                node = self.container.pop()
                del self.key_to_node[node.key]

            new_node = Node(key, val)
            self.key_to_node[key] = new_node
            self.container.push_front(new_node)

    def get(self, key: int) -> int | None:
        """Gets a value by key from the cache."""

        if key in self.key_to_node:
            node = self.key_to_node[key]
            self._update(node)
            return node.val

        return None
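A brief usage sketch with illustrative keys and values:

cache = LRUCache(capacity=2)
cache.put(1, 1)
cache.put(2, 2)
assert cache.get(1) == 1  # Key 1 becomes the most recently used
cache.put(3, 3)  # Evicts key 2, the least recently used
assert cache.get(2) is None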
Cache (RR)
import random

from dataclasses import dataclass, field
from typing import Any


@dataclass
class RRCache:
    """Random Replacement (RR) cache."""

    # A bare mutable default is invalid in dataclasses; a field factory is required
    container: list[Any] = field(default_factory=list)
    capacity: int = 16

    def put(self, key: Any, val: Any) -> None:
        """Puts a key-value pair."""

        if len(self.container) == self.capacity:
            # Evict a uniformly random entry, per the Random Replacement policy
            self.pop()

        self.container.append((key, val))

    def get(self) -> tuple[Any, Any] | None:
        """Gets a random key-value pair."""

        return random.choice(self.container) if self.container else None

    def pop(self) -> tuple[Any, Any] | None:
        """Removes and returns a random key-value pair."""

        if not self.container:
            return None

        idx = random.randrange(len(self.container))
        self.container[0], self.container[idx] = self.container[idx], self.container[0]

        return self.container.pop()
Data Stream Statistics
import random

from dataclasses import dataclass


@dataclass
class DataStreamStats:
    """Running statistics over a data stream, updated online via Welford's algorithm."""

    size: int = 0
    mean: float = 0
    sum_of_square_diffs: float = 0
    var: float = 0
    std: float = 0
    random: float = 0

    def add(self, val: float) -> None:
        """Adds a value to the stream."""

        self.size += 1
        # Welford's update for the running mean and the sum of squared differences
        old_mean = self.mean
        self.mean += (val - self.mean) / self.size
        self.sum_of_square_diffs += (val - old_mean) * (val - self.mean)
        self.var = self.sum_of_square_diffs / self.size  # Population variance
        self.std = self.var**0.5

        # Reservoir sampling with k = 1: each value seen so far is kept with probability 1 / size
        if random.random() < 1 / self.size:
            self.random = val
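A small usage sketch; the expected values follow from the population-variance formula:

stats = DataStreamStats()
for val in [2.0, 4.0, 6.0]:
    stats.add(val)

assert stats.mean == 4.0
assert abs(stats.var - 8 / 3) < 1e-9  # Population variance of [2, 4, 6]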
In-Memory File System
from __future__ import annotations
from dataclasses import dataclass, field


@dataclass
class FileSystemNode:
    """File system node."""

    subdirs: dict[str, FileSystemNode] = field(default_factory=dict)
    content: str = ""


class FileSystem:
    """An in-memory file system."""

    def __init__(self):
        self.root = FileSystemNode()

    def _find_or_create(self, path: str, create: bool) -> FileSystemNode | None:
        """Finds the node at path and optionally, creates intermediate directories."""

        if len(path) <= 1:
            return self.root

        ptr = self.root
        for word in path.split("/")[1:]:
            if word not in ptr.subdirs:
                if not create:
                    return None
                ptr.subdirs[word] = FileSystemNode()
            ptr = ptr.subdirs[word]

        return ptr

    def ls(self, path: str) -> list[str]:
        """Lists the content of the provided path."""

        if ptr := self._find_or_create(path, create=False):
            return path.split("/")[-1:] if ptr.content else sorted(ptr.subdirs.keys())

        return []

    def mkdir(self, path: str) -> None:
        """Makes a new directory or new directories."""

        self._find_or_create(path, create=True)

    def add_content_to_file(self, filepath: str, content: str) -> None:
        """Appends the provided content to the given filepath."""

        if ptr := self._find_or_create(filepath, create=True):
            ptr.content += content

    def read_content_from_file(self, filepath: str) -> str | None:
        """Reads content from the given filepath."""

        if ptr := self._find_or_create(filepath, create=False):
            return ptr.content

        return None
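A short usage sketch with hypothetical paths:

fs = FileSystem()
fs.mkdir("/a/b")
fs.add_content_to_file("/a/b/file.txt", "hello")
assert fs.ls("/a/b") == ["file.txt"]
assert fs.read_content_from_file("/a/b/file.txt") == "hello"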
Prefix Tree
from __future__ import annotations
from dataclasses import dataclass, field


@dataclass
class PrefixTreeNode:
    """Prefix tree node."""

    children: dict[str, PrefixTreeNode] = field(default_factory=dict)
    is_end: bool = False


class PrefixTree:
    """Prefix tree."""

    def __init__(self) -> None:
        """Initializes a prefix tree as a node."""

        self.root = PrefixTreeNode()

    def insert(self, word: str) -> None:
        """Inserts a word into the prefix tree."""

        ptr = self.root
        for char in word:
            if char not in ptr.children:
                ptr.children[char] = PrefixTreeNode()
            ptr = ptr.children[char]
        ptr.is_end = True

    def search(self, word: str) -> bool:
        """Searches a word in the prefix tree."""

        ptr = self.root
        for char in word:
            if char not in ptr.children:
                return False
            ptr = ptr.children[char]
        return ptr.is_end

    def starts_with(self, prefix: str) -> bool:
        """Searches a prefix in the prefix tree."""

        ptr = self.root
        for char in prefix:
            if char not in ptr.children:
                return False
            ptr = ptr.children[char]
        return True
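A short usage sketch:

tree = PrefixTree()
tree.insert("apple")
assert tree.search("apple")
assert not tree.search("app")  # "app" was never inserted as a full word
assert tree.starts_with("app")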
Stack (Maximum Frequency)
from collections import defaultdict


class MaximumFrequencyStack:
    """Maximum frequency stack."""

    def __init__(self) -> None:
        """Initializes the stack."""

        self.freqmap = {}
        self.groups = defaultdict(list)
        self.maxfreq = 0

    def push(self, val: int) -> None:
        """Pushes a value onto the stack."""

        freq = self.freqmap.get(val, 0) + 1

        self.freqmap[val] = freq
        self.groups[freq].append(val)
        self.maxfreq = max(self.maxfreq, freq)

    def pop(self) -> int:
        """Pops a value from the stack."""

        val = self.groups[self.maxfreq].pop()
        self.freqmap[val] -= 1

        if not self.groups[self.maxfreq]:
            self.maxfreq -= 1

        return val
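A short usage sketch (the push sequence is illustrative):

stack = MaximumFrequencyStack()
for val in [5, 7, 5, 7, 4, 5]:
    stack.push(val)

assert stack.pop() == 5  # 5 is the most frequent (three pushes)
assert stack.pop() == 7  # 5 and 7 now tie at frequency two; 7 is closest to the top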
Stack (Minimum)
from dataclasses import dataclass, field
from numbers import Real


@dataclass
class MinStack:
    """Minimum stack."""

    # Each entry stores (value, running minimum); mutable defaults require a field factory
    stack: list[tuple[Real, Real]] = field(default_factory=list)

    def push(self, val: Real) -> None:
        """Pushes a value onto the stack."""

        if not self.stack:
            self.stack.append((val, val))
        else:
            self.stack.append((val, min(val, self.stack[-1][1])))

    def pop(self) -> Real | None:
        """Pops a value from the stack."""

        return self.stack.pop()[0] if self.stack else None

    def top(self) -> Real | None:
        """Gets the top value from the stack."""

        return self.stack[-1][0] if self.stack else None

    def min(self) -> Real | None:
        """Gets the minimum value from the stack."""

        return self.stack[-1][1] if self.stack else None

Machine Learning and Statistical Models

Convolutional Neural Network (CNN)
import numpy as np


def conv2d(image: np.ndarray, kernel: np.ndarray, stride: int = 1, padding: int = 0) -> np.ndarray:
    """Computes the 2D convolution of a single-channel image with the given kernel, stride, and padding."""

    # Flipping a kernel is necessary as otherwise, we get cross-correlation, which is a related, but
    # different operation. Convolution has some nice mathematical properties, which are not always
    # present in cross-correlation.
    kernel = np.flipud(np.fliplr(kernel))

    # Apply zero padding around the image, then size the output from the padded dimensions
    if padding:
        image = np.pad(image, padding)

    image_height, image_width = image.shape
    kernel_height, kernel_width = kernel.shape

    out_height = (image_height - kernel_height) // stride + 1
    out_width = (image_width - kernel_width) // stride + 1

    out = np.zeros((out_height, out_width))
    for h in range(out_height):
        for w in range(out_width):
            # Slide the window by `stride` pixels per output position
            rows = slice(h * stride, h * stride + kernel_height)
            cols = slice(w * stride, w * stride + kernel_width)
            out[h][w] = (image[rows, cols] * kernel).sum()

    return out


if __name__ == "__main__":
    # NOTE: `get_data` is assumed to be provided elsewhere; `LogisticRegression` refers to the
    # from-scratch implementation later in this document
    images, labels = get_data()

    kernel = np.array([[1, 1, 1], [0, 0, 0], [-1, -1, -1]])
    features = np.array([conv2d(image, kernel) for image in images])

    lr = LogisticRegression(epochs=1_000, logging=True)
    lr.fit(np.maximum(0, features), labels)
Gaussian Naive Bayes
import numpy as np


class GaussianNaiveBayes:
    """Gaussian Naive Bayes."""

    def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
        """Fits the Gaussian Naive Bayes model."""

        self.labels = labels
        self.unique_labels = np.unique(labels)

        self.params = []
        # For the given label, calculate the mean and variance of all features
        for label in self.unique_labels:
            label_features = features[self.labels == label]
            self.params.append([(col.mean(), col.var()) for col in label_features.T])

    def likelihood(self, data: float, mean: float, var: float) -> float:
        """Calculates the Gaussian likelihood of the data with the given mean and variance."""

        # NOTE: Added in denominator to prevent division by zero
        eps = 1e-4

        coeff = 1 / np.sqrt(2 * np.pi * var + eps)
        exponent = np.exp(-((data - mean) ** 2 / (2 * var + eps)))

        return coeff * exponent

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Performs inference using Bayes' Theorem:  P(A | B) = P(B | A) * P(A) / P(B)."""

        num_samples, _ = features.shape

        predictions = np.empty(num_samples)
        for idx, feature in enumerate(features):
            posteriors = []
            for label_idx, label in enumerate(self.unique_labels):
                # Prior is the mean of what we have
                prior = np.log((self.labels == label).mean())

                # Naive assumption (independence):
                #   P(a0, a1, a2 | B) = P(a0 | B) * P(a1 | B) * P(a2 | B)
                pairs = zip(feature, self.params[label_idx])
                likelihood = np.sum([np.log(self.likelihood(f, m, v)) for f, (m, v) in pairs])

                # Posterior = Prior * Likelihood / Scaling Factor (ignoring scaling factor)
                posteriors.append(prior + likelihood)

            # Store the label with the largest posterior probability
            predictions[idx] = self.unique_labels[np.argmax(posteriors)]

        return predictions
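A tiny smoke test on well-separated synthetic data (the values are illustrative):

features = np.array([[1.0], [1.2], [0.8], [5.0], [5.2], [4.8]])
labels = np.array([0, 0, 0, 1, 1, 1])

model = GaussianNaiveBayes()
model.fit(features, labels)
assert (model.predict(np.array([[1.1], [5.1]])) == np.array([0, 1])).all()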
K-Means Clustering (KMeans)
import numpy as np

from dataclasses import dataclass


@dataclass
class KMeans:
    """K-Means clustering."""

    k: int
    iterations: int
    rtol: float = 1e-05
    atol: float = 1e-08

    def fit(self, features: np.ndarray) -> None:
        """Clusters the data."""

        n_samples, _ = features.shape

        self.centroids = features[np.random.choice(n_samples, size=self.k, replace=False)]
        self.closest = np.empty(n_samples)

        for _ in range(self.iterations):
            old_closest = self.closest.copy()

            distances = np.linalg.norm(features[:, None] - self.centroids, axis=2)
            self.closest = np.argmin(distances, axis=1)

            # One-hot encode the assignments over all k clusters to average each cluster's members
            one_hot = np.eye(self.k)[self.closest]
            self.centroids = one_hot.T @ features / one_hot.sum(axis=0)[:, None]

            if np.allclose(self.closest, old_closest, rtol=self.rtol, atol=self.atol):
                break
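A quick demonstration on two synthetic blobs (the parameters are illustrative):

rng = np.random.default_rng(seed=0)
features = np.vstack([rng.normal(0, 1, (50, 2)), rng.normal(10, 1, (50, 2))])

kmeans = KMeans(k=2, iterations=100)
kmeans.fit(features)
print(kmeans.centroids)  # One centroid should land near (0, 0), the other near (10, 10)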
K-Nearest Neighbors (k-NN)
import numpy as np

from dataclasses import dataclass


@dataclass
class KNN:
    features: np.ndarray
    labels: np.ndarray
    k: int

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Performs inference using the given features."""

        num_samples, _ = features.shape

        predictions = np.empty(num_samples)
        for idx, feature in enumerate(features):
            distances = ((self.features - feature) ** 2).sum(axis=1) ** 0.5
            k_nearest_idxs = np.argsort(distances)[: self.k]
            # Majority vote over the labels of the k nearest neighbors (labels are assumed
            # to be non-negative integers, as np.bincount requires)
            k_nearest_labels = self.labels[k_nearest_idxs].astype(int)
            predictions[idx] = np.bincount(k_nearest_labels).argmax()

        return predictions
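A small usage sketch on clearly separated points:

features = np.array([[0.0], [0.1], [10.0], [10.1]])
labels = np.array([0, 0, 1, 1])

knn = KNN(features, labels, k=3)
assert (knn.predict(np.array([[0.05], [9.9]])) == np.array([0, 1])).all()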
Linear Regression
import numpy as np

from dataclasses import dataclass


@dataclass
class LinearRegression:
    """Linear Regression."""

    epochs: int
    learning_rate: float
    logging: bool

    def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
        """Fits the Linear Regression model."""

        num_samples, num_features = features.shape
        self.weights, self.bias = np.zeros(num_features), 0

        for epoch in range(self.epochs):
            residuals = labels - self.predict(features)

            d_weights = -2 / num_samples * features.T.dot(residuals)
            d_bias = -2 / num_samples * residuals.sum()

            self.weights -= self.learning_rate * d_weights
            self.bias -= self.learning_rate * d_bias

            if self.logging:
                print(f"MSE Loss [{epoch}]: {(residuals ** 2).mean():.3f}")

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Performs inference using the given features."""

        return features.dot(self.weights) + self.bias
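A quick demonstration on a noiseless line (the hyperparameters are illustrative):

features = np.array([[1.0], [2.0], [3.0], [4.0]])
labels = np.array([3.0, 5.0, 7.0, 9.0])  # y = 2x + 1

model = LinearRegression(epochs=10_000, learning_rate=0.01, logging=False)
model.fit(features, labels)
print(model.weights, model.bias)  # Should approach [2.0] and 1.0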
Logistic Regression
import numpy as np

from dataclasses import dataclass


@dataclass
class LogisticRegression:
    """Logistic Regression."""

    learning_rate: float
    epochs: int
    threshold: float
    logging: bool

    def sigmoid(self, predictions: np.ndarray) -> np.ndarray:
        """The numerically stable implementation of the Sigmoid activation function."""

        neg_mask = predictions < 0
        pos_mask = ~neg_mask

        zs = np.empty_like(predictions)
        zs[neg_mask] = np.exp(predictions[neg_mask])
        zs[pos_mask] = np.exp(-predictions[pos_mask])

        res = np.ones_like(predictions)
        res[neg_mask] = zs[neg_mask]

        return res / (1 + zs)

    def mean_log_loss(self, predictions: np.ndarray, labels: np.ndarray) -> np.float32:
        """Computes the mean Cross Entropy Loss (in binary classification, also called Log-loss)."""

        return -(labels * np.log(predictions) + (1 - labels) * np.log(1 - predictions)).mean()

    def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
        """Fits the Logistic Regression model."""

        num_samples, num_features = features.shape
        self.weights, self.bias = np.zeros(num_features), 0

        for epoch in range(self.epochs):
            prediction = self.sigmoid(features.dot(self.weights) + self.bias)
            difference = prediction - labels  # type: ignore

            d_weights = features.T.dot(difference) / num_samples
            d_bias = difference.sum() / num_samples

            self.weights -= self.learning_rate * d_weights
            self.bias -= self.learning_rate * d_bias

            if self.logging:
                print(f"Mean Log-loss [{epoch}]: {self.mean_log_loss(prediction, labels):.3f}")

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Performs inference using the given features."""

        return np.where(self.sigmoid(features.dot(self.weights) + self.bias) < self.threshold, 0, 1)
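A quick demonstration on linearly separable data (the hyperparameters are illustrative):

features = np.array([[0.0], [1.0], [2.0], [8.0], [9.0], [10.0]])
labels = np.array([0, 0, 0, 1, 1, 1])

model = LogisticRegression(learning_rate=0.1, epochs=1_000, threshold=0.5, logging=False)
model.fit(features, labels)
print(model.predict(np.array([[1.0], [9.0]])))  # Expected: [0 1]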
Principal Component Analysis
import numpy as np


def pca(
    data: np.ndarray,
    n_components: int,
    standardize: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Performs dimensionality reduction via Principal Component Analysis."""

    if standardize:
        data = (data - data.mean(axis=0)) / data.std(axis=0)

    cov = np.cov(data.T)
    eigval, eigvec = np.linalg.eigh(cov)

    # np.linalg.eigh returns eigenvalues in ascending order, so take the top n_components
    # indices in descending order (equivalent to np.argsort(eigval)[::-1][:n_components])
    idx = np.arange(len(eigval) - 1, len(eigval) - 1 - n_components, -1)
    eigval, eigvec = eigval[idx], eigvec[:, idx]

    feat = data @ eigvec

    return feat, eigval, eigvec
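A minimal usage sketch on random data (the shapes are illustrative):

rng = np.random.default_rng(seed=0)
data = rng.normal(size=(100, 3))

feat, eigval, eigvec = pca(data, n_components=2)
assert feat.shape == (100, 2)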
Byte Pair Encoding
import collections


def bpe(text: str, num_merges: int = 3, start_token: int = 90) -> tuple[str, list[tuple[str, str]]]:
    """Byte-Pair Encoding (BPE): repeatedly merges the most frequent adjacent pair into a new token."""

    replacements = []
    for _ in range(num_merges):
        if pair_counts := collections.Counter(zip(text, text[1:])):
            max_freq_pair = max(pair_counts, key=pair_counts.get)
            # Replace the most frequent pair with a fresh single-character token,
            # assigning codepoints downward from chr(start_token)
            start, target = "".join(max_freq_pair), chr(start_token)
            text = text.replace(start, target)
            start_token -= 1
            replacements.append((start, target))
    return text, replacements
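A worked example on a classic toy string:

compressed, replacements = bpe("aaabdaaabac", num_merges=2)
print(compressed)  # "YbdYbac"
print(replacements)  # [('aa', 'Z'), ('Za', 'Y')]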