Skip to content

模式:Actor 模型

高级

一句话

每个 Actor 拥有一个信箱并按顺序处理消息——没有共享状态,没有锁,仅通过消息传递实现安全并发。

互动演示

现实类比

同事之间只通过密封信件和信箱沟通。没人直接走进别人的办公室——你写一封信,投到他的信箱,然后回去继续自己的工作。每个人一次只处理一封信。

核心思想

Actor 是拥有私有状态和信箱(消息队列)的轻量级进程。Actor 之间仅通过发送异步消息通信。每个 Actor 一次处理一条消息,更新状态并可选地向其他 Actor 发送消息。这从设计上消除了共享状态并发 bug。

text
  Actor A                    Actor B                    Actor C
  ┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
  │ State: count=3   │      │ State: items=[]  │      │ State: total=0   │
  │                  │      │                  │      │                  │
  │ Mailbox:         │      │ Mailbox:         │      │ Mailbox:         │
  │ ┌──┬──┬──┐       │ send │ ┌──┬──┐          │      │ ┌──┐             │
  │ │m1│m2│m3│       │─────►│ │m4│m5│          │      │ │m6│             │
  │ └──┴──┴──┘       │      │ └──┴──┘          │      │ └──┘             │
  │ Processing: m1   │      │ Processing: m4   │      │ Idle             │
  └──────────────────┘      └──────────────────┘      └──────────────────┘
属性
并发无共享状态 — 仅消息传递
处理每个 Actor 串行(一次一条消息)
故障隔离Actor 崩溃不会损坏其他 Actor
可扩展性数百万轻量级 Actor(Erlang: 每进程 2KB)

动手试试 — 在 Actor 之间发送消息,观察邮箱处理和状态隔离:

生产验证

项目源码用途
Akka (Scala)Actor.scala#L476-L547trait Actor — 核心 Actor 接口。定义 contextselfsender()def receive: Actor.Receive(L528),每个 Akka Actor 通过偏函数指定其消息处理行为。aroundReceive(L540-L546)是分发钩子。
Erlang/OTPerl_process.h#L1043-L1205struct process — BEAM 虚拟机中 Erlang 进程(Actor)的表示。关键字段:sig_qs(L1107,信号/消息队列 — 信箱)、sig_inq(L1168,并发信号输入队列)、state(L1165,原子进程状态标志)。每个进程是拥有自己堆和信箱的轻量级 Actor。

实现

typescript
type MessageHandler<S> = (state: S, msg: unknown) => S;

class Actor<S> {
  private state: S;
  private mailbox: unknown[] = [];
  private processing = false;

  constructor(initialState: S, private handler: MessageHandler<S>) {
    this.state = initialState;
  }

  send(msg: unknown): void {
    this.mailbox.push(msg);
    if (!this.processing) this.processMailbox();
  }

  private processMailbox(): void {
    this.processing = true;
    while (this.mailbox.length > 0) {
      const msg = this.mailbox.shift()!;
      this.state = this.handler(this.state, msg);
    }
    this.processing = false;
  }

  getState(): S {
    return this.state;
  }
}
rust
use std::collections::VecDeque;

pub struct Actor<M, S> {
    state: S,
    mailbox: VecDeque<M>,
}

impl<M, S> Actor<M, S> {
    pub fn new(initial_state: S) -> Self {
        Actor { state: initial_state, mailbox: VecDeque::new() }
    }

    pub fn send(&mut self, msg: M) {
        self.mailbox.push_back(msg);
    }

    pub fn process<F>(&mut self, handler: F)
    where F: Fn(&S, M) -> S {
        while let Some(msg) = self.mailbox.pop_front() {
            self.state = handler(&self.state, msg);
        }
    }

    pub fn state(&self) -> &S {
        &self.state
    }
}
go
type Actor struct {
	state   interface{}
	mailbox chan interface{}
	handler func(state interface{}, msg interface{}) interface{}
}

func NewActor(initial interface{}, handler func(interface{}, interface{}) interface{}) *Actor {
	a := &Actor{
		state:   initial,
		mailbox: make(chan interface{}, 100),
		handler: handler,
	}
	go a.run()
	return a
}

func (a *Actor) Send(msg interface{}) {
	a.mailbox <- msg
}

func (a *Actor) run() {
	for msg := range a.mailbox {
		a.state = a.handler(a.state, msg)
	}
}
python
from collections import deque
from typing import Any, Callable

class Actor:
    def __init__(self, initial_state: Any, handler: Callable[[Any, Any], Any]):
        self.state = initial_state
        self.handler = handler
        self._mailbox: deque[Any] = deque()
        self._processing = False

    def send(self, msg: Any) -> None:
        self._mailbox.append(msg)
        if not self._processing:
            self._process_mailbox()

    def _process_mailbox(self) -> None:
        self._processing = True
        while self._mailbox:
            msg = self._mailbox.popleft()
            self.state = self.handler(self.state, msg)
        self._processing = False

    def get_state(self) -> Any:
        return self.state

练习

难度练习文件
基础实现带信箱和消息处理的 Actorexercises/typescript/actor-model/01-basic.test.ts
进阶Actor 监督 — 父 Actor 重启崩溃的子 Actorexercises/typescript/actor-model/02-intermediate.test.ts

运行练习:pnpm test:exercises(TypeScript)· cargo test(Rust)· go test ./...(Go)· pytest(Python)

练习文件: Rust exercises/rust/src/actor_model/mod.rs · Go exercises/go/actor_model/actor_model_test.go · Python exercises/python/actor_model/test_actor_model.py

何时使用

  • 分布式系统 — Actor 自然映射到网络节点(Erlang/OTP、Akka Cluster)
  • 游戏服务器 — 每个实体(玩家、NPC、房间)作为独立 Actor
  • 物联网 — 每个设备作为处理传感器事件的 Actor
  • 电信 — Erlang 的起源:数百万并发通话会话
  • 聊天系统 — 每个对话/聊天室作为一个 Actor

何时不用

  • 紧密数据耦合 — 组件需要共享可变状态时,消息传递增加延迟
  • 简单请求-响应 — 函数调用比 Actor 往返更简单
  • 计算密集无并发 — 没有并发收益的 Actor 开销
  • 强一致性 — Actor 提供最终一致性;ACID 需要事务

更多生产案例

相关模式

模式关系
观察者 / 发布-订阅 (Observer / Pub-Sub)Actor 通过消息通信,类似观察者的发布/订阅模式
事件循环 / 反应器 (Event Loop / Reactor)每个 Actor 按顺序处理其信箱,类似单线程事件循环
状态机 (State Machine)Actor 行为通常遵循状态机模式管理其内部逻辑

挑战题

Q1: Actor 之间只通过异步消息通信,没有共享状态和锁。一位同事声称"Actor 不会死锁,因为没有锁"。这个说法正确吗?

答案: 即使没有锁,Actor 仍然可能因为循环消息依赖而发生死锁。

如果 Actor A 向 Actor B 发送消息并等待回复,同时 Actor B 也向 Actor A 发送消息并等待回复,那么双方都无法处理对方的消息——两个邮箱中各有一条未处理的消息,而处理它需要对方先响应。这在逻辑上等价于基于锁的死锁。缓解措施包括:避免 Actor 之间的同步请求-应答模式、为所有消息交换设置超时,或者将消息流设计为 DAG(有向无环图)而非环形结构。

Q2: 你的 Actor 系统中,一个高速生产者 Actor 以每秒 10,000 条消息的速率发送,而消费者 Actor 每秒只能处理 100 条。消费者的邮箱无限增长。Actor 系统应该如何处理这种背压?

答案: 使用有界邮箱并配合显式的背压信号——当邮箱满时,发送方必须丢弃消息、阻塞或收到拒绝信号。

无界邮箱是 Actor 系统中常见的陷阱——它用内存换取活性,最终会导致 OOM 崩溃。Akka 提供了 BoundedMailbox,在邮箱满时阻塞发送方,并通过 Akka Streams 实现流控(响应式流背压)。Erlang 进程的邮箱在设计上是无界的,但依赖 OTP 监督树来重启内存消耗过大的进程。架构层面的洞察是:背压是系统设计问题,而不仅仅是单个 Actor 的问题——你需要在每个生产者-消费者边界上决定当消费者跟不上时该怎么办。

Q3: 一个正在处理支付消息的 Actor 由于 bug 在执行过程中崩溃。支付被部分处理(资金已扣除但未入账)。Erlang/OTP 如何在不破坏系统的情况下处理 Actor 崩溃?

答案: OTP 的监督树会用全新的状态重启崩溃的 Actor——关键洞察是 Actor 状态是临时的,真正的数据源在别处(数据库、消息日志)。

Erlang 的"任其崩溃"哲学意味着 Actor 不会尝试从意外错误中恢复——它们直接终止,由 supervisor 进程重启。但这只在 Actor 的副作用是幂等的或事务性的情况下才有效。对于支付场景,扣款和入账应该包裹在数据库事务中,或者 Actor 应该使用 outbox 模式:先将意图写入持久化日志,再执行操作。如果在执行过程中崩溃,重启后的 Actor 会重放日志。Actor 模型隔离了崩溃(其他 Actor 不受影响),但持久性和一致性仍然需要显式设计。

Q4: Erlang 可以在单台机器上运行数百万个 Actor(进程),每个仅占用约 2KB 内存。本文档中的 Go 实现使用 goroutine 配合 channel 邮箱。你能以同样的方式运行 100 万个 Go Actor 吗?

答案: 就 goroutine 数量而言可以(Go 支持数百万个 goroutine),但实现中每个 channel 分配了 100 个元素的缓冲区,累积的 channel 开销是显著的。

一个 goroutine 的初始栈为 2KB(自 Go 1.4 起固定),因此 100 万个 goroutine 仅栈内存就需要约 2GB。每个带缓冲的 channel 还会额外占用"缓冲区大小 x 元素大小"的内存。自 Go 1.14 起,goroutine 通过信号实现异步抢占,因此 CPU 密集型 Actor 不会饿死其他 Actor。更深层的区别在于 Erlang 的按进程垃圾回收——每个 Actor 的 GC 暂停是独立的且在微秒级别。Go 的 GC 是全局但并发的,STW 暂停通常在亚毫秒级别(自 Go 1.8+ 起常低于 100μs)。真正的权衡在于 Erlang 的按进程 GC 将暂停影响局部化,而 Go 的并发 GC 需要遍历整个堆——在极端 Actor 数量下差异显著。对于真正大规模的 Actor 数量,Erlang 的 BEAM VM 是专门为此构建的;Go 可以近似实现,但有不同的 GC 权衡。

基于 MIT 许可证发布。