构建基于DDD的CQRS架构:融合Ruby on Rails命令端与Phoenix实时投影


一个系统同时要求两件看似矛盾的事情:一方面,核心业务逻辑必须具备事务级别的强一致性,业务规则复杂且不容出错;另一方面,成千上万的用户需要以毫秒级的延迟接收状态更新,对并发和实时性要求极高。试图用单一技术栈完美解决这两个问题,通常会导致架构上的妥协。要么为了实时性牺牲业务模型的严谨性,要么为了模型的一致性而牺牲系统的响应能力。

在真实项目中,这种两难处境很常见。例如,一个现代化的物流调度平台。调度员创建一个运单、指派司机,这些操作是核心业务流程,必须在事务中完成,确保数据的完整性和业务规则的执行。这是典型的写操作,或者说是“命令(Command)”。与此同时,司机、客户、后台运营人员都需要在各自的界面(Headless UI)上实时看到运单状态的流转,从“待指派”到“运输中”再到“已签收”。这是典型的读操作,或称“查询(Query)”,但它需要极高的实时性。

方案A:单一技术栈的权衡

选择单一技术栈是最初最自然的想法。

1. Ruby on Rails 独挑大梁:
我们可以使用 Rails 处理所有业务逻辑,并通过 ActionCable 将更新推送到前端。

  • 优势: 代码库统一,开发体验连贯,Rails 生态中用于构建复杂业务模型的工具(如 dry-rb 系列)非常成熟,非常适合实践领域驱动设计(DDD)。
  • 劣势: 真正的瓶颈在于并发模型。MRI 的全局解释器锁(GIL)意味着 Rails 在处理 I/O 密集型任务时表现尚可,但在 CPU 密集或需要管理大量持久连接(如 WebSocket)时,其并发能力远不如 BEAM 虚拟机。将成千上万的 WebSocket 连接全部压在 Rails 实例上,不仅会消耗大量内存,还可能因为一个耗时操作阻塞其他客户端的实时通信,系统的整体伸缩性会受限于命令处理和实时推送中最薄弱的一环。

2. Phoenix (Elixir) 一统天下:
或者,完全拥抱 Elixir 和 Phoenix 带来的并发优势。

  • 优势: Phoenix 基于 BEAM 虚拟机,其轻量级进程模型能轻松处理数十万甚至上百万的并发连接,是构建实时系统的绝佳选择。其 Channels 功能天生就是为这类场景设计的。
  • 劣势: 虽然 Ecto 是一个极其强大的数据映射和查询工具,但在实现那些包含复杂业务规则、多步骤验证和状态转换的 DDD 聚合根(Aggregate Root)时,其表达力和社区实践的成熟度,在某些场景下可能不如 Ruby 生态中经过多年打磨的模式。对于一个习惯了 Rails MVC 和 Active Record 便利性的团队来说,完全切换到函数式编程和 Ecto 来构建复杂的命令处理逻辑,学习曲线和开发成本不容忽视。

两种方案都存在根本性的妥协。我们需要的是一个既能利用 Rails 生态在 DDD 上的深厚积累,又能发挥 Phoenix 在实时通信上的极致性能的架构。

最终选择:异构CQRS架构

命令查询职责分离(CQRS)模式为我们指明了方向。我们将系统明确地划分为两部分:

  • 命令端 (Command Side): 负责处理所有状态变更的请求。它必须是事务性的、强一致的,并且严格执行业务规则。
  • 查询端 (Query Side): 负责处理所有数据查询和读取请求。它可以是高度优化的、非规范化的,并且最终一致的。

这个模式与我们的技术选型完美契合:

  • Ruby on Rails 作为命令端: 利用其生态和开发效率,专注于实现基于 DDD 的、健壮的业务逻辑核心。
  • Phoenix 作为查询端: 利用其无与伦比的并发能力,构建一个专门用于数据投影(Projection)和实时推送的读取服务。

这种架构的代价是复杂性。我们需要引入一个消息中间件(如 RabbitMQ 或 Kafka)来连接命令端和查询端,并且必须解决分布式系统带来的最终一致性和可观测性问题。这是一个深思熟虑后的取舍:用可控的架构复杂性,换取系统在一致性和实时性两个维度上的极致表现。

graph TD
    subgraph "用户界面 (Headless UI)"
        A[React/Vue App]
    end

    subgraph "命令端 (Ruby on Rails)"
        B(API Controller)
        C{DDD Command Bus}
        D[Dispatch Aggregate]
        E(Event Publisher)
    end

    subgraph "查询端 (Phoenix)"
        G(Event Consumer)
        H[Projection Builder]
        I(Read Model DB)
        J{Phoenix Channel}
    end

    subgraph "消息中间件"
        F[RabbitMQ]
    end

    subgraph "可观测性 (Loki)"
        LOKI[Loki]
    end

    A -- HTTP POST (Command) --> B
    B -- Dispatches Command --> C
    C -- Loads & Executes --> D
    D -- Produces Events --> E
    E -- Publishes to --> F
    
    F -- Delivers Event --> G
    G -- Processes Event --> H
    H -- Updates --> I
    I -- Notifies --> J
    J -- WebSocket Push --> A

    B -- Logs with trace_id --> LOKI
    E -- Logs with trace_id --> LOKI
    G -- Logs with trace_id --> LOKI
    H -- Logs with trace_id --> LOKI

核心实现:代码中的架构

我们以一个简化的物流运单 Dispatch 为例,来展示这个架构的关键代码实现。

1. Rails 命令端:DDD 与事件发布

在 Rails 应用中,我们不使用传统的 fat model 模式,而是严格遵循 DDD 的原则。

目录结构:

app/
  controllers/
    api/v1/dispatches_controller.rb
  domain/
    dispatching/
      dispatch.rb             # Aggregate Root
      dispatch_repository.rb  # Repository
      commands/
        assign_driver.rb
      events/
        driver_assigned.rb
  services/
    command_bus.rb
    event_publisher.rb
config/
  initializers/
    sneakers.rb             # RabbitMQ publisher config
    structured_logging.rb

聚合根 Dispatch:
这个对象封装了所有的业务规则,外部只能通过调用其方法来改变状态,方法执行成功后会产生领域事件。

# app/domain/dispatching/dispatch.rb

module Dispatching
  class Dispatch
    include AggregateRoot

    attr_reader :id, :status, :driver_id, :package_id

    def initialize(id)
      @id = id
      @status = :created
    end

    # command method
    def assign_driver(driver_id:, package_id:)
      # Business rule validation
      raise 'Cannot assign driver to a non-created dispatch' unless @status == :created
      raise 'Driver ID cannot be nil' if driver_id.nil?

      # If validation passes, apply the event
      apply Events::DriverAssigned.new(data: {
        dispatch_id: @id,
        driver_id: driver_id,
        package_id: package_id,
        assigned_at: Time.now.utc
      })
    end

    private

    # State mutation happens here, only in response to an event
    on Events::DriverAssigned do |event|
      @status = :assigned
      @driver_id = event.data[:driver_id]
      @package_id = event.data[:package_id]
    end
  end
end

命令处理器与事件发布:
Controller 接收到请求后,将任务委托给 CommandBusCommandBus 负责加载聚合、执行命令、持久化聚合状态,并最终将产生的领域事件发布出去。

# app/services/command_bus.rb

class CommandBus
  def self.handle(command)
    # In a real app, this would dynamically find the handler
    # For simplicity, we hardcode it here.
    trace_id = SecureRandom.uuid

    # Setup logger context for this request
    Current.trace_id = trace_id
    Rails.logger.info("Handling command: #{command.class.name} with payload: #{command.payload}")

    # Transactional boundary
    ActiveRecord::Base.transaction do
      # 1. Load the aggregate from repository
      # repo = Dispatching::DispatchRepository.new
      # dispatch = repo.find(command.dispatch_id)
      # For demo, let's assume we are creating a new one
      dispatch = Dispatching::Dispatch.new(command.dispatch_id)

      # 2. Execute the command on the aggregate
      dispatch.assign_driver(
        driver_id: command.driver_id,
        package_id: command.package_id
      )

      # 3. Save the aggregate's new state
      # repo.save(dispatch)
      
      # 4. Publish events with trace_id in headers
      dispatch.unpublished_events.each do |event|
        EventPublisher.publish(event, trace_id: trace_id)
      end
    end
  rescue => e
    Rails.logger.error("Command handling failed: #{e.message}")
    raise
  end
end

# app/services/event_publisher.rb
class EventPublisher
  def self.publish(event, trace_id:)
    payload = event.to_json
    routing_key = event.class.name.demodulize.underscore # e.g., driver_assigned

    # Using Sneakers::Publisher for RabbitMQ
    publisher = Sneakers::Publisher.new
    publisher.publish(
      payload,
      routing_key: routing_key,
      exchange: 'dispatch_events',
      headers: { trace_id: trace_id } # CRITICAL for observability
    )
    publisher.stop

    Rails.logger.info("Event published: #{routing_key} with trace_id: #{trace_id}")
  end
end

这里的关键点在于,我们将 trace_id 放入了消息头中。这是实现跨服务链路追踪的基石。

2. Phoenix 查询端:事件消费与实时投影

Phoenix 应用的角色是消费事件,更新为查询优化的只读数据模型,并通过 Channel 推送给客户端。

目录结构:

lib/
  my_app/
    dispatching/
      projection.ex          # The projection logic
  my_app_web/
    channels/
      dispatch_channel.ex    # Phoenix Channel
  my_app/
    workers/
      event_consumer.ex      # RabbitMQ consumer

事件消费者:
我们使用 Elixir 的 AMQP 库来消费来自 RabbitMQ 的事件。

# lib/my_app/workers/event_consumer.ex

defmodule MyApp.Workers.EventConsumer do
  use GenServer
  require Logger
  alias AMQP.{Connection, Channel}

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # Standard RabbitMQ connection boilerplate
    {:ok, conn} = Connection.open("amqp://guest:guest@localhost")
    {:ok, chan} = Channel.open(conn)

    # Ensure exchange and queue exist
    AMQP.Exchange.declare(chan, "dispatch_events", :topic)
    {:ok, %{queue: queue_name}} = AMQP.Queue.declare(chan, "phoenix_projector", durable: true)
    AMQP.Queue.bind(chan, queue_name, "dispatch_events", routing_key: "#")

    # Start consuming
    AMQP.Basic.consume(chan, queue_name, nil, no_ack: false)
    {:ok, {conn, chan}}
  end

  # Callback for delivered messages
  def handle_info({:basic_deliver, payload, meta}, state) do
    # Extract trace_id from headers
    headers = meta.headers |> Enum.into(%{})
    trace_id = get_in(headers, ["trace_id", :longstr])

    # Put trace_id into Logger's metadata for structured logging
    Logger.metadata(trace_id: trace_id)
    Logger.info("Received event with routing key: #{meta.routing_key}")
    
    try do
      # Delegate to the projection logic
      MyApp.Dispatching.Projection.handle(meta.routing_key, payload)
      # Acknowledge the message on success
      AMQP.Basic.ack(elem(state, 1), meta.delivery_tag)
    rescue
      e ->
        Logger.error("Failed to process event. Error: #{inspect(e)}")
        # Reject and requeue (or move to dead-letter-queue in production)
        AMQP.Basic.reject(elem(state, 1), meta.delivery_tag, requeue: true)
    end

    {:noreply, state}
  end
end

投影与广播:
Projection 模块负责将事件数据写入一个为读取优化的数据表,然后通过 Phoenix 的 Endpoint 广播更新。

# lib/my_app/dispatching/projection.ex

defmodule MyApp.Dispatching.Projection do
  alias MyApp.Repo
  alias MyApp.Dispatching.DispatchReadModel
  require Logger

  # Pattern match on routing key to handle different events
  def handle("driver_assigned", payload) do
    data = Jason.decode!(payload)["data"]
    dispatch_id = data["dispatch_id"]
    
    Logger.info("Projecting DriverAssigned event for dispatch_id: #{dispatch_id}")

    # Use Ecto.Multi for transactional updates to the read model
    multi =
      Ecto.Multi.new()
      |> Ecto.Multi.update_all(:update_dispatch,
        from(d in DispatchReadModel, where: d.id == ^dispatch_id),
        set: [
          status: "assigned",
          driver_id: data["driver_id"],
          package_id: data["package_id"]
        ]
      )

    case Repo.transaction(multi) do
      {:ok, %{update_dispatch: {[dispatch], _}}} ->
        # On success, broadcast the updated read model
        broadcast_update(dispatch)
      {:ok, %{update_dispatch: {[], _}}} ->
        # The record might not exist yet, so we insert it
        # This is a common eventual consistency scenario
        Logger.warn("DispatchReadModel not found for id #{dispatch_id}, creating...")
        # ... insert logic ...
      {:error, _reason, _failed_op, _changes} ->
        Logger.error("Failed to execute projection transaction")
    end
  end

  # Default case for unhandled events
  def handle(routing_key, _payload) do
    Logger.warn("Unhandled event with routing key: #{routing_key}")
  end

  defp broadcast_update(dispatch_read_model) do
    topic = "dispatch:#{dispatch_read_model.id}"
    payload = %{
      event: "dispatch_updated",
      data: dispatch_read_model # Send the full, updated object
    }
    
    # Use Phoenix.Endpoint to broadcast to the channel
    MyAppWeb.Endpoint.broadcast!(topic, "update", payload)
    Logger.info("Broadcasted update on topic: #{topic}")
  end
end

Phoenix Channel:
前端通过 WebSocket 连接到这个 Channel,订阅特定运单的更新。

# lib/my_app_web/channels/dispatch_channel.ex

defmodule MyAppWeb.DispatchChannel do
  use Phoenix.Channel
  require Logger

  def join("dispatch:" <> dispatch_id, _payload, socket) do
    Logger.info("Client joined dispatch channel: #{dispatch_id}")
    {:ok, assign(socket, :dispatch_id, dispatch_id)}
  end
  
  # This callback is just for joining, updates are pushed via Endpoint.broadcast
  # No handle_in functions are needed for this one-way communication
end

3. 可观测性:使用Loki串联日志

到目前为止,我们已经构建了一个功能上可行的分布式系统。但当出现问题时——比如一个事件丢失,或者投影数据错误——排查起来将是噩梦。这就是 Loki 发挥作用的地方。

结构化日志配置:
首先,两边的应用都必须输出结构化的 JSON 日志,并且包含 trace_id

Rails (config/initializers/structured_logging.rb):

# Use a JSON formatter for the logger
Rails.application.config.log_formatter = proc do |severity, datetime, progname, msg|
  {
    level: severity,
    timestamp: datetime.iso8601,
    app: 'rails-command-service',
    trace_id: Current.trace_id, # From our ApplicationController/CommandBus
    message: msg
  }.to_json + "\n"
end

# A simple way to manage trace_id per request
class Current < ActiveSupport::CurrentAttributes
  attribute :trace_id
end

Phoenix (config/config.exs):

# Use a JSON logger and include metadata
config :logger, :console,
  format: "{\"level\":\"$level\", \"message\":\"$message\", \"timestamp\":\"$time\", \"app\":\"phoenix-query-service\", \"metadata\":$metadata}\n",
  metadata: [:trace_id]

Promtail 配置:
Promtail 是 Loki 的代理,负责收集日志并发送给 Loki。

# promtail-config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
- job_name: rails-app
  static_configs:
  - targets:
      - localhost
    labels:
      job: rails-command-service
      __path__: /path/to/your/rails/log/production.log
  pipeline_stages:
  - json:
      expressions:
        app: app
        level: level
        trace_id: trace_id
  - labels:
      trace_id:

- job_name: phoenix-app
  static_configs:
  - targets:
      - localhost
    labels:
      job: phoenix-query-service
      __path__: /path/to/your/phoenix/app/logs/output.log # Assuming you log to a file
  pipeline_stages:
  - json:
      expressions:
        app: app
        level: level
        trace_id: metadata.trace_id
  - labels:
      trace_id:

这里的关键是 pipeline_stages。我们解析 JSON 日志,并将 trace_id 提取为一个 Loki 的标签。为 trace_id 创建索引是性能的关键。

使用 LogQL 进行诊断:
现在,当一个用户报告他的运单状态没有更新时,我们不再需要分别登录两台服务器去 grep 日志。我们只需要从 Rails 的入口日志中找到该操作的 trace_id,然后在 Grafana(Loki 的前端)中执行一个简单的 LogQL 查询:

{trace_id="a1b2c3d4-e5f6-7890-1234-56789abcdef0"}

这个查询会立即返回一个按时间排序的、来自两个服务的完整日志流:

  1. [rails-command-service] Handling command: AssignDriver…
  2. [rails-command-service] Event published: driver_assigned…
  3. [phoenix-query-service] Received event with routing key: driver_assigned
  4. [phoenix-query-service] Projecting DriverAssigned event for dispatch_id: …
  5. [phoenix-query-service] Broadcasted update on topic: dispatch:123…

整个业务流程的生命周期一目了然。如果日志在第2步之后就中断了,我们就知道问题出在 RabbitMQ 或网络连接。如果在第4步之后中断,问题可能出在 Phoenix 的数据库操作。这种级别的可观测性,将调试分布式系统的难度从“大海捞针”降低到了“按图索骥”。

架构的扩展性与局限性

这个架构模式的扩展性非常强。查询端可以独立于命令端进行扩展。如果未来需要一个新的数据视图,例如一个用于数据分析的运单状态变更历史表,我们只需添加一个新的事件消费者和投影逻辑,而完全不需要触碰核心的、脆弱的命令端业务代码。

然而,这个架构并非没有代价。首先是最终一致性。从命令执行成功到查询端数据更新并推送到 UI,存在一个毫秒级的延迟。UI/UX 设计必须考虑到这一点,例如通过乐观更新或加载指示器来处理这种短暂的不一致状态。对于不能接受最终一致性的场景,此架构并不适用。

其次是运维复杂性。团队现在需要维护两个独立的应用、一个消息中间件和一个中心化的日志系统。这要求团队具备更广泛的技术栈知识和更成熟的 DevOps 文化。

最后,事件契约的管理变得至关重要。命令端发布的事件格式一旦改变,就可能破坏下游所有的消费者。必须建立严格的事件版本控制和向前/向后兼容策略,以确保系统的平滑演进。未来的一个优化方向可能是引入一个共享的事件 Schema Registry 来强制管理这些契约。


  目录