一个日益复杂的业务系统,其数据模型的演进往往会陷入一个两难境地:写操作要求严格的事务一致性、数据校验和领域规则,而读操作则需要极高的灵活性和性能,常常涉及多表聚合与复杂筛选。当单一的、高度规范化的数据模型同时服务于这两种截然不同的需求时,性能瓶颈和维护性灾难几乎是必然的结局。在真实项目中,我们经常看到一个模型因为要迁就某个复杂查询而被迫添加冗余字段,或者一个简单的读取操作因为需要跨越五六张表进行JOIN而变得奇慢无比。
这种矛盾的根源在于,命令(Commands,改变系统状态的操作)和查询(Queries,读取系统状态的操作)的关注点本质上是不同的。强行用一套模型来满足两者,就是问题的开始。命令查询职责分离(Command Query Responsibility Segregation, CQRS)模式正是为了解决这一核心矛盾而生。它的核心思想很简单:将数据修改操作的模型与数据查询操作的模型分离开。
我们将通过一个具体的场景,从零开始构建一个基于CQRS的混合数据访问层。写模型将使用经典的设计模式(如Repository, Unit of Work)来保证MySQL数据库的事务完整性;读模型则通过一个独立的、为查询优化的数据视图来提供服务,并最终暴露为一个GraphQL端点,供GraphQL Client消费。
架构构想:解耦命令与查询
在动手之前,先明确我们的架构蓝图。客户端的意图被分为两种:命令和查询。
命令路径 (Write Path):
- 一个命令对象(如
CreateProductCommand)被分发到对应的处理器(Handler)。 - Handler 负责执行业务逻辑、数据校验。
- 它通过仓储(Repository)模式与领域实体交互,并使用工作单元(Unit of Work)模式来统一管理数据库事务。所有写操作必须在MySQL事务中完成,确保ACID。
- 成功提交事务后,发布一个领域事件(如
ProductCreatedEvent),通知系统状态已发生变化。
- 一个命令对象(如
查询路径 (Read Path):
- 一个事件处理器(Event Handler)监听领域事件。
- 接收到事件后,它负责更新一个或多个为查询而优化的“读模型”(Read Model)。这个读模型可以是MySQL中的一张反规范化的宽表,也可以是Elasticsearch文档或Redis缓存。
- 一个GraphQL API服务器直接从这个读模型中读取数据。
- GraphQL客户端通过发送灵活的查询语句,按需获取数据,无需关心底层复杂的表结构。
这个流程可以用下面的图来表示:
graph TD
subgraph Client
A[GraphQL Client]
B[Command Issuer]
end
subgraph "Service Layer"
C[Command Bus]
D{GraphQL Endpoint}
end
subgraph "Write Model (MySQL)"
E[Command Handler]
F[Unit of Work]
G[Repository]
H[Domain Models]
I[Events]
end
subgraph "Read Model (MySQL Denormalized Table)"
J[Event Handler]
K[Read Model DTOs]
end
B -- CreateProductCommand --> C
C -- dispatches --> E
E -- uses --> F
F -- manages --> G
G -- operates on --> H
E -- validates --> H
F -- on commit, dispatches --> I
I -- notifies --> J
J -- updates --> K
D -- queries --> K
A -- GraphQL Query --> D
D -- response --> A
style F fill:#f9f,stroke:#333,stroke-width:2px
style G fill:#f9f,stroke:#333,stroke-width:2px
style J fill:#ccf,stroke:#333,stroke-width:2px
搭建项目环境与数据库
我们使用Node.js和TypeScript。首先,初始化项目并安装必要的依赖。
# 初始化项目
npm init -y
npm i typescript @types/node ts-node --save-dev
# 核心依赖
npm i mysql2 reflect-metadata inversify express @apollo/server graphql
inversify 用于依赖注入,这在构建分层架构时至关重要。mysql2 是MySQL驱动。@apollo/server 用于快速搭建GraphQL服务。
数据库层面,我们需要两类表。
写模型表 (Write Model Tables): 规范化的表,反映领域模型。
-- products.sql (Write Model)
CREATE TABLE `products` (
`id` VARCHAR(36) NOT NULL PRIMARY KEY,
`name` VARCHAR(255) NOT NULL,
`description` TEXT,
`price_amount` DECIMAL(10, 2) NOT NULL,
`price_currency` VARCHAR(3) NOT NULL,
`stock_quantity` INT UNSIGNED NOT NULL,
`version` INT UNSIGNED NOT NULL DEFAULT 1,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;
读模型表 (Read Model Table): 反规范化的宽表,为查询优化。
-- product_summaries.sql (Read Model)
CREATE TABLE `product_summaries` (
`product_id` VARCHAR(36) NOT NULL PRIMARY KEY,
`product_name` VARCHAR(255) NOT NULL,
`price_display` VARCHAR(50) NOT NULL, -- e.g., "¥199.99"
`is_in_stock` BOOLEAN NOT NULL,
`updated_at` TIMESTAMP NOT NULL
) ENGINE=InnoDB;
这里的product_summaries表就是一个典型的读模型。它把价格和货币组合成了一个显示字符串,并且用一个布尔值表示库存状态,查询时无需计算和JOIN,可以直接取出用于展示。
第一部分:构建坚实的写模型
写模型的首要任务是保证数据一致性和业务规则的正确执行。
1. 工作单元(Unit of Work)模式
在真实业务中,一个操作可能涉及多个仓储(Repository)。比如创建一个订单,既要扣减产品库存,又要创建订单记录。这两个操作必须在同一个事务中完成。Unit of Work模式正是为此而生,它负责跟踪所有变更,并在最后统一提交。
// src/database/unit-of-work.ts
import { Pool, PoolConnection } from 'mysql2/promise';
// 一个常见的错误是让每个Repository自己管理连接和事务,这会导致事务无法跨Repository。
// Unit of Work 模式通过提供一个事务范围内的Connection来解决这个问题。
export interface IUnitOfWork {
getConnection(): Promise<PoolConnection>;
beginTransaction(): Promise<void>;
commit(): Promise<void>;
rollback(): Promise<void>;
}
export class UnitOfWork implements IUnitOfWork {
private connection: PoolConnection | null = null;
private isTransactionActive: boolean = false;
constructor(private readonly pool: Pool) {}
public async getConnection(): Promise<PoolConnection> {
if (!this.isTransactionActive) {
throw new Error("Transaction has not been started. Call beginTransaction() first.");
}
return this.connection!;
}
public async beginTransaction(): Promise<void> {
if (this.isTransactionActive) {
// 避免重复开启事务
return;
}
this.connection = await this.pool.getConnection();
await this.connection.beginTransaction();
this.isTransactionActive = true;
console.log("Transaction started.");
}
public async commit(): Promise<void> {
if (!this.isTransactionActive || !this.connection) {
return;
}
try {
await this.connection.commit();
console.log("Transaction committed.");
} finally {
this.connection.release();
this.isTransactionActive = false;
this.connection = null;
}
}
public async rollback(): Promise<void> {
if (!this.isTransactionActive || !this.connection) {
return;
}
try {
await this.connection.rollback();
console.warn("Transaction rolled back.");
} finally {
this.connection.release();
this.isTransactionActive = false;
this.connection = null;
}
}
}
2. 仓储(Repository)模式
仓储模式封装了数据访问逻辑,让业务代码与具体的数据库实现解耦。
// src/domain/product.ts
export class Product {
constructor(
public readonly id: string,
public name: string,
public priceAmount: number,
public stockQuantity: number,
public version: number
) {}
public decreaseStock(quantity: number): void {
if (this.stockQuantity < quantity) {
throw new Error("Insufficient stock.");
}
this.stockQuantity -= quantity;
}
}
// src/repositories/product-repository.ts
import { IUnitOfWork } from '../database/unit-of-work';
import { Product } from '../domain/product';
export interface IProductRepository {
findById(id: string): Promise<Product | null>;
save(product: Product): Promise<void>;
}
// 依赖注入IUnitOfWork,而不是直接依赖数据库连接池
export class ProductRepository implements IProductRepository {
constructor(private readonly uow: IUnitOfWork) {}
async findById(id: string): Promise<Product | null> {
const connection = await this.uow.getConnection();
const [rows] = await connection.execute<any[]>(
'SELECT * FROM products WHERE id = ? FOR UPDATE', // 使用悲观锁确保事务中的数据一致性
[id]
);
if (rows.length === 0) {
return null;
}
const row = rows[0];
return new Product(row.id, row.name, row.price_amount, row.stock_quantity, row.version);
}
async save(product: Product): Promise<void> {
const connection = await this.uow.getConnection();
// 这里的实现使用了简单的版本号进行乐观锁控制。
// 在真实项目中,如果并发冲突频繁,可能需要更复杂的策略。
const sql = `
UPDATE products
SET name = ?, price_amount = ?, stock_quantity = ?, version = version + 1
WHERE id = ? AND version = ?
`;
const [result] = await connection.execute<any>(sql, [
product.name,
product.priceAmount,
product.stockQuantity,
product.id,
product.version,
]);
if (result.affectedRows === 0) {
// 如果更新失败,说明数据版本已过时,存在并发修改。
throw new Error(`Concurrency conflict: Product ${product.id} has been modified by another transaction.`);
}
}
// 创建产品的逻辑也应在此处
async add(product: Product): Promise<void> {
const connection = await this.uow.getConnection();
const sql = `
INSERT INTO products (id, name, description, price_amount, price_currency, stock_quantity, version)
VALUES (?, ?, ?, ?, ?, ?, 1)
`;
await connection.execute(sql, [
product.id,
product.name,
'Default description', // for simplicity
product.priceAmount,
'CNY',
product.stockQuantity,
]);
}
}
3. 命令与命令处理器(Command & Handler)
现在,我们将业务逻辑封装在CommandHandler中。
// src/commands/create-product-command.ts
import { randomUUID } from 'crypto';
import { IUnitOfWork } from '../database/unit-of-work';
import { Product } from '../domain/product';
import { ProductRepository } from '../repositories/product-repository';
import { DomainEvent, EventBus } from '../events/event-bus';
export class CreateProductCommand {
constructor(
public readonly name: string,
public readonly price: number,
public readonly stock: number
) {}
}
// 事件定义
export class ProductCreatedEvent implements DomainEvent {
public readonly occurredOn: Date;
constructor(
public readonly productId: string,
public readonly name: string,
public readonly price: number,
public readonly stock: number
) {
this.occurredOn = new Date();
}
nameOf(): string { return 'ProductCreatedEvent'; }
}
export class CreateProductHandler {
// 依赖注入UoW和EventBus
constructor(
private readonly uow: IUnitOfWork,
private readonly eventBus: EventBus
) {}
public async handle(command: CreateProductCommand): Promise<string> {
await this.uow.beginTransaction();
try {
const productRepo = new ProductRepository(this.uow);
// 业务校验
if (command.price <= 0) {
throw new Error("Price must be positive.");
}
const productId = randomUUID();
const product = new Product(
productId,
command.name,
command.price,
command.stock,
0 // 新创建的对象版本为0,插入后DB中为1
);
await productRepo.add(product);
// 在事务提交 *之前* 准备好事件,但不要立即分发。
const event = new ProductCreatedEvent(
productId,
command.name,
command.price,
command.stock
);
await this.uow.commit();
// 事务成功后,分发事件。这是一个关键点。
// 如果在事务内分发,一旦事务回滚,事件却已发出,会导致数据不一致。
this.eventBus.dispatch(event);
return productId;
} catch (error) {
await this.uow.rollback();
console.error("Failed to create product:", error);
// 向上抛出或转换为业务异常
throw error;
}
}
}
第二部分:构建轻量级的读模型
写模型完成后,我们需要一种机制来同步数据到读模型。事件驱动是最佳选择。
1. 简单的内存事件总线
为了演示,我们先实现一个简单的内存事件总线。在生产环境中,这应该被替换为RabbitMQ, Kafka等消息队列。
// src/events/event-bus.ts
export interface DomainEvent {
occurredOn: Date;
nameOf(): string;
}
export type EventHandler = (event: DomainEvent) => Promise<void>;
export class EventBus {
private readonly handlers: Map<string, EventHandler[]> = new Map();
public register(eventName: string, handler: EventHandler): void {
if (!this.handlers.has(eventName)) {
this.handlers.set(eventName, []);
}
this.handlers.get(eventName)!.push(handler);
}
public dispatch(event: DomainEvent): void {
const eventName = event.nameOf();
const eventHandlers = this.handlers.get(eventName);
if (eventHandlers) {
// 异步执行,不阻塞主流程
eventHandlers.forEach(handler => {
Promise.resolve().then(() => handler(event)).catch(err => {
// 这里的错误处理至关重要。生产环境需要有重试和死信队列机制。
console.error(`Error in event handler for ${eventName}:`, err);
});
});
}
}
}
2. 事件处理器与读模型更新
当ProductCreatedEvent被分发时,一个专门的处理器会监听到,并更新product_summaries表。
// src/read-models/product-summary-updater.ts
import { Pool } from 'mysql2/promise';
import { DomainEvent } from '../events/event-bus';
import { ProductCreatedEvent } from '../commands/create-product-command';
export class ProductSummaryUpdater {
constructor(private readonly dbPool: Pool) {}
public async handleProductCreated(event: ProductCreatedEvent): Promise<void> {
console.log(`Updating read model for new product: ${event.productId}`);
const sql = `
INSERT INTO product_summaries (product_id, product_name, price_display, is_in_stock, updated_at)
VALUES (?, ?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
product_name = VALUES(product_name),
price_display = VALUES(price_display),
is_in_stock = VALUES(is_in_stock),
updated_at = NOW();
`;
const priceDisplay = `¥${event.price.toFixed(2)}`;
const isInStock = event.stock > 0;
try {
const connection = await this.dbPool.getConnection();
await connection.execute(sql, [
event.productId,
event.name,
priceDisplay,
isInStock,
]);
connection.release();
} catch (error) {
// 如果读模型更新失败,这是一个严重问题。
// 需要记录日志,并有后台任务或手动流程来修复数据不一致。
console.error(`Failed to update product_summary for product ${event.productId}`, error);
throw error; // 抛出异常以便上层捕获
}
}
}
第三部分:GraphQL端点与客户端消费
读模型准备就绪后,最后一步就是通过GraphQL将其暴露出去。
1. GraphQL Schema 与 Resolver
// src/graphql/schema.ts
import { Pool } from 'mysql2/promise';
export const typeDefs = `#graphql
type ProductSummary {
productId: ID!
productName: String!
priceDisplay: String!
isInStock: Boolean!
}
type Query {
products: [ProductSummary]
product(id: ID!): ProductSummary
}
`;
export const resolvers = {
Query: {
products: async (_: any, __: any, { dbPool }: { dbPool: Pool }) => {
const connection = await dbPool.getConnection();
try {
const [rows] = await connection.execute('SELECT * FROM product_summaries ORDER BY updated_at DESC');
return rows;
} finally {
connection.release();
}
},
product: async (_: any, { id }: { id: string }, { dbPool }: { dbPool: Pool }) => {
const connection = await dbPool.getConnection();
try {
const [rows] = await connection.execute<any[]>('SELECT * FROM product_summaries WHERE product_id = ?', [id]);
return rows.length > 0 ? rows[0] : null;
} finally {
connection.release();
}
}
},
};
2. 组装与启动服务
现在,我们将所有部分串联起来。
// src/main.ts
import 'reflect-metadata';
import express from 'express';
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { createPool } from 'mysql2/promise';
import { typeDefs, resolvers } from './graphql/schema';
import { EventBus } from './events/event-bus';
import { ProductSummaryUpdater } from './read-models/product-summary-updater';
import { UnitOfWork } from './database/unit-of-work';
import { CreateProductCommand, CreateProductHandler } from './commands/create-product-command';
async function bootstrap() {
const app = express();
app.use(express.json());
const pool = createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'cqrs_demo',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// 1. 设置事件总线和处理器
const eventBus = new EventBus();
const productSummaryUpdater = new ProductSummaryUpdater(pool);
eventBus.register(
'ProductCreatedEvent',
(event) => productSummaryUpdater.handleProductCreated(event as CreateProductCreatedEvent)
);
// 2. 设置GraphQL Server
const server = new ApolloServer({
typeDefs,
resolvers,
});
await server.start();
app.use('/graphql', expressMiddleware(server, {
context: async () => ({ dbPool: pool }),
}));
// 3. 设置写操作的RESTful API端点 (为简单起见)
app.post('/products', async (req, res) => {
const uow = new UnitOfWork(pool);
const handler = new CreateProductHandler(uow, eventBus);
try {
const { name, price, stock } = req.body;
const command = new CreateProductCommand(name, price, stock);
const productId = await handler.handle(command);
res.status(201).json({ productId });
} catch (error: any) {
res.status(400).json({ message: error.message });
}
});
app.listen(4000, () => {
console.log(`🚀 Command API running at http://localhost:4000/products`);
console.log(`🚀 Query API running at http://localhost:4000/graphql`);
});
}
bootstrap();
3. GraphQL客户端消费
客户端现在可以通过一个非常干净的接口来查询数据,而无需知道背后复杂的写模型。
// client-example.js
// 这是一个简单的Node.js客户端示例
async function fetchProducts() {
const query = `
query GetProducts {
products {
productId
productName
priceDisplay
}
}
`;
try {
const response = await fetch('http://localhost:4000/graphql', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query }),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
console.log('Fetched products:', result.data.products);
} catch (error) {
console.error('Failed to fetch products:', error);
}
}
async function createNewProduct() {
try {
const response = await fetch('http://localhost:4000/products', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ name: 'High-Performance Keyboard', price: 599.99, stock: 150 }),
});
const result = await response.json();
console.log('Created product:', result);
} catch (error) {
console.error('Failed to create product:', error);
}
}
// 运行
async function main() {
console.log('--- Creating a new product ---');
await createNewProduct();
// 等待事件处理(在真实世界中这是异步的)
await new Promise(resolve => setTimeout(resolve, 500));
console.log('\n--- Fetching product list via GraphQL ---');
await fetchProducts();
}
main();
局限性与未来迭代方向
这个实现虽然展示了CQRS的核心思想,但在生产环境中还存在一些必须正视的局限性。
首先是最终一致性。写模型和读模型之间存在延迟。对于大多数UI场景来说,这种秒级甚至毫秒级的延迟是可以接受的,但对于需要强一致性读的场景(例如,支付后立即查看订单状态),就需要特殊处理,比如让客户端在写操作成功后,轮询读模型直到数据出现。
其次是事件处理的可靠性。我们使用的内存事件总线在进程重启后会丢失所有事件,且事件处理器失败后没有重试机制。在生产系统中,必须引入像RabbitMQ、Kafka或Pulsar这样的消息中间件,它们能提供持久化、重试和死信队列功能,保证事件至少被处理一次。
最后,架构的复杂性是显而易见的。相比简单的CRUD,CQRS引入了更多的组件(命令、事件、处理器、双模型),对团队的技能要求更高,也增加了系统的运维成本。因此,只有在业务复杂到一定程度,读写矛盾非常突出时,引入CQRS才是合理的权衡。未来的迭代可以考虑将读模型同步到更专业的查询存储中,例如使用Elasticsearch来满足全文搜索需求,或者用Redis来缓存热点数据,进一步发挥读写分离的优势。