Building End-to-End Observability in a Heterogeneous System: Distributed Tracing Across Solid.js, Spring Boot, and Celery


At 2 a.m., the alerting system fires: the data-export task for user ID 1a2b3c was "submitted" more than 30 minutes ago, but no result has appeared. The investigation begins. The frontend logs show the user clicked the export button in the Solid.js UI and the app received a 202 Accepted response from the backend. The Spring Boot service logs clearly record that the request arrived and that the task message was successfully published to RabbitMQ. Yet the Python Celery worker that consumes these tasks has no related log entries at all.

Where did the message go? Did a network partition cause RabbitMQ to drop it? Is the Celery worker process hung? Did an uncaught exception fire before the task ever ran? Manually correlating evidence across three independent tech stacks and logging systems is like hunting for a needle in the dark. Without a shared request ID, we cannot even confirm that the message the backend published is the one the frontend triggered. This is the classic "observability black hole" of heterogeneous microservice systems, and the only way out is an end-to-end distributed tracing pipeline.

The goal is clear: generate a unique traceId for every request along the Solid.js (frontend) -> Spring Boot (backend API) -> Celery (async task processing) chain, and carry it all the way from the browser to the Python task executor. For tooling, OpenTelemetry (OTel) is the current industry standard; with cross-language specifications and SDKs, it is the natural choice for tracing a heterogeneous system like this.

Architecture and Trace Flow Design

Before writing any code, we need to map out how the trace context travels. The W3C Trace Context specification defines the standard HTTP header traceparent, which will be our primary carrier.
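
For reference, a traceparent value consists of four dash-separated hex fields; the example below is the one used in the W3C specification itself:

traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
# fields: version (00) - trace-id (16-byte hex) - parent span-id (8-byte hex) - trace-flags (01 = sampled)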

sequenceDiagram
    participant FE as Browser (Solid.js)
    participant API as API (Spring Boot)
    participant MQ as Broker (RabbitMQ)
    participant W as Worker (Celery)
    participant COL as Collector (OTel)

    FE->>+API: POST /api/export (Header: traceparent)
    Note right of API: 1. OTel Agent/SDK parses traceparent automatically
    API->>API: 2. Create child span: "process-request"
    API->>MQ: 3. publish message (Headers: traceparent)
    Note right of MQ: Context injected into the message headers
    API-->>-FE: HTTP 202 Accepted
    MQ-->>W: consume message
    Note left of W: 4. OTel SDK extracts traceparent from the message headers
    W->>W: 5. Create child span: "run-export-task"
    W->>COL: 6. Export task span
    API->>COL: 7. Export API span
    FE->>COL: 8. Export browser span

The critical points in this flow are:

  1. Frontend injection: when the Solid.js app issues an API request, the OTel JS SDK must generate (or propagate) the traceparent header.
  2. Backend relay: the Spring Boot app relies on the OTel Java Agent to parse the incoming traceparent automatically, and must manually inject the current trace context into the message headers when publishing the task to RabbitMQ. This is a common pitfall: when a request crosses from HTTP into AMQP, the context does not propagate by itself.
  3. Worker extraction: when the Celery worker receives the task message, it needs a mechanism to extract traceparent from the message headers and continue the trace from there.

The deployment target is Docker Swarm: lightweight, yet sufficient for service discovery and basic orchestration. The observability stack consists of the application services plus an OTel Collector, which receives, processes, and exports trace data to a backend store (Jaeger in this case).

Spring Boot Backend: Bridging the Trace Context

The backend is the hub of the whole chain. The core challenge here is to hand the trace context obtained from the HTTP request over to the message about to be published to RabbitMQ, seamlessly. In a real project we would not edit every message-sending method by hand; we lean on AOP or on the extension points Spring already provides.

1. Project Dependencies

Besides spring-boot-starter-web and spring-boot-starter-amqp, pom.xml needs no OTel SDK dependency — only the lightweight opentelemetry-api artifact, which the manual injection code below compiles against. Tracing itself comes from the OTel Java Agent, which instruments via bytecode enhancement and keeps code intrusion to a minimum.

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Web and RabbitMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- OTel API only (no SDK): needed to compile the manual header
         injection below; the Java agent bridges these calls at runtime.
         Version shown is an example; pin whatever matches your agent. -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-api</artifactId>
        <version>1.32.0</version>
    </dependency>
    <!-- Lombok for cleaner code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
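
How does the agent get attached? It is a standalone jar shipped into the container and referenced via JAVA_TOOL_OPTIONS (see the docker-compose file later). Below is a sketch of such a backend image; the base image, jar names, and the unpinned "latest" download link are assumptions for illustration:

# backend/Dockerfile -- sketch; base image, paths, and version pinning are assumptions
FROM eclipse-temurin:17-jre
WORKDIR /app
# In a real build, pin an explicit agent release instead of 'latest'.
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar /app/opentelemetry-javaagent.jar
COPY target/backend.jar /app/app.jar
# JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar (set in the
# compose file) attaches the agent at JVM startup; no code changes needed.
ENTRYPOINT ["java", "-jar", "/app/app.jar"]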

2. Implementing Trace Context Injection

A common mistake is to assume the OTel Agent handles every outbound call automatically. For HTTP clients and JDBC it does this well, but for message queues — and RabbitMQ client libraries in particular — automatic injection may be incomplete or require specific configuration. The safest route is to control it manually.

We do this with a MessagePostProcessor on the RabbitTemplate. It lets us modify each message just before it is sent, which is the perfect moment to inject headers.

// src/main/java/com/example/tracing/config/RabbitMqConfig.java
package com.example.tracing.config;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import java.util.HashMap;

@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {

    private final RabbitTemplate rabbitTemplate;

    // This setter tells OTel how to inject context into a map-like structure.
    // We will use message headers, which are essentially a map.
    private final TextMapSetter<HashMap<String, Object>> setter =
            (carrier, key, value) -> carrier.put(key, value);

    @PostConstruct
    public void setupRabbitTemplate() {
        // Add a post processor that will be applied to every message sent.
        rabbitTemplate.addBeforePublishPostProcessors(createTracingPostProcessor());
    }

    private MessagePostProcessor createTracingPostProcessor() {
        return message -> {
            // A mutable map to hold our new headers
            HashMap<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders());

            // The core logic: inject the current OpenTelemetry context into the headers map.
            // This captures the active span's context (traceId, spanId) and injects it
            // as 'traceparent' and 'tracestate' headers. GlobalOpenTelemetry resolves to
            // the instance installed by the Java agent; the agent does not register a
            // Spring bean, so injecting OpenTelemetry via the constructor would fail.
            GlobalOpenTelemetry.getPropagators()
                               .getTextMapPropagator()
                               .inject(Context.current(), headers, setter);

            // Copy the new headers back onto the message properties.
            headers.forEach(message.getMessageProperties()::setHeader);

            // Log for debugging purposes. In production, this should be at DEBUG level.
            System.out.println("Injecting trace context into RabbitMQ message: " + headers);

            return message;
        };
    }
}
  • The core of the code: via @PostConstruct we attach a MessagePostProcessor to the RabbitTemplate. Inside it, we obtain the TextMapPropagator from GlobalOpenTelemetry (the instance the agent installs) and call inject. inject serializes the traceId, spanId, and flags from the current thread's context into strings per the W3C Trace Context spec and writes them into a HashMap through the setter we supplied. Finally, the map's entries are copied onto the message headers.

3. The API Controller

The controller itself is trivial: it accepts the request and publishes a message through RabbitTemplate. Because the OTel Agent is attached, the incoming HTTP request automatically opens a span, and the MessagePostProcessor configured above guarantees that span's context travels with the message. (One caveat outside the scope of tracing: for Celery to actually execute this message as a task, its body and headers must conform to Celery's task message protocol; alternatively a plain AMQP consumer on the Python side can hand the payload to Celery. We keep that concern out of the snippet below.)

// src/main/java/com/example/tracing/controller/TaskController.java
package com.example.tracing.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.util.Map;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class TaskController {

    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper;

    @PostMapping("/export")
    @ResponseStatus(HttpStatus.ACCEPTED)
    @SneakyThrows
    public void submitExportTask(@RequestBody Map<String, Object> payload) {
        // The business logic is simple: serialize the payload and send it.
        // The tracing logic is handled transparently by the agent and our post-processor.
        String messageBody = objectMapper.writeValueAsString(payload);
        
        // This call will trigger the MessagePostProcessor we configured.
        rabbitTemplate.convertAndSend("tasks.exchange", "tasks.routing.key", messageBody);

        System.out.println("Task submitted to RabbitMQ: " + messageBody);
    }
}

Celery Worker: Rebuilding Context on the Consumer Side

The challenge on the Python side is the mirror image of the Java side: before the task runs, we must extract the trace context from the message headers and use it to start a new span as a child of the upstream span.

1. Dependencies and Configuration

requirements.txt needs celery plus a set of OTel libraries.

# requirements.txt
# Celery 5 speaks AMQP through kombu/py-amqp; pika and the old
# librabbitmq extra are unnecessary here.
celery==5.3.6
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-exporter-otlp-proto-http==1.21.0
opentelemetry-instrumentation-celery==0.42b0

The OTel integration for Celery, opentelemetry-instrumentation-celery, does most of the heavy lifting, but configuring it correctly is critical.

# worker/celery_app.py
import os
from celery import Celery
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

# --- OpenTelemetry Setup ---
# With Celery's default prefork pool, tracer setup and instrumentation must
# run inside every worker child process, not just the parent. Hooking the
# worker_process_init signal (the approach the OTel Celery docs recommend)
# guarantees exactly that; doing the setup only at module import time can
# silently produce no spans.
from celery.signals import worker_process_init

def setup_opentelemetry():
    """Configures OpenTelemetry for the current worker process."""
    provider = TracerProvider()

    # The OTLP exporter sends data to the OTel Collector.
    # The endpoint must be configurable for different environments.
    otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel-collector:4318")
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces"))
    provider.add_span_processor(processor)

    # Sets the global default tracer provider
    trace.set_tracer_provider(provider)

    print(f"OpenTelemetry configured to export to {otlp_endpoint}")

@worker_process_init.connect(weak=False)
def init_tracing(*args, **kwargs):
    setup_opentelemetry()
    # The instrumentor hooks Celery's signals (task_prerun and friends) to
    # extract 'traceparent' from incoming message headers and activate it.
    CeleryInstrumentor().instrument()

# --- Celery App Definition ---
broker_url = os.getenv('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//')
celery_app = Celery('tasks', broker=broker_url, backend='rpc://')
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Load tasks from worker/tasks.py (autodiscover looks for a 'tasks' module
# inside each listed package, so we pass the package name, not the module).
celery_app.autodiscover_tasks(['worker'])
  • Key point: CeleryInstrumentor().instrument() hooks Celery's signal handlers and task execution path so that the traceparent header is automatically extracted from each incoming message and the trace continued. Because the default prefork pool forks worker children, both the provider setup and the instrumentation are wired to the worker_process_init signal; run them in the wrong place and tracing simply never takes effect. The sketch below shows roughly what the instrumentor automates.
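
For intuition — or as a fallback where the instrumentor cannot be used — this is approximately the per-message work it does. The handle_message function and carrier shape are hypothetical; extract and start_as_current_span are the real OTel Python APIs:

# sketch -- manual extraction of the upstream context from message headers
from opentelemetry import trace
from opentelemetry.propagate import extract

tracer = trace.get_tracer(__name__)

def handle_message(headers: dict, body: bytes):
    # 'headers' is assumed to carry the 'traceparent' (and optionally
    # 'tracestate') keys injected by the Spring Boot producer.
    parent_ctx = extract(headers)

    # The span started here becomes a child of the upstream API span.
    with tracer.start_as_current_span("run-export-task", context=parent_ctx) as span:
        span.set_attribute("messaging.system", "rabbitmq")
        ...  # actual task logic goes here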

2. Task Definition

With auto-instrumentation in place, the task code can focus entirely on business logic and stays clean.

# worker/tasks.py
import time
import random
from worker.celery_app import celery_app
from opentelemetry import trace

# Get a tracer for this module. This is the standard OTel practice.
tracer = trace.get_tracer(__name__)

@celery_app.task(name='tasks.export_data')
def export_data(payload):
    """
    Simulates a long-running data export task.
    """
    # The CeleryInstrumentor has already created a 'process' span for the task execution.
    # We can create child spans for specific sub-operations within the task.
    with tracer.start_as_current_span("data_processing") as span:
        try:
            user_id = payload.get('userId', 'unknown')
            export_format = payload.get('format', 'csv')
            
            span.set_attribute("export.user_id", user_id)
            span.set_attribute("export.format", export_format)

            print(f"Starting export for user {user_id} in {export_format} format...")
            
            # Simulate some work
            time.sleep(random.randint(5, 10))
            
            # Simulate a potential failure
            if random.random() < 0.2:
                raise ValueError("Failed to connect to data source")

            span.set_attribute("export.status", "success")
            print(f"Export successful for user {user_id}")
            return {"status": "success", "userId": user_id}

        except Exception as e:
            # Record the exception on the span for better error analysis in Jaeger/Zipkin.
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.set_attribute("export.status", "failure")
            print(f"Export failed for user {payload.get('userId')}: {e}")
            # Re-raise or handle as per business requirements
            raise

Solid.js Frontend: Where the Trace Begins

The frontend is where users interact, and where the whole trace starts. We configure the OTel JS SDK to capture network requests automatically; user interactions such as clicks we will instrument manually in the component.

1. Installing Dependencies

npm install @opentelemetry/api \
    @opentelemetry/core \
    @opentelemetry/sdk-trace-web \
    @opentelemetry/context-zone \
    @opentelemetry/instrumentation \
    @opentelemetry/instrumentation-fetch \
    @opentelemetry/exporter-trace-otlp-http

2. Initializing OTel

The tracer must be initialized as early as possible in the application's entry file (e.g. src/index.tsx); a sketch of the entry file follows the setup below.

// src/tracing.ts
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { W3CTraceContextPropagator } from '@opentelemetry/core'; // lives in core, not a separate 'propagator-w3c' package
import { WebTracerProvider, BatchSpanProcessor } from '@opentelemetry/sdk-trace-web';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';

const provider = new WebTracerProvider();

// Configure the exporter to send traces to the OTel Collector
const exporter = new OTLPTraceExporter({
  url: '/v1/traces' // Use a relative path to proxy through the local server in dev
  // In production, this would be 'https://your-collector-endpoint/v1/traces'
});

provider.addSpanProcessor(new BatchSpanProcessor(exporter));

// Set up context management and propagation
provider.register({
  contextManager: new ZoneContextManager(),
  propagator: new W3CTraceContextPropagator(),
});

// Register instrumentations. The FetchInstrumentation is key here.
// It will automatically create spans for `fetch` calls and inject the
// 'traceparent' header.
registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      // We can add logic to ignore certain requests, e.g., tracking beacons
      ignoreUrls: [/localhost:4318/],
      // This is crucial for cross-origin requests to include tracing headers
      propagateTraceHeaderCorsUrls: [/.*/] 
    }),
  ],
});

console.log("OpenTelemetry Web Tracer initialized.");
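
To make sure instrumentation is live before the app fires its first request, the tracing module must be imported first in the entry file. A minimal sketch, assuming the default Solid.js project layout:

// src/index.tsx -- sketch; assumes the standard Solid entry structure
import './tracing'; // side-effect import: must come before anything that calls fetch
import { render } from 'solid-js/web';
import App from './App';

render(() => <App />, document.getElementById('root')!);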

3. Using It in a Solid.js Component

With auto-instrumentation, the component code barely changes: FetchInstrumentation intercepts fetch calls transparently.

// src/App.tsx
import { createSignal, Component } from 'solid-js';
import { trace, context, SpanStatusCode } from '@opentelemetry/api'; // v1.x exposes named exports, not an 'api' object

// It's good practice to get a tracer instance for your component or service
const tracer = trace.getTracer('solid-app-tracer');

const App: Component = () => {
  const [status, setStatus] = createSignal('Idle');

  const handleExport = async () => {
    // Manually create a span to trace the user interaction (the click itself)
    const span = tracer.startSpan('ui.handleExportClick');
    
    // Use `context.with` to make this span the active one for the enclosed code
    context.with(trace.setSpan(context.active(), span), async () => {
      try {
        setStatus('Submitting...');
        const userId = `user-${Math.floor(Math.random() * 1000)}`;
        span.setAttribute("app.user.id", userId);

        // This fetch call will be automatically instrumented.
        // A new child span will be created, and 'traceparent' header will be added.
        const response = await fetch('/api/export', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ userId, format: 'csv', timestamp: new Date().toISOString() }),
        });

        if (response.status === 202) {
          setStatus(`Task accepted for user ${userId}.`);
          span.setStatus({ code: SpanStatusCode.OK });
        } else {
          setStatus(`Error: ${response.statusText}`);
          span.setStatus({ code: SpanStatusCode.ERROR, message: response.statusText });
        }
      } catch (error) {
        setStatus(`Request failed: ${error}`);
        span.recordException(error as Error);
        span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
      } finally {
        span.end();
        console.log("UI Span ended.");
      }
    });
  };

  return (
    <div>
      <h1>Data Exporter</h1>
      <button onClick={handleExport}>Export Data</button>
      <p>Status: {status()}</p>
    </div>
  );
};

export default App;
  • Combining manual and automatic instrumentation: we create the ui.handleExportClick span by hand to wrap the click handler's logic, which captures richer business context. The fetch call inside is handled by FetchInstrumentation, which automatically parents its span under ui.handleExportClick, yielding a clean, connected call chain.

Deploying with Docker Swarm

Finally, docker-compose.yml ties everything together. The file defines six services (note that docker stack deploy ignores build: directives, so for a real Swarm deployment the backend and worker images must be prebuilt and pushed to a registry):

  1. frontend: Nginx, serving the Solid.js static bundle.
  2. backend: the Spring Boot application.
  3. worker: the Celery worker.
  4. rabbitmq: the message broker.
  5. otel-collector: receives trace data from the frontend, backend, and worker.
  6. jaeger: stores and visualizes the traces.
# docker-compose.yml
version: '3.8'

services:
  frontend:
    image: nginx:alpine
    volumes:
      - ./frontend/dist:/usr/share/nginx/html
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    ports:
      - "8080:80"
    networks:
      - app-net

  backend:
    build: ./backend
    environment:
      - SPRING_RABBITMQ_HOST=rabbitmq
      # Agent configuration
      - JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar
      - OTEL_SERVICE_NAME=spring-boot-backend
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
      - OTEL_TRACES_SAMPLER=always_on # Use with caution in production
    depends_on:
      - rabbitmq
      - otel-collector
    networks:
      - app-net

  worker:
    build: ./worker
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
      - OTEL_SERVICE_NAME=celery-worker
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
      - OTEL_TRACES_SAMPLER=always_on
    depends_on:
      - rabbitmq
      - otel-collector
    networks:
      - app-net

  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - app-net

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    volumes:
      - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
    ports:
      - "4318:4318" # OTLP HTTP receiver
    depends_on:
      - jaeger
    networks:
      - app-net

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # Jaeger UI
    networks:
      - app-net

networks:
  app-net:
    driver: overlay
  • otel-collector-config.yaml configures the OTel Collector: its receivers, processors, and exporters. A minimal configuration accepts OTLP data and exports it to Jaeger, as sketched below.
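
A minimal sketch of that file, assuming a recent Jaeger all-in-one image (current releases accept OTLP on gRPC port 4317 by default):

# otel-collector-config.yaml -- minimal sketch
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch: {}

exporters:
  # Recent Jaeger versions ingest OTLP natively; the legacy 'jaeger'
  # exporter has been removed from collector-contrib.
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]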

Limitations and Future Work

This setup closes the original "observability black hole" and gives our heterogeneous system end-to-end visibility. It is not a silver bullet, though; several trade-offs remain before it is production-grade.

First, the always_on sampling strategy is unsustainable for any production system with real traffic: it generates large volumes of trace data and measurable overhead. The next step is smarter sampling — probability-based sampling, or better, tail-based sampling, which defers the keep-or-drop decision until the whole trace has completed and therefore captures errors and long-tail requests far more reliably. A parent-based probabilistic sampler is a one-line change per service, as shown below.
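
A compose excerpt (the 10% ratio is an arbitrary example; OTEL_TRACES_SAMPLER and OTEL_TRACES_SAMPLER_ARG are standard OTel SDK environment variables):

# docker-compose.yml (excerpt) -- sample 10% of new traces; child services follow the parent's decision
environment:
  - OTEL_TRACES_SAMPLER=parentbased_traceidratio
  - OTEL_TRACES_SAMPLER_ARG=0.1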

Second, the current propagation mechanism works, but more complex Celery workflows (chains, chords) may pose challenges. The trace context must be relayed correctly whenever state passes between tasks, which may require deeper customization of Celery's signals.

Finally, tracing is only one of the three pillars of observability. A genuinely robust system also correlates traces with structured logs (carrying the traceId) and with key business metrics. The follow-up work is to stamp the traceId into every log line (a sketch follows), collect task-processing metrics via Prometheus, and bring logs, metrics, and traces together for drill-down, correlated analysis on a single platform such as Grafana.
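
As a first concrete step toward log correlation, a logging.Filter can stamp the active trace ID onto every record in the Python worker. A minimal sketch — the trace_id field name and format string are our choices, and the opentelemetry-instrumentation-logging package automates the same idea:

# worker/log_correlation.py -- sketch: put the current trace ID on log records
import logging
from opentelemetry import trace

class TraceIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        ctx = trace.get_current_span().get_span_context()
        # Rendered as 32 hex chars when a span is active, '-' otherwise.
        record.trace_id = format(ctx.trace_id, "032x") if ctx.is_valid else "-"
        return True

handler = logging.StreamHandler()
handler.addFilter(TraceIdFilter())
handler.setFormatter(logging.Formatter(
    "%(asctime)s [trace_id=%(trace_id)s] %(levelname)s %(message)s"))
logging.getLogger().addHandler(handler)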

