我们面临一个日益棘手的困境:部署在 Kubernetes 集群中的 API 网关 APISIX 每秒产生数以万计的访问日志,而后端微服务通过 Sentry 报告了大量的异常事件。当线上出现故障时,Sentry 警报拉响,但异常堆栈本身往往缺乏上下文。为了定位根因,工程师必须手动根据时间戳、用户 ID 等零散信息,在浩如烟海的 APISIX 日志中艰难地大海捞针,试图还原出异常发生前的完整请求链路。这个过程效率低下,且极易出错,尤其是在高并发的复杂调用场景下。
问题根源在于数据的割裂。APISIX 的访问日志和 Sentry 的异常事件是两个独立的数据孤岛。我们需要构建一个统一的数据模型,将请求在网关层的行为(如请求头、客户端 IP、认证信息、响应延迟)与应用层的异常无缝关联起来。初步构想是建立一个统一的可观测性数据湖,将所有相关数据近实时地汇集一处,并提供强大的分析查询能力。
技术选型的决策过程很直接。APISIX 因其基于 Lua 的高性能插件生态系统成为不二之选,我们可以开发一个自定义插件来捕获和丰富日志。Sentry 是我们现有的错误追踪系统。核心在于数据湖的构建,我们放弃了传统的 Elasticsearch 或日志服务方案,因为它们在处理海量结构化数据、支持复杂分析查询以及存储成本上存在短板。我们选择了 Apache Hudi,因为它能在 HDFS 或 S3 等对象存储之上提供事务、记录级别的更新/删除(Upsert)以及增量查询能力,这对于构建一个不断演进的可观测性数据集至关重要。数据从网关到数据湖的管道,我们采用 Kafka 作为缓冲队列,Flink 作为流处理引擎,因其强大的状态管理和对 Hudi 的原生支持。
整个架构的数据流如下:
flowchart TD
subgraph "客户端请求"
A[Client]
end
subgraph "API 网关层 (APISIX)"
A -- HTTPS Request --> B(APISIX Gateway)
B -- 1. 生成/获取 Trace ID --> P1[Custom Lua Plugin]
P1 -- 2. 注入 Trace ID 到 Header --> C(Upstream Service)
P1 -- 3. 构造日志并发送 --> D[Kafka Topic: apisix-logs]
end
subgraph "应用服务层"
C -- 4. 读取 Trace ID --> SDK(Sentry SDK)
SDK -- 5. 上报异常 (带 Trace ID) --> E[Sentry Platform]
end
subgraph "数据处理与存储"
D -- 6. 消费日志 --> F[Apache Flink Job]
F -- 7. 转换并写入 --> G[Apache Hudi Table on S3]
end
subgraph "分析与查询"
G -- 8. SQL 查询 --> H{Query Engine}
H -- Spark SQL --> I[Data Analyst]
H -- Presto/Trino --> J[Dashboard]
end
我们的实现将从核心的 APISIX 自定义插件开始。
一、APISIX 自定义 Logger 插件:数据源头的捕获与丰富
APISIX 的插件在请求生命周期的不同阶段执行。我们的 hudi-logger 插件需要在 access 阶段注入追踪 ID,并在 log 阶段收集所有信息并将其发送到 Kafka。在真实项目中,这个插件必须是健壮的,能够处理 Kafka 连接失败、序列化错误等异常情况。
首先,定义插件的 schema,这决定了用户如何在 config.yaml 中配置它。
plugins/hudi-logger.lua:
-- hudi-logger.lua
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local json = require("cjson.safe")
local producer = require("resty.kafka.producer")
local uuid = require("resty.uuid")
local schema = {
type = "object",
properties = {
kafka_brokers = {
type = "array",
items = {
type = "string",
},
minItems = 1
},
kafka_topic = {
type = "string"
},
trace_header_name = {
type = "string",
default = "X-Request-Trace-Id"
},
-- 生产级配置:用于控制Kafka producer的行为
kafka_producer_config = {
type = "object",
properties = {
keepalive_timeout = { type = "integer", default = 60000 },
keepalive_pool_size = { type = "integer", default = 10 },
broker_request_timeout = { type = "integer", default = 3000 },
max_retry_times = { type = "integer", default = 3 }
},
default = {}
}
},
required = {"kafka_brokers", "kafka_topic"}
}
local _M = {
version = 0.1,
priority = 10, -- 确保在其他日志插件之前或之后执行
name = "hudi-logger",
schema = schema
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
-- 插件的核心逻辑
function _M.access(conf, ctx)
local trace_id = core.request.header(ctx, conf.trace_header_name)
if not trace_id or trace_id == "" then
-- 如果请求头中没有,则生成一个新的 UUID v4
trace_id = uuid.new()
end
-- 将 trace_id 存储在 ctx 中,以便 log 阶段可以访问
ctx.vars.trace_id = trace_id
-- 将 trace_id 注入到发往上游服务的请求头中
core.request.set_header(ctx, conf.trace_header_name, trace_id)
end
function _M.log(conf, ctx)
local trace_id = ctx.vars.trace_id
if not trace_id then
-- 理论上 access 阶段已经设置,这是一个防御性编程
core.log.warn("hudi-logger: trace_id not found in ctx")
return
end
-- 收集日志信息
local log_entry = {
trace_id = trace_id,
request_id = core.request.get_id(),
timestamp = ngx.now() * 1000, -- 毫秒级时间戳
client_ip = core.request.get_remote_addr(ctx),
method = core.request.get_method(ctx),
host = core.request.header(ctx, "host"),
uri = core.request.get_uri(ctx),
status_code = ctx.response.status,
latency_ms = ctx.latency,
user_agent = core.request.header(ctx, "user-agent"),
-- 可选择性地记录部分请求/响应体,但要注意性能和安全
-- request_body_snippet = get_snippet(core.request.get_body(ctx)),
upstream_addr = ctx.upstream and ctx.upstream.addr or "N/A",
service_id = ctx.service and ctx.service.id or "N/A",
route_id = ctx.route and ctx.route.id or "N/A"
}
-- 序列化为 JSON
local ok, msg_json = pcall(json.encode, log_entry)
if not ok then
core.log.error("hudi-logger: failed to encode log entry to JSON: ", msg_json)
return
}
-- 初始化 Kafka producer
-- 在生产环境中,这里的 producer 实例应该被缓存和复用,避免每次请求都创建新连接
local bp, err = producer:new(conf.kafka_brokers, conf.kafka_producer_config)
if not bp then
core.log.error("hudi-logger: failed to create kafka producer: ", err)
return
}
-- 发送消息到 Kafka
-- 这里的 key 使用 trace_id 可以保证同一个 trace 的日志被发送到 Kafka 的同一个分区,
-- 这对于后续 Flink 处理的有序性或状态一致性可能很重要。
local bytes, err = bp:send(conf.kafka_topic, trace_id, msg_json)
if not bytes then
core.log.error("hudi-logger: failed to send message to kafka topic ",
conf.kafka_topic, ": ", err)
return
}
-- 注意:在真实高并发场景,`bp:send` 是同步阻塞操作,可能会影响性能。
-- 优化方案是使用 ngx.timer.at 实现异步批量发送,这会显著增加插件复杂度。
-- 这里为了演示清晰,采用同步方式。
end
return _M
将此文件放入 APISIX 的 apisix/plugins/ 目录下,并在 config.yaml 中启用它。
# in config.yaml
apisix:
plugins:
- ...
- hudi-logger # 启用插件
# 在具体路由上配置
routes:
- id: "my-backend-route"
uri: "/api/v1/*"
upstream:
type: roundrobin
nodes:
"backend-service:8080": 1
plugins:
hudi-logger:
kafka_brokers:
- "kafka-broker-1:9092"
- "kafka-broker-2:9092"
kafka_topic: "apisix-raw-logs"
trace_header_name: "X-Trace-Id"
这个插件实现了核心功能:确保每个请求都有一个唯一的 X-Trace-Id,将此 ID 传递给后端服务,并将包含此 ID 的详细日志发送到 Kafka。后端服务(如 Java, Python, Go)只需配置其 Sentry SDK,从请求头中读取 X-Trace-Id 并将其设置为 Sentry 事件的 trace_id 标签。这样,数据孤岛的第一座桥梁就搭建完成了。
二、Flink 作业:流式数据写入 Apache Hudi
数据进入 Kafka 后,我们需要一个 Flink 作业来消费这些 JSON 格式的日志,并以 Hudi 表的格式写入对象存储。这个过程不仅是数据搬运,还包括了 schema 定义、分区策略和写入模式的选择。
我们选择 Copy-on-Write (CoW) 模式,因为它为读密集型分析场景提供了更好的性能。分区键选择 dt (日期) 和 hour,这是一种常见的按时间分区策略,能有效裁剪查询范围。记录键(Record Key)自然是 trace_id,因为它唯一标识了一个请求事件。
以下是一个完整的 Flink SQL 作业示例,它使用 Flink SQL Client 或 Ververica Platform 提交。在生产中,这通常是一个打包好的 Java/Scala 应用。
-- Flink SQL 作业脚本
-- 设置作业参数
SET 'execution.checkpointing.interval' = '1min';
SET 'state.backend' = 'rocksdb';
-- ... 其他生产环境配置
-- 步骤 1: 创建 Kafka Source 表,用于读取 APISIX 的原始 JSON 日志
CREATE TABLE kafka_apisix_logs (
`trace_id` STRING,
`request_id` STRING,
`timestamp` BIGINT,
`client_ip` STRING,
`method` STRING,
`host` STRING,
`uri` STRING,
`status_code` INT,
`latency_ms` BIGINT,
`user_agent` STRING,
`upstream_addr` STRING,
`service_id` STRING,
`route_id` STRING,
-- 使用 Flink 的计算列从时间戳中提取分区字段
`ts_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
`dt` AS DATE_FORMAT(`ts_ltz`, 'yyyy-MM-dd'),
`hour` AS DATE_FORMAT(`ts_ltz`, 'HH')
) WITH (
'connector' = 'kafka',
'topic' = 'apisix-raw-logs',
'properties.bootstrap.servers' = 'kafka-broker-1:9092,kafka-broker-2:9092',
'properties.group.id' = 'flink-hudi-ingestion-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- 步骤 2: 创建 Hudi Sink 表,定义数据湖中的最终存储格式
-- 这是一个 Copy-on-Write 表,适用于读多写少的分析场景
CREATE TABLE hudi_observability_lake (
`trace_id` STRING PRIMARY KEY NOT ENFORCED, -- 记录键
`request_id` STRING,
`timestamp` BIGINT,
`client_ip` STRING,
`method` STRING,
`host` STRING,
`uri` STRING,
`status_code` INT,
`latency_ms` BIGINT,
`user_agent` STRING,
`upstream_addr` STRING,
`service_id` STRING,
`route_id` STRING,
`ts_ltz` TIMESTAMP(3) WITH LOCAL TIME ZONE,
`dt` STRING, -- 分区键 1
`hour` STRING -- 分区键 2
)
PARTITIONED BY (`dt`, `hour`)
WITH (
'connector' = 'hudi',
'path' = 's3://my-observability-lake/apisix_logs',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'upsert', -- 使用 upsert 保证数据幂等性
'hoodie.datasource.write.recordkey.field' = 'trace_id',
'hoodie.datasource.write.precombine.field' = 'timestamp', -- 预合并字段,当 trace_id 冲突时,保留时间戳最新的记录
'partition.path.field' = 'dt,hour',
'hoodie.datasource.write.hive_style_partitioning' = 'true', -- 使用 Hive 风格分区 (dt=.../hour=...)
'compaction.async.enabled' = 'false', -- CoW 模式下不需要 compaction
'write.tasks' = '4', -- 写入并行度
'hive_sync.enable' = 'true', -- 自动同步元数据到 Hive Metastore
'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083',
'hive_sync.db' = 'observability',
'hive_sync.table' = 'apisix_logs'
);
-- 步骤 3: 启动 INSERT INTO 作业,将数据从 Kafka 流向 Hudi
INSERT INTO hudi_observability_lake
SELECT
`trace_id`,
`request_id`,
`timestamp`,
`client_ip`,
`method`,
`host`,
`uri`,
`status_code`,
`latency_ms`,
`user_agent`,
`upstream_addr`,
`service_id`,
`route_id`,
`ts_ltz`,
`dt`,
`hour`
FROM kafka_apisix_logs;
这个 Flink 作业一旦运行,就会持续地将 APISIX 的日志数据写入到 S3 上的 Hudi 表中。Hudi 会负责处理小文件合并、数据版本控制和事务性写入,最终在 S3 上形成一个按日、按小时分区的高度优化的 Parquet 文件集合。
三、数据分析与价值实现
数据进入 Hudi 表后,我们便拥有了一个强大的分析平台。通过 Spark SQL、Presto 或 Trino 等查询引擎,我们可以执行之前无法想象的复杂关联查询。
假设我们已经通过 Sentry 的 Webhook 或其他方式,将 Sentry 异常的核心信息(如 issue_id, project, message, trace_id)也同步到了另一个名为 sentry_issues 的 Hudi 表中。
现在,我们可以回答最初的那个棘手问题了。
查询场景: 找到导致特定 Sentry issue (issue_id = '12345') 的所有请求的客户端 IP 和 User-Agent 分布。
-- 使用 Spark SQL 或 Trino 查询
SELECT
t1.client_ip,
t1.user_agent,
COUNT(*) as request_count
FROM
observability.apisix_logs t1
JOIN
observability.sentry_issues t2 ON t1.trace_id = t2.trace_id
WHERE
t2.issue_id = '12345'
AND t1.dt >= '2023-11-14' -- 利用分区裁剪,极大提升查询效率
GROUP BY
t1.client_ip,
t1.user_agent
ORDER BY
request_count DESC
LIMIT 10;
这个查询的威力在于它直接将应用层异常(sentry_issues)和网关层流量(apisix_logs)通过 trace_id 关联起来。工程师不再需要在多个系统之间切换,只需一条 SQL 就能获得精准的上下文信息,从而将故障排查时间从数小时缩短到几分钟。
更进一步,我们可以进行更复杂的行为分析:
- 分析高延迟请求:
SELECT * FROM apisix_logs WHERE latency_ms > 2000 AND dt = '...' - 识别异常状态码的来源:
SELECT client_ip, COUNT(*) FROM apisix_logs WHERE status_code >= 500 GROUP BY client_ip - 结合业务维度:如果 APISIX 日志中还包含了通过 JWT 解析出的
user_id,那么分析就可以深入到单个用户的行为层面。
方案的局限性与未来迭代路径
这套架构虽然强大,但也并非没有权衡。首先,它引入了更多的技术组件(Kafka, Flink),增加了系统的运维复杂性。数据从 APISIX 到 Hudi 可供查询存在分钟级的延迟,不适用于需要亚秒级响应的实时监控场景。
其次,APISIX Lua 插件的性能至关重要。在我们的例子中,同步发送 Kafka 消息在极端高并发下可能成为瓶颈。一个生产级的插件需要实现异步、批量的发送机制,例如使用 ngx.timer.at 将日志先暂存在 lua_shared_dict 中,再由一个后台 timer 定期批量发送。这需要更复杂的并发控制和内存管理。
未来的迭代方向可以集中在以下几点:
- 数据成本优化:对于旧数据,可以利用 Hudi 的
CLEAN服务自动归档或删除,或者使用 S3 的生命周期策略将老的分区迁移到成本更低的存储层(如 Glacier)。 - Schema 演进:随着业务发展,日志的字段会发生变化。需要建立一套基于 Schema Registry 的机制来管理和演进 Flink 作业和 Hudi 表的 schema,避免不兼容的变更导致数据管道中断。
- 查询性能优化:除了分区,还可以利用 Hudi 的聚簇(Clustering)功能,根据常用的查询条件(如
user_id)对数据进行物理重排,进一步提升查询性能。 - 数据反哺:数据湖中的分析结果可以反哺给线上系统。例如,可以运行一个批处理作业,识别出恶意 IP,并将结果写入 Redis,供 APISIX 的动态防火墙插件实时读取和拦截。这形成了一个从数据采集、分析到决策执行的完整闭环。