At 2 a.m., the on-call alerting system fired: the data export task for user ID 1a2b3c had been "submitted" but produced no result after more than 30 minutes. The investigation began. Frontend logs showed that the user clicked the export button in the Solid.js UI and that the frontend received a 202 Accepted response from the backend. The Spring Boot service logs clearly recorded receiving the request and successfully publishing the task message to RabbitMQ. Yet the downstream Python Celery worker that consumes these tasks had no related log entries at all.
Where did the message go? Was it lost to a network partition in RabbitMQ, a hung Celery worker process, or an uncaught exception before the task ever ran? Manually correlating information across three independent tech stacks and logging systems is like searching for a needle in the dark. Without a shared request ID, we could not even confirm that the message published by the backend was the one triggered by the frontend. This is the classic "observability black hole" of a heterogeneous microservice system, and the only way out is to build end-to-end distributed tracing across the whole chain.
The goal is clear: for every request along the Solid.js (frontend) -> Spring Boot (backend API) -> Celery (async task processing) chain, generate a unique traceId and carry it all the way from the browser to the Python task executor. For tooling, OpenTelemetry (OTel) is the current industry standard; it provides cross-language specifications and SDKs and is the natural choice for tracing a heterogeneous system like this.
Architecture and Trace Flow Design
Before writing any code, we need to map out how the trace context propagates. The W3C Trace Context specification defines the standard traceparent HTTP header, which will be our core carrier.
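For reference, a traceparent value packs four dash-separated fields (version, trace-id, parent span-id, and trace flags) into one string; a representative header looks like this:

traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01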
sequenceDiagram
participant Browser as Browser (Solid.js)
participant API as API (Spring Boot)
participant Broker as RabbitMQ
participant Worker as Celery Worker
participant Collector as OTel Collector
Browser->>+API: POST /api/export (Header: traceparent)
Note right of API: 1. OTel agent/SDK automatically parses traceparent
API->>API: 2. Create child span "process-request"
API->>+Broker: 3. publish message (Headers: traceparent)
Note right of Broker: Context injected into the message headers
API-->>-Browser: HTTP 202 Accepted
Broker-->>-Worker: consume message
Note left of Worker: 4. OTel SDK extracts traceparent from the message headers
Worker->>Worker: 5. Create child span "run-export-task"
Worker->>Collector: 6. Export task span
API->>Collector: 7. Export API span
Browser->>Collector: 8. Export browser span
The critical points in this flow are:
- Frontend injection: when the Solid.js app issues the API request, the OTel JS SDK must generate or propagate the traceparent header.
- Backend propagation: the Spring Boot service, running with the OTel Java Agent, parses the incoming traceparent automatically, but when it publishes the task message to RabbitMQ it must explicitly inject the current trace context into the message headers. This is a common pitfall: crossing from HTTP to AMQP, the context does not propagate on its own.
- Worker extraction: when the Celery worker receives the task message, it needs a mechanism to extract traceparent from the message headers and continue the trace from there.
The deployment environment is Docker Swarm: lightweight, yet enough for service discovery and basic orchestration. The observability stack consists of the application services plus an OTel Collector, which receives, processes, and exports trace data to a backend store (Jaeger in this case).
Spring Boot Backend: Bridging the Trace Context
The backend is the hub of the whole chain. The core challenge here is to hand the trace context obtained from the HTTP request over to the message about to be published to RabbitMQ. In a real project we would not edit every message-sending method by hand; instead we lean on AOP or the extension points Spring already provides.
1. Project Dependencies
In pom.xml, besides spring-boot-starter-web and spring-boot-starter-amqp, we only add the lightweight opentelemetry-api, which is needed so the context-propagation code below compiles. The full OTel SDK is not required: we run the OTel Java Agent, which instruments the application through bytecode enhancement and keeps code intrusion to a minimum.
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Web and RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- OpenTelemetry API: needed only so the propagation code below compiles. -->
<!-- The agent supplies the SDK at runtime; keep this version aligned with the agent. -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.32.0</version>
</dependency>
<!-- Lombok for cleaner code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
2. Implementing Trace Context Injection
A common mistake is to assume the OTel Agent handles every outbound call automatically. For HTTP clients and JDBC it does a good job, but for message queues, and RabbitMQ client libraries in particular, automatic injection may be incomplete or require specific configuration. The safest approach is to control it manually.
We do this with RabbitTemplate's MessagePostProcessor, which lets us modify a message right before it is published: the perfect moment to inject headers.
// src/main/java/com/example/tracing/config/RabbitMqConfig.java
package com.example.tracing.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.propagation.TextMapSetter;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.HashMap;
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
private final RabbitTemplate rabbitTemplate;
// No OpenTelemetry bean is injected here: when the app runs under the OTel Java Agent,
// the application-facing API is bridged to the agent's SDK via GlobalOpenTelemetry.
// This setter tells OTel how to inject context into a map-like structure.
// We will use message headers which are essentially a map.
private final TextMapSetter<HashMap<String, Object>> setter =
(carrier, key, value) -> carrier.put(key, value);
@PostConstruct
public void setupRabbitTemplate() {
// Add a post processor that will be applied to every message sent.
rabbitTemplate.addBeforePublishPostProcessors(createTracingPostProcessor());
}
private MessagePostProcessor createTracingPostProcessor() {
return message -> {
// A mutable map to hold our new headers
HashMap<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders());
// The core logic: inject the current OpenTelemetry context into the headers map.
// This captures the active span's context (traceId, spanId) and injects it
// as 'traceparent' and 'tracestate' headers.
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(io.opentelemetry.context.Context.current(), headers, setter);
// Copy the new headers back to the message properties.
headers.forEach(message.getMessageProperties()::setHeader);
// Log for debugging purposes. In production, this should be at DEBUG level.
System.out.println("Injecting trace context into RabbitMQ message: " + headers);
return message;
};
}
}
- Core of the code: the @PostConstruct hook registers a MessagePostProcessor on the RabbitTemplate. Inside it, we obtain the TextMapPropagator from the global OpenTelemetry instance (bridged to the agent at runtime) and call inject. inject serializes the current context (traceId, spanId, flags) into traceparent and tracestate strings per the W3C Trace Context spec and writes them into a HashMap through the setter we supplied; those entries are then copied onto the message headers.
3. API Controller
The controller itself is very simple: it accepts the request and publishes a message through RabbitTemplate. Because the OTel Agent is attached, the incoming HTTP request already gets a span by the time it reaches the controller method, and the MessagePostProcessor configured above ensures that span's context travels with the message.
// src/main/java/com/example/tracing/controller/TaskController.java
package com.example.tracing.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class TaskController {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
@PostMapping("/export")
@ResponseStatus(HttpStatus.ACCEPTED)
@SneakyThrows
public void submitExportTask(@RequestBody Map<String, Object> payload) {
// The business logic is simple: serialize the payload and send it.
// The tracing logic is handled transparently by the agent and our post-processor.
String messageBody = objectMapper.writeValueAsString(payload);
// This call will trigger the MessagePostProcessor we configured.
rabbitTemplate.convertAndSend("tasks.exchange", "tasks.routing.key", messageBody);
System.out.println("Task submitted to RabbitMQ: " + messageBody);
}
}
Celery Worker: Rebuilding the Context on the Consumer Side
The challenge on the Python side is the mirror image of the Java side: before the task runs, we must extract the trace context from the message headers and use it to start a new span, so that it becomes a child of the upstream span.
1. Dependencies and Configuration
requirements.txt needs celery plus a set of OTel libraries.
# requirements.txt
celery[librabbitmq]==5.3.6
pika==1.3.2
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-exporter-otlp-proto-http==1.21.0
opentelemetry-instrumentation-celery==0.42b0
The Celery integration, opentelemetry-instrumentation-celery, does most of the heavy lifting for us, but configuring it correctly is essential.
# worker/celery_app.py
import os
from celery import Celery
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
# --- OpenTelemetry Setup ---
# A common mistake is to place this setup inside a function that might be called
# multiple times. It should be configured once when the worker process starts.
def setup_opentelemetry():
"""Configures OpenTelemetry for the Celery worker."""
provider = TracerProvider()
# The OTLP exporter sends data to the OTel Collector.
# The endpoint must be configurable for different environments.
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel-collector:4318")
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces"))
provider.add_span_processor(processor)
# Sets the global default tracer provider
trace.set_tracer_provider(provider)
print(f"OpenTelemetry configured to export to {otlp_endpoint}")
setup_opentelemetry()
# Instrument Celery before the worker starts consuming tasks. The instrumentor
# patches Celery's signal handlers so that context propagation (extracting the
# 'traceparent' header) happens automatically. Note: with the prefork pool, the
# OTel docs recommend running this (and the provider setup) inside Celery's
# worker_process_init signal so each child process gets a working exporter.
CeleryInstrumentor().instrument()
# --- Celery App Definition ---
broker_url = os.getenv('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//')
celery_app = Celery('tasks', broker=broker_url, backend='rpc://')
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
# Load tasks from the tasks module
celery_app.autodiscover_tasks(['worker.tasks'])
- Key point: CeleryInstrumentor().instrument() must be in place before the worker starts executing tasks. It monkey-patches Celery's signal handlers and task execution logic so that the traceparent header is extracted from incoming messages automatically. When the prefork pool is used, the OTel documentation recommends performing this call, together with the tracer provider setup, inside Celery's worker_process_init signal, so that every child process ends up with its own working exporter.
2. Task Definition
With auto-instrumentation in place, the task code can focus purely on business logic and stays clean.
# worker/tasks.py
import time
import random
from worker.celery_app import celery_app
from opentelemetry import trace
# Get a tracer for this module. This is the standard OTel practice.
tracer = trace.get_tracer(__name__)
@celery_app.task(name='tasks.export_data')
def export_data(payload):
"""
Simulates a long-running data export task.
"""
# The CeleryInstrumentor has already created a 'process' span for the task execution.
# We can create child spans for specific sub-operations within the task.
with tracer.start_as_current_span("data_processing") as span:
try:
user_id = payload.get('userId', 'unknown')
export_format = payload.get('format', 'csv')
span.set_attribute("export.user_id", user_id)
span.set_attribute("export.format", export_format)
print(f"Starting export for user {user_id} in {export_format} format...")
# Simulate some work
time.sleep(random.randint(5, 10))
# Simulate a potential failure
if random.random() < 0.2:
raise ValueError("Failed to connect to data source")
span.set_attribute("export.status", "success")
print(f"Export successful for user {user_id}")
return {"status": "success", "userId": user_id}
except Exception as e:
# Record the exception on the span for better error analysis in Jaeger/Zipkin.
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.set_attribute("export.status", "failure")
print(f"Export failed for user {payload.get('userId')}: {e}")
# Re-raise or handle as per business requirements
raise
Solid.js Frontend: Where the Trace Begins
The frontend is where user interaction originates, and therefore where the whole trace starts. We configure the OTel JS SDK to automatically capture user actions (such as clicks) and network requests.
1. Installing Dependencies
npm install @opentelemetry/api \
@opentelemetry/core \
@opentelemetry/sdk-trace-web \
@opentelemetry/context-zone \
@opentelemetry/instrumentation \
@opentelemetry/instrumentation-fetch \
@opentelemetry/exporter-trace-otlp-http
2. OTel Initialization
In the application entry file (for example src/index.tsx), the tracer should be initialized as early as possible.
// src/tracing.ts
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { WebTracerProvider, BatchSpanProcessor } from '@opentelemetry/sdk-trace-web';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
const provider = new WebTracerProvider();
// Configure the exporter to send traces to the OTel Collector
const exporter = new OTLPTraceExporter({
url: '/v1/traces' // Use a relative path to proxy through the local server in dev
// In production, this would be 'https://your-collector-endpoint/v1/traces'
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
// Set up context management and propagation
provider.register({
contextManager: new ZoneContextManager(),
propagator: new W3CTraceContextPropagator(),
});
// Register instrumentations. The FetchInstrumentation is key here.
// It will automatically create spans for `fetch` calls and inject the
// 'traceparent' header.
registerInstrumentations({
instrumentations: [
new FetchInstrumentation({
// We can add logic to ignore certain requests, e.g., tracking beacons
ignoreUrls: [/localhost:4318/],
// This is crucial for cross-origin requests to include tracing headers
propagateTraceHeaderCorsUrls: [/.*/]
}),
],
});
console.log("OpenTelemetry Web Tracer initialized.");
3. Usage in a Solid.js Component
With auto-instrumentation, the component code barely changes: FetchInstrumentation intercepts fetch calls automatically.
// src/App.tsx
import { createSignal, Component } from 'solid-js';
import { context, trace, SpanStatusCode } from '@opentelemetry/api';
// It's good practice to get a tracer instance for your component or service
const tracer = trace.getTracer('solid-app-tracer');
const App: Component = () => {
const [status, setStatus] = createSignal('Idle');
const handleExport = async () => {
// Manually create a span to trace the user interaction (the click itself)
const span = tracer.startSpan('ui.handleExportClick');
// Use `context.with` to make this span the active one for the enclosed code
context.with(trace.setSpan(context.active(), span), async () => {
try {
setStatus('Submitting...');
const userId = `user-${Math.floor(Math.random() * 1000)}`;
span.setAttribute("app.user.id", userId);
// This fetch call will be automatically instrumented.
// A new child span will be created, and 'traceparent' header will be added.
const response = await fetch('/api/export', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userId, format: 'csv', timestamp: new Date().toISOString() }),
});
if (response.status === 202) {
setStatus(`Task accepted for user ${userId}.`);
span.setStatus({ code: SpanStatusCode.OK });
} else {
setStatus(`Error: ${response.statusText}`);
span.setStatus({ code: SpanStatusCode.ERROR, message: response.statusText });
}
} catch (error) {
setStatus(`Request failed: ${error}`);
span.recordException(error as Error);
span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
} finally {
span.end();
console.log("UI Span ended.");
}
});
};
return (
<div>
<h1>Data Exporter</h1>
<button onClick={handleExport}>Export Data</button>
<p>Status: {status()}</p>
</div>
);
};
export default App;
- Combining manual and automatic instrumentation: the hand-written ui.handleExportClick span wraps the whole click handler and carries the business context, while the inner fetch call is instrumented by FetchInstrumentation, which automatically makes its span a child of ui.handleExportClick, yielding a complete call chain.
Deployment with Docker Swarm
Finally, docker-compose.yml ties everything together. It defines six services (note that docker stack deploy ignores build: directives, so the backend and worker images must be built and pushed to a registry before deploying to Swarm):
- frontend: an Nginx service serving the Solid.js static files; its nginx.conf is also expected to proxy /api to the backend and /v1/traces to the collector, since the browser exporter posts to a relative path.
- backend: the Spring Boot application; its image is assumed to bundle the OTel Java Agent at /app/opentelemetry-javaagent.jar, as referenced by JAVA_TOOL_OPTIONS.
- worker: the Celery worker.
- rabbitmq: the message broker.
- otel-collector: receives trace data from the frontend, backend, and worker.
- jaeger: stores and visualizes the traces.
# docker-compose.yml
version: '3.8'
services:
frontend:
image: nginx:alpine
volumes:
- ./frontend/dist:/usr/share/nginx/html
- ./nginx.conf:/etc/nginx/conf.d/default.conf
ports:
- "8080:80"
networks:
- app-net
backend:
build: ./backend
environment:
- SPRING_RABBITMQ_HOST=rabbitmq
# Agent configuration
- JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar
- OTEL_SERVICE_NAME=spring-boot-backend
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
- OTEL_TRACES_SAMPLER=always_on # Use with caution in production
depends_on:
- rabbitmq
- otel-collector
networks:
- app-net
worker:
build: ./worker
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- OTEL_SERVICE_NAME=celery-worker
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
- OTEL_TRACES_SAMPLER=always_on
depends_on:
- rabbitmq
- otel-collector
networks:
- app-net
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
networks:
- app-net
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
ports:
- "4318:4318" # OTLP HTTP receiver
depends_on:
- jaeger
networks:
- app-net
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # Jaeger UI
networks:
- app-net
networks:
app-net:
driver: overlay
- otel-collector-config.yaml is the Collector's configuration file, defining receivers, processors, and exporters. A minimal configuration accepts OTLP data and exports it to Jaeger, as in the sketch below.
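A minimal sketch of that configuration, assuming a recent otel/opentelemetry-collector-contrib image and a Jaeger all-in-one with OTLP ingestion enabled (the default in recent Jaeger releases); service names and ports follow the docker-compose.yml above:

# otel-collector-config.yaml (illustrative sketch)
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
processors:
  batch:
exporters:
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]

With these files in place, the stack can be brought up with docker stack deploy -c docker-compose.yml <stack-name> on a Swarm manager (or with docker compose up for local testing), and a request from the browser should then show up in the Jaeger UI on port 16686 as a single trace spanning all three services.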
Limitations and Future Work
This setup resolves the original "observability black hole" and gives our heterogeneous system end-to-end visibility. It is not a silver bullet, though; several trade-offs remain before it is production-ready.
First, the always_on sampling strategy is unsustainable for any production system with real traffic: it generates a large volume of trace data and measurable overhead. The next step is smarter sampling, either probabilistic head sampling or, better, tail-based sampling, which defers the keep-or-drop decision until a trace is complete and therefore captures errors and long-tail requests far more reliably.
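As one illustration (not part of the original deployment), tail-based sampling can be done in the Collector's contrib build with the tail_sampling processor; the policy names and thresholds below are placeholders to show the shape of such a configuration:

# Illustrative addition to otel-collector-config.yaml (contrib build)
processors:
  tail_sampling:
    decision_wait: 10s            # wait for late spans before deciding on a trace
    policies:
      - name: keep-errors
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: keep-slow-traces
        type: latency
        latency: {threshold_ms: 5000}
      - name: sample-the-rest
        type: probabilistic
        probabilistic: {sampling_percentage: 10}

The processor would then be added to the traces pipeline ahead of batch.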
Second, the current propagation mechanism works, but more complex Celery workflows (such as chains or chords) can be challenging: the trace context has to be relayed correctly as state passes from task to task, which may require deeper customization of Celery's signals.
Finally, tracing is only one of the three pillars of observability. A truly robust system also correlates traces with structured logs (carrying the traceId) and with key business metrics. The next steps are to inject the traceId into every log line, collect task-related metrics with tools such as Prometheus, and bring logs, metrics, and traces together for drill-down and correlated analysis on a single platform such as Grafana.