通过自定义网关与ELK实现基于JWT会话的XState状态机全链路追踪


项目中的日志系统,起初只是为了记录异常。随着业务变得越来越复杂,尤其是那些包含多个步骤、长周期的用户流程,比如贷款申请、保险理赔或者多阶段的注册流程,我们发现现有的日志完全无法回答一个核心问题:“某个用户的特定请求,到底卡在了哪一步?”

当客服反馈“用户A说他提交申请后页面一直没反应”,我们能做的就是拿着用户ID去grep散落在各个微服务里的海量日志。这些日志格式不一,缺乏统一的事务ID,上下文严重缺失。我们能看到某个服务报错了,但无法还原出这个错误是用户完整操作链路中的哪个环节、由哪个前置状态触发的。定位一个问题,往往需要数小时的跨团队沟通和日志排查,效率极低。

问题根源在于,我们的日志是“无状态”的,而我们的核心业务逻辑是“有状态”的。我们需要一套机制,将业务的“状态”投射到日志系统中,让日志本身能够讲述一个完整的故事。

初步构想与技术选型

我们的构想是,构建一个以“用户会话”和“业务状态”为核心的可观测性体系。具体来说,需要满足以下几点:

  1. 统一上下文注入:所有进入系统的请求,都必须被注入一个唯一的correlationId,并且能够从用户的JWT中解析出userIdsessionId,这些信息必须贯穿整个请求链路。
  2. 业务逻辑状态化:使用有限状态机(FSM)来对复杂业务流程进行建模。每一次状态的迁移都必须是可追踪的、可记录的事件。
  3. 结构化状态日志:状态迁移的日志必须是结构化的,包含前一状态、当前状态、触发事件、上下文信息(correlationId, userId等),并集中推送到日志中心。
  4. 可视化分析:能够在日志中心(如Kibana)轻松地检索出任意一个用户的完整操作链路,或者聚合分析出哪些状态是流程瓶颈或高发错误点。

基于这些要求,技术选型变得清晰:

  • 网关与代理 (Node.js + Express): 作为所有请求的入口,它是注入correlationId和解析JWT的最佳位置。我们选择自建一个轻量级API网关中间件,以便于深度定制。
  • JWT (JSON Web Token): 行业标准,用于无状态的用户认证和信息传递。我们将利用其payload来携带userIdsessionId (jti claim)。
  • XState: 一个功能强大的JavaScript/TypeScript状态机和状态图库。它不仅仅是一个状态机,更是一种思考和构建复杂应用逻辑的方式。它的interpret服务可以监听所有状态转换,完美契合我们的日志记录需求。
  • ELK Stack (Elasticsearch, Logstash, Kibana): 成熟、强大的集中式日志解决方案。Elasticsearch用于存储和索引,Logstash用于处理和丰富日志,Kibana用于查询和可视化。

步骤化实现:从网关到Kibana

1. 网关层:构建上下文注入与结构化日志中间件

这是整个体系的地基。我们需要一个Express中间件,它负责在请求的最开始就建立起追踪上下文。

// src/middlewares/observability.js

const { v4: uuidv4 } = require('uuid');
const jwt = require('jsonwebtoken');
const pino = require('pino');

// 在真实项目中,pino的配置会更复杂,比如设置不同的transport将日志输出到stdout和文件/Logstash
// 这里为了演示,简化为输出到控制台。Logstash会监听这个stdout。
const baseLogger = pino({
  level: 'info',
  formatters: {
    level: (label) => {
      return { level: label };
    },
  },
  timestamp: () => `,"time":"${new Date(Date.now()).toISOString()}"`,
});

const JWT_SECRET = process.env.JWT_SECRET || 'your-super-secret-key';

function observabilityMiddleware(req, res, next) {
  // 1. 提取或生成 Correlation ID
  const correlationId = req.get('X-Correlation-ID') || uuidv4();
  res.setHeader('X-Correlation-ID', correlationId);

  const context = { correlationId };

  // 2. 解析 JWT 并提取关键信息
  const authHeader = req.get('Authorization');
  if (authHeader && authHeader.startsWith('Bearer ')) {
    const token = authHeader.substring(7);
    try {
      const decoded = jwt.verify(token, JWT_SECRET);
      context.userId = decoded.sub; // 'sub' (subject) claim for user ID
      context.sessionId = decoded.jti; // 'jti' (JWT ID) claim for session ID
    } catch (err) {
      // JWT无效或过期,我们依然继续,但日志中不会有用户信息
      // 在生产环境中,这后面通常会跟一个认证中间件来拒绝请求
      context.jwtError = err.message;
    }
  }

  // 3. 创建一个附加了上下文的子Logger,并挂载到请求对象上
  // 这样,应用逻辑中的任何地方都可以通过 `req.logger` 来记录带有完整上下文的日志
  req.logger = baseLogger.child({ context });
  
  // 挂载上下文,方便业务逻辑直接访问
  req.context = context;

  req.logger.info({ req: { method: req.method, url: req.url } }, 'Request received');

  next();
}

module.exports = observabilityMiddleware;

这个中间件做了三件关键的事:

  • 确保correlationId的存在:它允许上游系统传入X-Correlation-ID,也能够在请求入口处生成一个,保证了链路的起点。
  • 尽力而为的JWT解析:它会尝试解析JWT,即使失败了也不会中断请求(认证是另一个中间件的职责),但会把错误记录下来。这保证了即使是匿名或认证失败的请求也能被追踪。
  • **注入req.logger**:这是核心。我们使用了pinochild logger功能。所有通过req.logger打出的日志,都会自动带上context对象(包含correlationId, userId, sessionId),避免了在每个日志点手动传递这些参数的繁琐和遗漏。

2. 业务逻辑层:用XState定义和实现状态机

假设我们有一个复杂的“文档审批”流程。它包含draft(草稿), pendingReview(待审核), approved(已批准), rejected(已驳回)等状态。

// src/machines/documentMachine.js

const { createMachine, assign } = require('xstate');

// 模拟异步API调用
const submitForReviewAPI = async (documentId) => {
  console.log(`Submitting document ${documentId} for review...`);
  await new Promise(resolve => setTimeout(resolve, 500));
  // 模拟可能失败的情况
  if (documentId.includes('fail')) {
    throw new Error('Submission failed due to invalid content');
  }
  return { reviewId: `rev-${Date.now()}` };
};

const documentMachine = createMachine({
  id: 'documentApproval',
  initial: 'draft',
  // context用来存储状态机的内部数据
  context: {
    documentId: null,
    reviewId: null,
    rejectionReason: null,
    retries: 0,
  },
  states: {
    draft: {
      on: {
        // 事件:提交审核
        SUBMIT: {
          target: 'pendingReview',
        },
      },
    },
    pendingReview: {
      // 进入该状态时执行的动作 (Entry Action)
      invoke: {
        id: 'submitForReview',
        src: (context, event) => submitForReviewAPI(context.documentId),
        onDone: {
          target: 'awaitingApproval',
          // `assign` action 用来更新 context
          actions: assign({
            reviewId: (context, event) => event.data.reviewId,
          }),
        },
        onError: {
          target: 'submissionFailed',
          actions: assign({
            rejectionReason: (context, event) => event.data.message,
          }),
        },
      },
    },
    awaitingApproval: {
      on: {
        APPROVE: 'approved',
        REJECT: {
          target: 'rejected',
          actions: assign({
            rejectionReason: (context, event) => event.reason,
          }),
        },
      },
    },
    submissionFailed: {
      on: {
        RETRY: {
            target: 'pendingReview',
            actions: assign({ retries: (context) => context.retries + 1 }),
            // 增加守卫条件,比如最多重试3次
            cond: (context) => context.retries < 3
        }
      }
    },
    approved: {
      type: 'final', // 终态
    },
    rejected: {
      type: 'final',
    },
  },
});

module.exports = { documentMachine };

这个状态机定义了清晰的流程、状态转换、异步调用(invoke)以及错误处理。它本身是纯粹的业务逻辑,不包含任何日志记录的代码,保持了高度的内聚性。

3. 桥接层:监听状态机并输出结构化日志

现在,我们需要将网关的logger和XState的interpreter连接起来。我们将创建一个服务来“运行”状态机,并在这个服务中注入日志记录逻辑。

// src/services/documentService.js

const { interpret } = require('xstate');
const { documentMachine } = require('../machines/documentMachine');

class DocumentService {
  constructor(logger) {
    // 这里的logger就是从req.logger传入的,自带上下文
    this.logger = logger;
  }

  // 核心方法:执行一个完整的审批流程
  async processDocument(documentId, initialEvent) {
    return new Promise((resolve, reject) => {
      // 为每个请求创建一个新的状态机实例
      const machineInstance = documentMachine.withContext({
        ...documentMachine.context,
        documentId: documentId,
      });

      const service = interpret(machineInstance).onTransition(state => {
        // 这就是魔法发生的地方!监听每一次状态转换
        // 这里的state对象包含了所有需要的信息
        if (state.changed) {
          this.logger.info(
            {
              // 将状态机的核心信息结构化
              fsm: {
                id: machineInstance.id,
                value: state.value, // 当前状态,如 'draft', 'pendingReview'
                event: state.event.type, // 触发转换的事件
                context: state.context, // 状态机的内部数据
                changed: state.changed,
                // 记录上一个状态,便于分析转换路径
                previousValue: state.history ? state.history.value : null
              },
            },
            `FSM Transition: ${state.event.type} -> ${JSON.stringify(state.value)}`
          );
        }
      });
      
      service.onDone(() => {
        this.logger.info({ fsm: { status: 'done' }}, 'FSM process completed');
        resolve(service.state);
      });

      service.onError((error) => {
        this.logger.error({ err: error, fsm: { status: 'error' }}, 'FSM process failed');
        reject(error);
      });

      // 启动状态机
      service.start();
      
      // 发送初始事件来驱动状态机
      if (initialEvent) {
        service.send(initialEvent);
      }
      
      // 在真实应用中,状态机实例可能会被持久化,而不是在一次请求中完成
      // 这里为了演示,我们模拟几个后续事件来驱动它走完流程
      this.simulateWorkflow(service);
    });
  }

  // 模拟外部事件驱动状态机
  simulateWorkflow(service) {
      setTimeout(() => service.send({ type: 'SUBMIT' }), 100);
      
      // 模拟审批决策
      setTimeout(() => {
          const finalStateValue = service.state.value;
          if (finalStateValue === 'awaitingApproval') {
              service.send({ type: 'APPROVE' });
          } else if (finalStateValue === 'submissionFailed') {
              service.send({ type: 'RETRY' });
          }
      }, 1000);
  }
}

module.exports = DocumentService;

onTransition回调是关键。它在每次状态机转换时被触发,我们在这里调用req.logger记录日志。因为logger实例本身已经包含了来自网关的correlationIduserId,所以我们只需要关注状态机自身的数据。生成的日志会是类似这样的JSON:

{
  "level": "info",
  "time": "2023-10-27T10:35:01.123Z",
  "context": {
    "correlationId": "abc-123-def-456",
    "userId": "user-prod-789",
    "sessionId": "sess-xyz-987"
  },
  "fsm": {
    "id": "documentApproval",
    "value": "pendingReview",
    "event": "SUBMIT",
    "context": { "documentId": "doc-001", "reviewId": null, "retries": 0 },
    "changed": true,
    "previousValue": "draft"
  },
  "msg": "FSM Transition: SUBMIT -> \"pendingReview\""
}

4. Logstash 配置:解析与处理JSON日志

当我们的Node.js应用将上述JSON日志输出到stdout后,需要Logstash来消费它们,并发送到Elasticsearch。

# /etc/logstash/conf.d/node-fsm-app.conf

input {
  # 假设我们用Filebeat或直接用docker log driver收集容器的stdout
  # 这里为了简单,我们用beats input
  beats {
    port => 5044
  }
}

filter {
  # 日志已经是JSON格式,我们只需要用json codec解析它
  json {
    source => "message"
  }

  # Logstash默认的时间戳可能不是我们想要的,
  # 使用日志中由pino生成的时间戳作为事件时间
  date {
    match => [ "time", "ISO8601" ]
    target => "@timestamp"
  }
  
  # 提升一些关键字段到顶层,便于在Kibana中筛选和聚合
  if [context][correlationId] {
    mutate {
      add_field => { "correlation_id" => "%{[context][correlationId]}" }
      add_field => { "user_id" => "%{[context][userId]}" }
      add_field => { "session_id" => "%{[context][sessionId]}" }
    }
  }

  # 对状态机日志进行更精细的处理
  if [fsm] {
      mutate {
        add_field => { "fsm_id" => "%{[fsm][id]}" }
        add_field => { "fsm_current_state" => "%{[fsm][value]}" }
        add_field => { "fsm_event" => "%{[fsm][event]}" }
        add_field => { "fsm_previous_state" => "%{[fsm][previousValue]}" }
      }
  }

  # 移除不再需要的原始字段,保持索引清洁
  mutate {
    remove_field => ["message", "context", "time", "log", "agent", "ecs", "host", "input"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "fsm-traces-%{+YYYY.MM.dd}"
  }
  # 在调试时,也可以输出到控制台方便查看
  # stdout { codec => rubydebug }
}

这个配置文件的作用是将扁平的JSON日志解析成结构化的Elasticsearch文档,并创建了易于查询的顶层字段,如correlation_idfsm_current_state

5. Kibana 可视化分析

至此,所有数据都已进入Elasticsearch。现在我们可以利用Kibana的强大能力来回答最初的那个问题了。

在Kibana的“Discover”页面,我们可以执行KQL查询:

  • 追踪单个用户的完整流程
    user_id: "user-prod-789" and correlation_id: "abc-123-def-456"
    然后按@timestamp升序排序,你将清晰地看到该用户这一次操作所经历的每一个状态变迁,从draftpendingReview再到approved

  • 查找所有卡在“待审核”状态超过10分钟的流程
    fsm_current_state: "awaitingApproval" and @timestamp < now-10m
    这个查询能立即找出所有处理缓慢或卡死的流程,为主动运维提供了可能。

  • 分析流程转换的瓶颈
    我们可以创建一个数据可视化图表(Data Visualization),比如一个漏斗图(Funnel),来分析从draft状态到approved状态的转化率,以及在哪个环节(rejectedsubmissionFailed)流失最多。

  • 构建状态转换仪表盘
    创建一个仪表盘,包含:

    1. 一个饼图,显示当前所有流程在各个状态的分布。
    2. 一个条形图,显示最常见的状态转换失败事件(如REJECT)。
    3. 一个数据表,列出最近出错的流程及其correlation_id,点击即可下钻到详细日志。

我们可以用Mermaid图来表示整个数据流:

graph TD
    subgraph "Client"
        A[User Action] --> B{JWT in Header};
    end

    subgraph "API Gateway (Node.js/Express)"
        B --> C[Observability Middleware];
        C -->|1. Parse JWT| D[Extract userId, sessionId];
        C -->|2. Gen/Get CorrID| E[Inject correlationId];
        C -->|3. Create Logger| F[req.logger with context];
    end

    subgraph "Business Service"
        F --> G[DocumentService];
        G --> H[XState Interpreter];
        H -- onTransition --> I[Structured Log via req.logger];
    end
    
    subgraph "Logging Pipeline"
        I --> J[STDOUT];
        J --> K[Logstash];
        K -->|Parse & Enrich| L[Elasticsearch];
    end

    subgraph "Analysis"
        L --> M[Kibana];
        M --> N[Dashboard & Queries];
    end

局限性与未来展望

这套方案有效地解决了业务流程可观测性的问题,但它并非银弹。

首先,它强依赖于开发人员遵循规范:必须使用req.logger来记录日志,并且核心业务必须用状态机来建模。这对团队的技术纪律和代码规范提出了要求。对于那些不适合用状态机描述的简单CRUD业务,这套方案就显得有些重。

其次,当前的实现是基于日志的,虽然能做到“链路追踪”,但它和OpenTelemetry这类真正的分布式追踪系统在概念上有所不同。日志关注的是离散的事件点,而分布式追踪更关注服务间的调用关系和时间耗费(Spans)。一个可行的演进方向是,在onTransition回调中,除了记录日志,还可以利用OpenTelemetry API来创建一个新的Span,将状态机的每次转换都作为一个追踪单元,从而将业务状态与服务调用链更紧密地结合起来。

最后,对于需要持久化的长时间运行的状态机(比如一个持续数天的审批流),当前内存中的interpret服务是不够的。需要引入持久化存储(如Redis或数据库)来保存状态机的当前状态和上下文,以便在服务重启或请求中断后能够从上次的状态恢复。这将引入新的复杂性,但也是构建可靠的长时间任务系统的必经之路。


  目录