项目中的日志系统,起初只是为了记录异常。随着业务变得越来越复杂,尤其是那些包含多个步骤、长周期的用户流程,比如贷款申请、保险理赔或者多阶段的注册流程,我们发现现有的日志完全无法回答一个核心问题:“某个用户的特定请求,到底卡在了哪一步?”
当客服反馈“用户A说他提交申请后页面一直没反应”,我们能做的就是拿着用户ID去grep散落在各个微服务里的海量日志。这些日志格式不一,缺乏统一的事务ID,上下文严重缺失。我们能看到某个服务报错了,但无法还原出这个错误是用户完整操作链路中的哪个环节、由哪个前置状态触发的。定位一个问题,往往需要数小时的跨团队沟通和日志排查,效率极低。
问题根源在于,我们的日志是“无状态”的,而我们的核心业务逻辑是“有状态”的。我们需要一套机制,将业务的“状态”投射到日志系统中,让日志本身能够讲述一个完整的故事。
初步构想与技术选型
我们的构想是,构建一个以“用户会话”和“业务状态”为核心的可观测性体系。具体来说,需要满足以下几点:
- 统一上下文注入:所有进入系统的请求,都必须被注入一个唯一的
correlationId,并且能够从用户的JWT中解析出userId和sessionId,这些信息必须贯穿整个请求链路。 - 业务逻辑状态化:使用有限状态机(FSM)来对复杂业务流程进行建模。每一次状态的迁移都必须是可追踪的、可记录的事件。
- 结构化状态日志:状态迁移的日志必须是结构化的,包含前一状态、当前状态、触发事件、上下文信息(
correlationId,userId等),并集中推送到日志中心。 - 可视化分析:能够在日志中心(如Kibana)轻松地检索出任意一个用户的完整操作链路,或者聚合分析出哪些状态是流程瓶颈或高发错误点。
基于这些要求,技术选型变得清晰:
- 网关与代理 (Node.js + Express): 作为所有请求的入口,它是注入
correlationId和解析JWT的最佳位置。我们选择自建一个轻量级API网关中间件,以便于深度定制。 - JWT (JSON Web Token): 行业标准,用于无状态的用户认证和信息传递。我们将利用其
payload来携带userId和sessionId(jticlaim)。 - XState: 一个功能强大的JavaScript/TypeScript状态机和状态图库。它不仅仅是一个状态机,更是一种思考和构建复杂应用逻辑的方式。它的
interpret服务可以监听所有状态转换,完美契合我们的日志记录需求。 - ELK Stack (Elasticsearch, Logstash, Kibana): 成熟、强大的集中式日志解决方案。Elasticsearch用于存储和索引,Logstash用于处理和丰富日志,Kibana用于查询和可视化。
步骤化实现:从网关到Kibana
1. 网关层:构建上下文注入与结构化日志中间件
这是整个体系的地基。我们需要一个Express中间件,它负责在请求的最开始就建立起追踪上下文。
// src/middlewares/observability.js
const { v4: uuidv4 } = require('uuid');
const jwt = require('jsonwebtoken');
const pino = require('pino');
// 在真实项目中,pino的配置会更复杂,比如设置不同的transport将日志输出到stdout和文件/Logstash
// 这里为了演示,简化为输出到控制台。Logstash会监听这个stdout。
const baseLogger = pino({
level: 'info',
formatters: {
level: (label) => {
return { level: label };
},
},
timestamp: () => `,"time":"${new Date(Date.now()).toISOString()}"`,
});
const JWT_SECRET = process.env.JWT_SECRET || 'your-super-secret-key';
function observabilityMiddleware(req, res, next) {
// 1. 提取或生成 Correlation ID
const correlationId = req.get('X-Correlation-ID') || uuidv4();
res.setHeader('X-Correlation-ID', correlationId);
const context = { correlationId };
// 2. 解析 JWT 并提取关键信息
const authHeader = req.get('Authorization');
if (authHeader && authHeader.startsWith('Bearer ')) {
const token = authHeader.substring(7);
try {
const decoded = jwt.verify(token, JWT_SECRET);
context.userId = decoded.sub; // 'sub' (subject) claim for user ID
context.sessionId = decoded.jti; // 'jti' (JWT ID) claim for session ID
} catch (err) {
// JWT无效或过期,我们依然继续,但日志中不会有用户信息
// 在生产环境中,这后面通常会跟一个认证中间件来拒绝请求
context.jwtError = err.message;
}
}
// 3. 创建一个附加了上下文的子Logger,并挂载到请求对象上
// 这样,应用逻辑中的任何地方都可以通过 `req.logger` 来记录带有完整上下文的日志
req.logger = baseLogger.child({ context });
// 挂载上下文,方便业务逻辑直接访问
req.context = context;
req.logger.info({ req: { method: req.method, url: req.url } }, 'Request received');
next();
}
module.exports = observabilityMiddleware;
这个中间件做了三件关键的事:
- 确保
correlationId的存在:它允许上游系统传入X-Correlation-ID,也能够在请求入口处生成一个,保证了链路的起点。 - 尽力而为的JWT解析:它会尝试解析JWT,即使失败了也不会中断请求(认证是另一个中间件的职责),但会把错误记录下来。这保证了即使是匿名或认证失败的请求也能被追踪。
- **注入
req.logger**:这是核心。我们使用了pino的child logger功能。所有通过req.logger打出的日志,都会自动带上context对象(包含correlationId,userId,sessionId),避免了在每个日志点手动传递这些参数的繁琐和遗漏。
2. 业务逻辑层:用XState定义和实现状态机
假设我们有一个复杂的“文档审批”流程。它包含draft(草稿), pendingReview(待审核), approved(已批准), rejected(已驳回)等状态。
// src/machines/documentMachine.js
const { createMachine, assign } = require('xstate');
// 模拟异步API调用
const submitForReviewAPI = async (documentId) => {
console.log(`Submitting document ${documentId} for review...`);
await new Promise(resolve => setTimeout(resolve, 500));
// 模拟可能失败的情况
if (documentId.includes('fail')) {
throw new Error('Submission failed due to invalid content');
}
return { reviewId: `rev-${Date.now()}` };
};
const documentMachine = createMachine({
id: 'documentApproval',
initial: 'draft',
// context用来存储状态机的内部数据
context: {
documentId: null,
reviewId: null,
rejectionReason: null,
retries: 0,
},
states: {
draft: {
on: {
// 事件:提交审核
SUBMIT: {
target: 'pendingReview',
},
},
},
pendingReview: {
// 进入该状态时执行的动作 (Entry Action)
invoke: {
id: 'submitForReview',
src: (context, event) => submitForReviewAPI(context.documentId),
onDone: {
target: 'awaitingApproval',
// `assign` action 用来更新 context
actions: assign({
reviewId: (context, event) => event.data.reviewId,
}),
},
onError: {
target: 'submissionFailed',
actions: assign({
rejectionReason: (context, event) => event.data.message,
}),
},
},
},
awaitingApproval: {
on: {
APPROVE: 'approved',
REJECT: {
target: 'rejected',
actions: assign({
rejectionReason: (context, event) => event.reason,
}),
},
},
},
submissionFailed: {
on: {
RETRY: {
target: 'pendingReview',
actions: assign({ retries: (context) => context.retries + 1 }),
// 增加守卫条件,比如最多重试3次
cond: (context) => context.retries < 3
}
}
},
approved: {
type: 'final', // 终态
},
rejected: {
type: 'final',
},
},
});
module.exports = { documentMachine };
这个状态机定义了清晰的流程、状态转换、异步调用(invoke)以及错误处理。它本身是纯粹的业务逻辑,不包含任何日志记录的代码,保持了高度的内聚性。
3. 桥接层:监听状态机并输出结构化日志
现在,我们需要将网关的logger和XState的interpreter连接起来。我们将创建一个服务来“运行”状态机,并在这个服务中注入日志记录逻辑。
// src/services/documentService.js
const { interpret } = require('xstate');
const { documentMachine } = require('../machines/documentMachine');
class DocumentService {
constructor(logger) {
// 这里的logger就是从req.logger传入的,自带上下文
this.logger = logger;
}
// 核心方法:执行一个完整的审批流程
async processDocument(documentId, initialEvent) {
return new Promise((resolve, reject) => {
// 为每个请求创建一个新的状态机实例
const machineInstance = documentMachine.withContext({
...documentMachine.context,
documentId: documentId,
});
const service = interpret(machineInstance).onTransition(state => {
// 这就是魔法发生的地方!监听每一次状态转换
// 这里的state对象包含了所有需要的信息
if (state.changed) {
this.logger.info(
{
// 将状态机的核心信息结构化
fsm: {
id: machineInstance.id,
value: state.value, // 当前状态,如 'draft', 'pendingReview'
event: state.event.type, // 触发转换的事件
context: state.context, // 状态机的内部数据
changed: state.changed,
// 记录上一个状态,便于分析转换路径
previousValue: state.history ? state.history.value : null
},
},
`FSM Transition: ${state.event.type} -> ${JSON.stringify(state.value)}`
);
}
});
service.onDone(() => {
this.logger.info({ fsm: { status: 'done' }}, 'FSM process completed');
resolve(service.state);
});
service.onError((error) => {
this.logger.error({ err: error, fsm: { status: 'error' }}, 'FSM process failed');
reject(error);
});
// 启动状态机
service.start();
// 发送初始事件来驱动状态机
if (initialEvent) {
service.send(initialEvent);
}
// 在真实应用中,状态机实例可能会被持久化,而不是在一次请求中完成
// 这里为了演示,我们模拟几个后续事件来驱动它走完流程
this.simulateWorkflow(service);
});
}
// 模拟外部事件驱动状态机
simulateWorkflow(service) {
setTimeout(() => service.send({ type: 'SUBMIT' }), 100);
// 模拟审批决策
setTimeout(() => {
const finalStateValue = service.state.value;
if (finalStateValue === 'awaitingApproval') {
service.send({ type: 'APPROVE' });
} else if (finalStateValue === 'submissionFailed') {
service.send({ type: 'RETRY' });
}
}, 1000);
}
}
module.exports = DocumentService;
onTransition回调是关键。它在每次状态机转换时被触发,我们在这里调用req.logger记录日志。因为logger实例本身已经包含了来自网关的correlationId和userId,所以我们只需要关注状态机自身的数据。生成的日志会是类似这样的JSON:
{
"level": "info",
"time": "2023-10-27T10:35:01.123Z",
"context": {
"correlationId": "abc-123-def-456",
"userId": "user-prod-789",
"sessionId": "sess-xyz-987"
},
"fsm": {
"id": "documentApproval",
"value": "pendingReview",
"event": "SUBMIT",
"context": { "documentId": "doc-001", "reviewId": null, "retries": 0 },
"changed": true,
"previousValue": "draft"
},
"msg": "FSM Transition: SUBMIT -> \"pendingReview\""
}
4. Logstash 配置:解析与处理JSON日志
当我们的Node.js应用将上述JSON日志输出到stdout后,需要Logstash来消费它们,并发送到Elasticsearch。
# /etc/logstash/conf.d/node-fsm-app.conf
input {
# 假设我们用Filebeat或直接用docker log driver收集容器的stdout
# 这里为了简单,我们用beats input
beats {
port => 5044
}
}
filter {
# 日志已经是JSON格式,我们只需要用json codec解析它
json {
source => "message"
}
# Logstash默认的时间戳可能不是我们想要的,
# 使用日志中由pino生成的时间戳作为事件时间
date {
match => [ "time", "ISO8601" ]
target => "@timestamp"
}
# 提升一些关键字段到顶层,便于在Kibana中筛选和聚合
if [context][correlationId] {
mutate {
add_field => { "correlation_id" => "%{[context][correlationId]}" }
add_field => { "user_id" => "%{[context][userId]}" }
add_field => { "session_id" => "%{[context][sessionId]}" }
}
}
# 对状态机日志进行更精细的处理
if [fsm] {
mutate {
add_field => { "fsm_id" => "%{[fsm][id]}" }
add_field => { "fsm_current_state" => "%{[fsm][value]}" }
add_field => { "fsm_event" => "%{[fsm][event]}" }
add_field => { "fsm_previous_state" => "%{[fsm][previousValue]}" }
}
}
# 移除不再需要的原始字段,保持索引清洁
mutate {
remove_field => ["message", "context", "time", "log", "agent", "ecs", "host", "input"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "fsm-traces-%{+YYYY.MM.dd}"
}
# 在调试时,也可以输出到控制台方便查看
# stdout { codec => rubydebug }
}
这个配置文件的作用是将扁平的JSON日志解析成结构化的Elasticsearch文档,并创建了易于查询的顶层字段,如correlation_id和fsm_current_state。
5. Kibana 可视化分析
至此,所有数据都已进入Elasticsearch。现在我们可以利用Kibana的强大能力来回答最初的那个问题了。
在Kibana的“Discover”页面,我们可以执行KQL查询:
追踪单个用户的完整流程:
user_id: "user-prod-789" and correlation_id: "abc-123-def-456"
然后按@timestamp升序排序,你将清晰地看到该用户这一次操作所经历的每一个状态变迁,从draft到pendingReview再到approved。查找所有卡在“待审核”状态超过10分钟的流程:
fsm_current_state: "awaitingApproval" and @timestamp < now-10m
这个查询能立即找出所有处理缓慢或卡死的流程,为主动运维提供了可能。分析流程转换的瓶颈:
我们可以创建一个数据可视化图表(Data Visualization),比如一个漏斗图(Funnel),来分析从draft状态到approved状态的转化率,以及在哪个环节(rejected或submissionFailed)流失最多。构建状态转换仪表盘:
创建一个仪表盘,包含:- 一个饼图,显示当前所有流程在各个状态的分布。
- 一个条形图,显示最常见的状态转换失败事件(如
REJECT)。 - 一个数据表,列出最近出错的流程及其
correlation_id,点击即可下钻到详细日志。
我们可以用Mermaid图来表示整个数据流:
graph TD
subgraph "Client"
A[User Action] --> B{JWT in Header};
end
subgraph "API Gateway (Node.js/Express)"
B --> C[Observability Middleware];
C -->|1. Parse JWT| D[Extract userId, sessionId];
C -->|2. Gen/Get CorrID| E[Inject correlationId];
C -->|3. Create Logger| F[req.logger with context];
end
subgraph "Business Service"
F --> G[DocumentService];
G --> H[XState Interpreter];
H -- onTransition --> I[Structured Log via req.logger];
end
subgraph "Logging Pipeline"
I --> J[STDOUT];
J --> K[Logstash];
K -->|Parse & Enrich| L[Elasticsearch];
end
subgraph "Analysis"
L --> M[Kibana];
M --> N[Dashboard & Queries];
end
局限性与未来展望
这套方案有效地解决了业务流程可观测性的问题,但它并非银弹。
首先,它强依赖于开发人员遵循规范:必须使用req.logger来记录日志,并且核心业务必须用状态机来建模。这对团队的技术纪律和代码规范提出了要求。对于那些不适合用状态机描述的简单CRUD业务,这套方案就显得有些重。
其次,当前的实现是基于日志的,虽然能做到“链路追踪”,但它和OpenTelemetry这类真正的分布式追踪系统在概念上有所不同。日志关注的是离散的事件点,而分布式追踪更关注服务间的调用关系和时间耗费(Spans)。一个可行的演进方向是,在onTransition回调中,除了记录日志,还可以利用OpenTelemetry API来创建一个新的Span,将状态机的每次转换都作为一个追踪单元,从而将业务状态与服务调用链更紧密地结合起来。
最后,对于需要持久化的长时间运行的状态机(比如一个持续数天的审批流),当前内存中的interpret服务是不够的。需要引入持久化存储(如Redis或数据库)来保存状态机的当前状态和上下文,以便在服务重启或请求中断后能够从上次的状态恢复。这将引入新的复杂性,但也是构建可靠的长时间任务系统的必经之路。