Skip to content

模式:最小堆 / 优先队列 (Min Heap)

进阶

一句话

存储在数组中的二叉树,最小元素始终在根节点,支持 O(1) 查看和 O(log n) 插入/删除。

互动演示

现实类比

急诊室的分诊台。患者不是按到达顺序看诊——最危急的永远排第一。新患者按严重程度插入队列,系统始终知道谁最紧急,无需逐个扫描。

核心思想

最小堆是一棵完全二叉树,每个父节点都小于其子节点。将其存储在扁平数组中(父在 i,子在 2i+12i+2),避免指针开销并获得缓存友好的访问。

graph TD
    A["1 (根 = 最小值)"] --> B["3"]
    A --> C["2"]
    B --> D["7"]
    B --> E["5"]
    C --> F["4"]
    C --> G["6"]

两个操作维护不变式:

  • 上浮 (sift up) — 插入到末尾后,将元素向上冒泡直到父节点更小
  • 下沉 (sift down) — 移除根节点后(与最后一个元素交换),将新根向下推直到两个子节点都更大
属性
peek(获取最小值)O(1) — 始终在索引 0
push(插入)O(log n) — 从底部上浮
pop(提取最小值)O(log n) — 根与末尾交换后下沉
空间O(n) — 扁平数组,无指针

动手试试 — 插入值并提取最小值,观察上浮和下沉的过程:

生产验证

项目源码用途
ReactSchedulerMinHeap.js#L17-L90React 调度器将任务存储在按 sortIndex(过期时间)排序的最小堆中。peek() 以 O(1) 返回最高优先级任务。整个实现约 75 行。
Linux 内核fair.c#L1407-L1460CFS 的 update_curr 更新虚拟运行时间。pick_next_task_fair(行9234)从红黑树中选择最小 vruntime 的任务——与最小堆"始终访问最小值"原理相同。

实现

typescript
interface HeapNode {
  sortIndex: number;
  id: number;
}

class MinHeap<T extends HeapNode> {
  private heap: T[] = [];

  peek(): T | null {
    return this.heap[0] ?? null;
  }

  push(node: T): void {
    this.heap.push(node);
    this.siftUp(this.heap.length - 1);
  }

  pop(): T | null {
    if (this.heap.length === 0) return null;
    const first = this.heap[0]!;
    const last = this.heap.pop()!;
    if (this.heap.length > 0) {
      this.heap[0] = last;
      this.siftDown(0);
    }
    return first;
  }

  get size(): number {
    return this.heap.length;
  }

  private siftUp(i: number): void {
    while (i > 0) {
      const parent = (i - 1) >>> 1;
      if (this.compare(this.heap[i]!, this.heap[parent]!) < 0) {
        this.swap(i, parent);
        i = parent;
      } else break;
    }
  }

  private siftDown(i: number): void {
    const len = this.heap.length;
    const half = len >>> 1;
    while (i < half) {
      let smallest = i;
      const left = 2 * i + 1;
      const right = 2 * i + 2;
      if (left < len && this.compare(this.heap[left]!, this.heap[smallest]!) < 0) smallest = left;
      if (right < len && this.compare(this.heap[right]!, this.heap[smallest]!) < 0) smallest = right;
      if (smallest !== i) {
        this.swap(i, smallest);
        i = smallest;
      } else break;
    }
  }

  private compare(a: T, b: T): number {
    const diff = a.sortIndex - b.sortIndex;
    return diff !== 0 ? diff : a.id - b.id;
  }

  private swap(i: number, j: number): void {
    [this.heap[i], this.heap[j]] = [this.heap[j]!, this.heap[i]!];
  }
}
rust
pub struct MinHeap<T: Ord> {
    data: Vec<T>,
}

impl<T: Ord> MinHeap<T> {
    pub fn new() -> Self { MinHeap { data: Vec::new() } }

    pub fn peek(&self) -> Option<&T> { self.data.first() }

    pub fn push(&mut self, val: T) {
        self.data.push(val);
        self.sift_up(self.data.len() - 1);
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.data.is_empty() { return None; }
        let last = self.data.len() - 1;
        self.data.swap(0, last);
        let val = self.data.pop();
        if !self.data.is_empty() { self.sift_down(0); }
        val
    }

    fn sift_up(&mut self, mut i: usize) {
        while i > 0 {
            let parent = (i - 1) / 2;
            if self.data[i] < self.data[parent] {
                self.data.swap(i, parent);
                i = parent;
            } else { break; }
        }
    }

    fn sift_down(&mut self, mut i: usize) {
        let len = self.data.len();
        loop {
            let (left, right) = (2 * i + 1, 2 * i + 2);
            let mut smallest = i;
            if left < len && self.data[left] < self.data[smallest] { smallest = left; }
            if right < len && self.data[right] < self.data[smallest] { smallest = right; }
            if smallest != i { self.data.swap(i, smallest); i = smallest; }
            else { break; }
        }
    }
}
go
type HeapNode struct {
	SortIndex int
	ID        int
}

type MinHeap struct {
	data []HeapNode
}

func (h *MinHeap) Peek() (HeapNode, bool) {
	if len(h.data) == 0 { return HeapNode{}, false }
	return h.data[0], true
}

func (h *MinHeap) Push(node HeapNode) {
	h.data = append(h.data, node)
	h.siftUp(len(h.data) - 1)
}

func (h *MinHeap) Pop() (HeapNode, bool) {
	if len(h.data) == 0 { return HeapNode{}, false }
	val := h.data[0]
	last := len(h.data) - 1
	h.data[0] = h.data[last]
	h.data = h.data[:last]
	if len(h.data) > 0 { h.siftDown(0) }
	return val, true
}

func (h *MinHeap) siftUp(i int) {
	for i > 0 {
		parent := (i - 1) / 2
		if h.less(i, parent) { h.data[i], h.data[parent] = h.data[parent], h.data[i]; i = parent } else { break }
	}
}

func (h *MinHeap) siftDown(i int) {
	n := len(h.data)
	for {
		left, right, smallest := 2*i+1, 2*i+2, i
		if left < n && h.less(left, smallest) { smallest = left }
		if right < n && h.less(right, smallest) { smallest = right }
		if smallest != i { h.data[i], h.data[smallest] = h.data[smallest], h.data[i]; i = smallest } else { break }
	}
}

func (h *MinHeap) less(i, j int) bool {
	if h.data[i].SortIndex != h.data[j].SortIndex { return h.data[i].SortIndex < h.data[j].SortIndex }
	return h.data[i].ID < h.data[j].ID
}
python
import heapq

# Python's heapq module implements a min heap on a list
heap = []

heapq.heappush(heap, (10, "low-priority"))
heapq.heappush(heap, (1, "urgent"))
heapq.heappush(heap, (5, "medium"))

# peek: heap[0] is always the minimum
assert heap[0] == (1, "urgent")

# pop in priority order
assert heapq.heappop(heap) == (1, "urgent")
assert heapq.heappop(heap) == (5, "medium")
assert heapq.heappop(heap) == (10, "low-priority")

# Custom: from-scratch implementation
class MinHeap:
    def __init__(self):
        self._data = []

    def push(self, val):
        self._data.append(val)
        self._sift_up(len(self._data) - 1)

    def pop(self):
        if not self._data:
            return None
        self._data[0], self._data[-1] = self._data[-1], self._data[0]
        val = self._data.pop()
        if self._data:
            self._sift_down(0)
        return val

    def peek(self):
        return self._data[0] if self._data else None

    def _sift_up(self, i):
        while i > 0:
            parent = (i - 1) // 2
            if self._data[i] < self._data[parent]:
                self._data[i], self._data[parent] = self._data[parent], self._data[i]
                i = parent
            else:
                break

    def _sift_down(self, i):
        n = len(self._data)
        while True:
            smallest, left, right = i, 2*i+1, 2*i+2
            if left < n and self._data[left] < self._data[smallest]:
                smallest = left
            if right < n and self._data[right] < self._data[smallest]:
                smallest = right
            if smallest != i:
                self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
                i = smallest
            else:
                break

练习

难度练习文件
基础实现 push、pop、peek 和 sift 操作exercises/typescript/min-heap/01-basic.test.ts
进阶构建 React 风格的任务调度器exercises/typescript/min-heap/02-task-scheduler.test.ts

运行练习:pnpm test:exercises(TypeScript)· cargo test(Rust)· go test ./...(Go)· pytest(Python)

练习文件: Rust exercises/rust/src/min_heap/mod.rs · Go exercises/go/min_heap/min_heap_test.go · Python exercises/python/min_heap/test_min_heap.py

何时使用

  • 任务调度 — 始终处理最高优先级(最低截止时间)的任务
  • 事件驱动系统 — 定时器堆用于在特定时间调度回调
  • 图算法 — Dijkstra 最短路径、Prim 最小生成树
  • 流式 Top-K — 维护流中的 K 个最小/最大元素
  • 操作系统调度器 — CFS 使用具有最小堆属性的树进行公平 CPU 分配

何时不用

  • 需要 O(1) 任意查找 — 堆只保证 O(1) 获取最小值;查找用哈希表
  • 排序迭代 — 如果需要所有元素有序,排序一次更好;反复 pop 是 O(n log n)
  • 小规模固定集合 — 少于 10 个元素时,线性扫描更简单且通常更快
  • 需要稳定排序 — 相同优先级的元素在操作间可能改变顺序

更多生产案例

相关模式

模式关系
合并迭代器 (Merge Iterator / K-Way Merge)K 路合并使用最小堆从多个流中选取最小元素
协作调度 (Cooperative Scheduling)React 调度器使用最小堆选择最高优先级任务
事件循环 / 反应器 (Event Loop / Reactor)事件循环中的定时器队列通常使用最小堆实现最早截止时间调度
B+ 树 (B+ Tree)另一种有序结构——B+ 树优化磁盘访问,堆优化优先级访问

挑战题

Q1: 如何在不改变数据结构的情况下将最小堆转换为最大堆?

答案: 在插入时取反排序键,在提取时再取反回来。

推入 -priority 而不是 priority。最小堆将最负的值(原始最高优先级)放在根部。弹出时再次取反键以恢复原始值。这之所以可行,是因为对取反值的最小堆等价于对原始值的最大堆。Python 的 heapq 社区使用这个技巧,因为标准库只提供最小堆。

Q2: 为什么 React 使用最小堆来调度而不是有序数组?

答案: 有序数组的插入是 O(n)(需要移动元素),而最小堆的插入是 O(log n),查看最小值是 O(1)。

React 的调度器频繁插入具有不同过期时间的新任务,并且总是需要最早过期的任务。有序数组给出 O(1) 的最小值访问但 O(n) 的插入成本(二分搜索 + 移动)。最小堆给出 O(1) 的 peek 和 O(log n) 的插入/移除——对于任务不断添加和移除的动态队列来说,这是更好的权衡。对于静态的一次性排序,有序数组更优。

Q3: 平衡 BST(如红黑树)也能提供 O(log n) 的插入和 O(log n) 的查找最小值。为什么 Linux CFS 使用红黑树而 React 使用最小堆?

答案: CFS 需要在进程退出时移除任意任务(不仅是最小值),BST 处理这个操作是 O(log n),而堆是 O(n)。

最小堆只能高效移除根节点。删除任意元素需要 O(n) 搜索 + O(log n) 筛选。红黑树支持 O(log n) 删除任意节点。CFS 经常移除退出或改变优先级的进程,因此 BST 是合理的。React 的调度器几乎只从前端弹出最高优先级的任务,使更简单的最小堆(具有更小的常数因子和缓存友好的数组布局)成为更好的选择。

Q4: 你有 10 亿条日志条目,需要找到最近的 10 条。应该使用最小堆还是最大堆?大小是多少?

答案: 使用大小为 10 的最小堆。对于每个条目,如果它比堆的最小值大,就弹出最小值并推入新条目。

这是"top-K"模式。大小为 K 的最小堆保持到目前为止看到的 K 个最大元素,其中 K 个里最小的在根部作为守门员。每个新元素与根比较,O(1)——如果更小,跳过;如果更大,替换根,O(log K)。总成本:O(n log K) 加上 O(K) 内存,而不是完全排序的 O(n log n)。

基于 MIT 许可证发布。