一个系统同时要求两件看似矛盾的事情:一方面,核心业务逻辑必须具备事务级别的强一致性,业务规则复杂且不容出错;另一方面,成千上万的用户需要以毫秒级的延迟接收状态更新,对并发和实时性要求极高。试图用单一技术栈完美解决这两个问题,通常会导致架构上的妥协。要么为了实时性牺牲业务模型的严谨性,要么为了模型的一致性而牺牲系统的响应能力。
在真实项目中,这种两难处境很常见。例如,一个现代化的物流调度平台。调度员创建一个运单、指派司机,这些操作是核心业务流程,必须在事务中完成,确保数据的完整性和业务规则的执行。这是典型的写操作,或者说是“命令(Command)”。与此同时,司机、客户、后台运营人员都需要在各自的界面(Headless UI)上实时看到运单状态的流转,从“待指派”到“运输中”再到“已签收”。这是典型的读操作,或称“查询(Query)”,但它需要极高的实时性。
方案A:单一技术栈的权衡
选择单一技术栈是最初最自然的想法。
1. Ruby on Rails 独挑大梁:
我们可以使用 Rails 处理所有业务逻辑,并通过 ActionCable 将更新推送到前端。
- 优势: 代码库统一,开发体验连贯,Rails 生态中用于构建复杂业务模型的工具(如
dry-rb系列)非常成熟,非常适合实践领域驱动设计(DDD)。 - 劣势: 真正的瓶颈在于并发模型。MRI 的全局解释器锁(GIL)意味着 Rails 在处理 I/O 密集型任务时表现尚可,但在 CPU 密集或需要管理大量持久连接(如 WebSocket)时,其并发能力远不如 BEAM 虚拟机。将成千上万的 WebSocket 连接全部压在 Rails 实例上,不仅会消耗大量内存,还可能因为一个耗时操作阻塞其他客户端的实时通信,系统的整体伸缩性会受限于命令处理和实时推送中最薄弱的一环。
2. Phoenix (Elixir) 一统天下:
或者,完全拥抱 Elixir 和 Phoenix 带来的并发优势。
- 优势: Phoenix 基于 BEAM 虚拟机,其轻量级进程模型能轻松处理数十万甚至上百万的并发连接,是构建实时系统的绝佳选择。其 Channels 功能天生就是为这类场景设计的。
- 劣势: 虽然 Ecto 是一个极其强大的数据映射和查询工具,但在实现那些包含复杂业务规则、多步骤验证和状态转换的 DDD 聚合根(Aggregate Root)时,其表达力和社区实践的成熟度,在某些场景下可能不如 Ruby 生态中经过多年打磨的模式。对于一个习惯了 Rails MVC 和 Active Record 便利性的团队来说,完全切换到函数式编程和 Ecto 来构建复杂的命令处理逻辑,学习曲线和开发成本不容忽视。
两种方案都存在根本性的妥协。我们需要的是一个既能利用 Rails 生态在 DDD 上的深厚积累,又能发挥 Phoenix 在实时通信上的极致性能的架构。
最终选择:异构CQRS架构
命令查询职责分离(CQRS)模式为我们指明了方向。我们将系统明确地划分为两部分:
- 命令端 (Command Side): 负责处理所有状态变更的请求。它必须是事务性的、强一致的,并且严格执行业务规则。
- 查询端 (Query Side): 负责处理所有数据查询和读取请求。它可以是高度优化的、非规范化的,并且最终一致的。
这个模式与我们的技术选型完美契合:
- Ruby on Rails 作为命令端: 利用其生态和开发效率,专注于实现基于 DDD 的、健壮的业务逻辑核心。
- Phoenix 作为查询端: 利用其无与伦比的并发能力,构建一个专门用于数据投影(Projection)和实时推送的读取服务。
这种架构的代价是复杂性。我们需要引入一个消息中间件(如 RabbitMQ 或 Kafka)来连接命令端和查询端,并且必须解决分布式系统带来的最终一致性和可观测性问题。这是一个深思熟虑后的取舍:用可控的架构复杂性,换取系统在一致性和实时性两个维度上的极致表现。
graph TD
subgraph "用户界面 (Headless UI)"
A[React/Vue App]
end
subgraph "命令端 (Ruby on Rails)"
B(API Controller)
C{DDD Command Bus}
D[Dispatch Aggregate]
E(Event Publisher)
end
subgraph "查询端 (Phoenix)"
G(Event Consumer)
H[Projection Builder]
I(Read Model DB)
J{Phoenix Channel}
end
subgraph "消息中间件"
F[RabbitMQ]
end
subgraph "可观测性 (Loki)"
LOKI[Loki]
end
A -- HTTP POST (Command) --> B
B -- Dispatches Command --> C
C -- Loads & Executes --> D
D -- Produces Events --> E
E -- Publishes to --> F
F -- Delivers Event --> G
G -- Processes Event --> H
H -- Updates --> I
I -- Notifies --> J
J -- WebSocket Push --> A
B -- Logs with trace_id --> LOKI
E -- Logs with trace_id --> LOKI
G -- Logs with trace_id --> LOKI
H -- Logs with trace_id --> LOKI
核心实现:代码中的架构
我们以一个简化的物流运单 Dispatch 为例,来展示这个架构的关键代码实现。
1. Rails 命令端:DDD 与事件发布
在 Rails 应用中,我们不使用传统的 fat model 模式,而是严格遵循 DDD 的原则。
目录结构:
app/
controllers/
api/v1/dispatches_controller.rb
domain/
dispatching/
dispatch.rb # Aggregate Root
dispatch_repository.rb # Repository
commands/
assign_driver.rb
events/
driver_assigned.rb
services/
command_bus.rb
event_publisher.rb
config/
initializers/
sneakers.rb # RabbitMQ publisher config
structured_logging.rb
聚合根 Dispatch:
这个对象封装了所有的业务规则,外部只能通过调用其方法来改变状态,方法执行成功后会产生领域事件。
# app/domain/dispatching/dispatch.rb
module Dispatching
class Dispatch
include AggregateRoot
attr_reader :id, :status, :driver_id, :package_id
def initialize(id)
@id = id
@status = :created
end
# command method
def assign_driver(driver_id:, package_id:)
# Business rule validation
raise 'Cannot assign driver to a non-created dispatch' unless @status == :created
raise 'Driver ID cannot be nil' if driver_id.nil?
# If validation passes, apply the event
apply Events::DriverAssigned.new(data: {
dispatch_id: @id,
driver_id: driver_id,
package_id: package_id,
assigned_at: Time.now.utc
})
end
private
# State mutation happens here, only in response to an event
on Events::DriverAssigned do |event|
@status = :assigned
@driver_id = event.data[:driver_id]
@package_id = event.data[:package_id]
end
end
end
命令处理器与事件发布:
Controller 接收到请求后,将任务委托给 CommandBus。CommandBus 负责加载聚合、执行命令、持久化聚合状态,并最终将产生的领域事件发布出去。
# app/services/command_bus.rb
class CommandBus
def self.handle(command)
# In a real app, this would dynamically find the handler
# For simplicity, we hardcode it here.
trace_id = SecureRandom.uuid
# Setup logger context for this request
Current.trace_id = trace_id
Rails.logger.info("Handling command: #{command.class.name} with payload: #{command.payload}")
# Transactional boundary
ActiveRecord::Base.transaction do
# 1. Load the aggregate from repository
# repo = Dispatching::DispatchRepository.new
# dispatch = repo.find(command.dispatch_id)
# For demo, let's assume we are creating a new one
dispatch = Dispatching::Dispatch.new(command.dispatch_id)
# 2. Execute the command on the aggregate
dispatch.assign_driver(
driver_id: command.driver_id,
package_id: command.package_id
)
# 3. Save the aggregate's new state
# repo.save(dispatch)
# 4. Publish events with trace_id in headers
dispatch.unpublished_events.each do |event|
EventPublisher.publish(event, trace_id: trace_id)
end
end
rescue => e
Rails.logger.error("Command handling failed: #{e.message}")
raise
end
end
# app/services/event_publisher.rb
class EventPublisher
def self.publish(event, trace_id:)
payload = event.to_json
routing_key = event.class.name.demodulize.underscore # e.g., driver_assigned
# Using Sneakers::Publisher for RabbitMQ
publisher = Sneakers::Publisher.new
publisher.publish(
payload,
routing_key: routing_key,
exchange: 'dispatch_events',
headers: { trace_id: trace_id } # CRITICAL for observability
)
publisher.stop
Rails.logger.info("Event published: #{routing_key} with trace_id: #{trace_id}")
end
end
这里的关键点在于,我们将 trace_id 放入了消息头中。这是实现跨服务链路追踪的基石。
2. Phoenix 查询端:事件消费与实时投影
Phoenix 应用的角色是消费事件,更新为查询优化的只读数据模型,并通过 Channel 推送给客户端。
目录结构:
lib/
my_app/
dispatching/
projection.ex # The projection logic
my_app_web/
channels/
dispatch_channel.ex # Phoenix Channel
my_app/
workers/
event_consumer.ex # RabbitMQ consumer
事件消费者:
我们使用 Elixir 的 AMQP 库来消费来自 RabbitMQ 的事件。
# lib/my_app/workers/event_consumer.ex
defmodule MyApp.Workers.EventConsumer do
use GenServer
require Logger
alias AMQP.{Connection, Channel}
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
# Standard RabbitMQ connection boilerplate
{:ok, conn} = Connection.open("amqp://guest:guest@localhost")
{:ok, chan} = Channel.open(conn)
# Ensure exchange and queue exist
AMQP.Exchange.declare(chan, "dispatch_events", :topic)
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(chan, "phoenix_projector", durable: true)
AMQP.Queue.bind(chan, queue_name, "dispatch_events", routing_key: "#")
# Start consuming
AMQP.Basic.consume(chan, queue_name, nil, no_ack: false)
{:ok, {conn, chan}}
end
# Callback for delivered messages
def handle_info({:basic_deliver, payload, meta}, state) do
# Extract trace_id from headers
headers = meta.headers |> Enum.into(%{})
trace_id = get_in(headers, ["trace_id", :longstr])
# Put trace_id into Logger's metadata for structured logging
Logger.metadata(trace_id: trace_id)
Logger.info("Received event with routing key: #{meta.routing_key}")
try do
# Delegate to the projection logic
MyApp.Dispatching.Projection.handle(meta.routing_key, payload)
# Acknowledge the message on success
AMQP.Basic.ack(elem(state, 1), meta.delivery_tag)
rescue
e ->
Logger.error("Failed to process event. Error: #{inspect(e)}")
# Reject and requeue (or move to dead-letter-queue in production)
AMQP.Basic.reject(elem(state, 1), meta.delivery_tag, requeue: true)
end
{:noreply, state}
end
end
投影与广播:Projection 模块负责将事件数据写入一个为读取优化的数据表,然后通过 Phoenix 的 Endpoint 广播更新。
# lib/my_app/dispatching/projection.ex
defmodule MyApp.Dispatching.Projection do
alias MyApp.Repo
alias MyApp.Dispatching.DispatchReadModel
require Logger
# Pattern match on routing key to handle different events
def handle("driver_assigned", payload) do
data = Jason.decode!(payload)["data"]
dispatch_id = data["dispatch_id"]
Logger.info("Projecting DriverAssigned event for dispatch_id: #{dispatch_id}")
# Use Ecto.Multi for transactional updates to the read model
multi =
Ecto.Multi.new()
|> Ecto.Multi.update_all(:update_dispatch,
from(d in DispatchReadModel, where: d.id == ^dispatch_id),
set: [
status: "assigned",
driver_id: data["driver_id"],
package_id: data["package_id"]
]
)
case Repo.transaction(multi) do
{:ok, %{update_dispatch: {[dispatch], _}}} ->
# On success, broadcast the updated read model
broadcast_update(dispatch)
{:ok, %{update_dispatch: {[], _}}} ->
# The record might not exist yet, so we insert it
# This is a common eventual consistency scenario
Logger.warn("DispatchReadModel not found for id #{dispatch_id}, creating...")
# ... insert logic ...
{:error, _reason, _failed_op, _changes} ->
Logger.error("Failed to execute projection transaction")
end
end
# Default case for unhandled events
def handle(routing_key, _payload) do
Logger.warn("Unhandled event with routing key: #{routing_key}")
end
defp broadcast_update(dispatch_read_model) do
topic = "dispatch:#{dispatch_read_model.id}"
payload = %{
event: "dispatch_updated",
data: dispatch_read_model # Send the full, updated object
}
# Use Phoenix.Endpoint to broadcast to the channel
MyAppWeb.Endpoint.broadcast!(topic, "update", payload)
Logger.info("Broadcasted update on topic: #{topic}")
end
end
Phoenix Channel:
前端通过 WebSocket 连接到这个 Channel,订阅特定运单的更新。
# lib/my_app_web/channels/dispatch_channel.ex
defmodule MyAppWeb.DispatchChannel do
use Phoenix.Channel
require Logger
def join("dispatch:" <> dispatch_id, _payload, socket) do
Logger.info("Client joined dispatch channel: #{dispatch_id}")
{:ok, assign(socket, :dispatch_id, dispatch_id)}
end
# This callback is just for joining, updates are pushed via Endpoint.broadcast
# No handle_in functions are needed for this one-way communication
end
3. 可观测性:使用Loki串联日志
到目前为止,我们已经构建了一个功能上可行的分布式系统。但当出现问题时——比如一个事件丢失,或者投影数据错误——排查起来将是噩梦。这就是 Loki 发挥作用的地方。
结构化日志配置:
首先,两边的应用都必须输出结构化的 JSON 日志,并且包含 trace_id。
Rails (config/initializers/structured_logging.rb):
# Use a JSON formatter for the logger
Rails.application.config.log_formatter = proc do |severity, datetime, progname, msg|
{
level: severity,
timestamp: datetime.iso8601,
app: 'rails-command-service',
trace_id: Current.trace_id, # From our ApplicationController/CommandBus
message: msg
}.to_json + "\n"
end
# A simple way to manage trace_id per request
class Current < ActiveSupport::CurrentAttributes
attribute :trace_id
end
Phoenix (config/config.exs):
# Use a JSON logger and include metadata
config :logger, :console,
format: "{\"level\":\"$level\", \"message\":\"$message\", \"timestamp\":\"$time\", \"app\":\"phoenix-query-service\", \"metadata\":$metadata}\n",
metadata: [:trace_id]
Promtail 配置:Promtail 是 Loki 的代理,负责收集日志并发送给 Loki。
# promtail-config.yml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: rails-app
static_configs:
- targets:
- localhost
labels:
job: rails-command-service
__path__: /path/to/your/rails/log/production.log
pipeline_stages:
- json:
expressions:
app: app
level: level
trace_id: trace_id
- labels:
trace_id:
- job_name: phoenix-app
static_configs:
- targets:
- localhost
labels:
job: phoenix-query-service
__path__: /path/to/your/phoenix/app/logs/output.log # Assuming you log to a file
pipeline_stages:
- json:
expressions:
app: app
level: level
trace_id: metadata.trace_id
- labels:
trace_id:
这里的关键是 pipeline_stages。我们解析 JSON 日志,并将 trace_id 提取为一个 Loki 的标签。为 trace_id 创建索引是性能的关键。
使用 LogQL 进行诊断:
现在,当一个用户报告他的运单状态没有更新时,我们不再需要分别登录两台服务器去 grep 日志。我们只需要从 Rails 的入口日志中找到该操作的 trace_id,然后在 Grafana(Loki 的前端)中执行一个简单的 LogQL 查询:
{trace_id="a1b2c3d4-e5f6-7890-1234-56789abcdef0"}
这个查询会立即返回一个按时间排序的、来自两个服务的完整日志流:
-
[rails-command-service]Handling command: AssignDriver… -
[rails-command-service]Event published: driver_assigned… -
[phoenix-query-service]Received event with routing key: driver_assigned -
[phoenix-query-service]Projecting DriverAssigned event for dispatch_id: … -
[phoenix-query-service]Broadcasted update on topic: dispatch:123…
整个业务流程的生命周期一目了然。如果日志在第2步之后就中断了,我们就知道问题出在 RabbitMQ 或网络连接。如果在第4步之后中断,问题可能出在 Phoenix 的数据库操作。这种级别的可观测性,将调试分布式系统的难度从“大海捞针”降低到了“按图索骥”。
架构的扩展性与局限性
这个架构模式的扩展性非常强。查询端可以独立于命令端进行扩展。如果未来需要一个新的数据视图,例如一个用于数据分析的运单状态变更历史表,我们只需添加一个新的事件消费者和投影逻辑,而完全不需要触碰核心的、脆弱的命令端业务代码。
然而,这个架构并非没有代价。首先是最终一致性。从命令执行成功到查询端数据更新并推送到 UI,存在一个毫秒级的延迟。UI/UX 设计必须考虑到这一点,例如通过乐观更新或加载指示器来处理这种短暂的不一致状态。对于不能接受最终一致性的场景,此架构并不适用。
其次是运维复杂性。团队现在需要维护两个独立的应用、一个消息中间件和一个中心化的日志系统。这要求团队具备更广泛的技术栈知识和更成熟的 DevOps 文化。
最后,事件契约的管理变得至关重要。命令端发布的事件格式一旦改变,就可能破坏下游所有的消费者。必须建立严格的事件版本控制和向前/向后兼容策略,以确保系统的平滑演进。未来的一个优化方向可能是引入一个共享的事件 Schema Registry 来强制管理这些契约。