构建基于 Apache Hudi 的统一可观测性数据湖以融合 APISIX 与 Sentry 数据


我们面临一个日益棘手的困境:部署在 Kubernetes 集群中的 API 网关 APISIX 每秒产生数以万计的访问日志,而后端微服务通过 Sentry 报告了大量的异常事件。当线上出现故障时,Sentry 警报拉响,但异常堆栈本身往往缺乏上下文。为了定位根因,工程师必须手动根据时间戳、用户 ID 等零散信息,在浩如烟海的 APISIX 日志中艰难地大海捞针,试图还原出异常发生前的完整请求链路。这个过程效率低下,且极易出错,尤其是在高并发的复杂调用场景下。

问题根源在于数据的割裂。APISIX 的访问日志和 Sentry 的异常事件是两个独立的数据孤岛。我们需要构建一个统一的数据模型,将请求在网关层的行为(如请求头、客户端 IP、认证信息、响应延迟)与应用层的异常无缝关联起来。初步构想是建立一个统一的可观测性数据湖,将所有相关数据近实时地汇集一处,并提供强大的分析查询能力。

技术选型的决策过程很直接。APISIX 因其基于 Lua 的高性能插件生态系统成为不二之选,我们可以开发一个自定义插件来捕获和丰富日志。Sentry 是我们现有的错误追踪系统。核心在于数据湖的构建,我们放弃了传统的 Elasticsearch 或日志服务方案,因为它们在处理海量结构化数据、支持复杂分析查询以及存储成本上存在短板。我们选择了 Apache Hudi,因为它能在 HDFS 或 S3 等对象存储之上提供事务、记录级别的更新/删除(Upsert)以及增量查询能力,这对于构建一个不断演进的可观测性数据集至关重要。数据从网关到数据湖的管道,我们采用 Kafka 作为缓冲队列,Flink 作为流处理引擎,因其强大的状态管理和对 Hudi 的原生支持。

整个架构的数据流如下:

flowchart TD
    subgraph "客户端请求"
        A[Client]
    end

    subgraph "API 网关层 (APISIX)"
        A -- HTTPS Request --> B(APISIX Gateway)
        B -- 1. 生成/获取 Trace ID --> P1[Custom Lua Plugin]
        P1 -- 2. 注入 Trace ID 到 Header --> C(Upstream Service)
        P1 -- 3. 构造日志并发送 --> D[Kafka Topic: apisix-logs]
    end

    subgraph "应用服务层"
        C -- 4. 读取 Trace ID --> SDK(Sentry SDK)
        SDK -- 5. 上报异常 (带 Trace ID) --> E[Sentry Platform]
    end

    subgraph "数据处理与存储"
        D -- 6. 消费日志 --> F[Apache Flink Job]
        F -- 7. 转换并写入 --> G[Apache Hudi Table on S3]
    end

    subgraph "分析与查询"
        G -- 8. SQL 查询 --> H{Query Engine}
        H -- Spark SQL --> I[Data Analyst]
        H -- Presto/Trino --> J[Dashboard]
    end

我们的实现将从核心的 APISIX 自定义插件开始。

一、APISIX 自定义 Logger 插件:数据源头的捕获与丰富

APISIX 的插件在请求生命周期的不同阶段执行。我们的 hudi-logger 插件需要在 access 阶段注入追踪 ID,并在 log 阶段收集所有信息并将其发送到 Kafka。在真实项目中,这个插件必须是健壮的,能够处理 Kafka 连接失败、序列化错误等异常情况。

首先,定义插件的 schema,这决定了用户如何在 config.yaml 中配置它。

plugins/hudi-logger.lua:

-- hudi-logger.lua
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local json = require("cjson.safe")
local producer = require("resty.kafka.producer")
local uuid = require("resty.uuid")

local schema = {
    type = "object",
    properties = {
        kafka_brokers = {
            type = "array",
            items = {
                type = "string",
            },
            minItems = 1
        },
        kafka_topic = {
            type = "string"
        },
        trace_header_name = {
            type = "string",
            default = "X-Request-Trace-Id"
        },
        -- 生产级配置:用于控制Kafka producer的行为
        kafka_producer_config = {
            type = "object",
            properties = {
                keepalive_timeout = { type = "integer", default = 60000 },
                keepalive_pool_size = { type = "integer", default = 10 },
                broker_request_timeout = { type = "integer", default = 3000 },
                max_retry_times = { type = "integer", default = 3 }
            },
            default = {}
        }
    },
    required = {"kafka_brokers", "kafka_topic"}
}

local _M = {
    version = 0.1,
    priority = 10, -- 确保在其他日志插件之前或之后执行
    name = "hudi-logger",
    schema = schema
}

function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

-- 插件的核心逻辑
function _M.access(conf, ctx)
    local trace_id = core.request.header(ctx, conf.trace_header_name)
    if not trace_id or trace_id == "" then
        -- 如果请求头中没有,则生成一个新的 UUID v4
        trace_id = uuid.new()
    end

    -- 将 trace_id 存储在 ctx 中,以便 log 阶段可以访问
    ctx.vars.trace_id = trace_id
    
    -- 将 trace_id 注入到发往上游服务的请求头中
    core.request.set_header(ctx, conf.trace_header_name, trace_id)
end

function _M.log(conf, ctx)
    local trace_id = ctx.vars.trace_id
    if not trace_id then
        -- 理论上 access 阶段已经设置,这是一个防御性编程
        core.log.warn("hudi-logger: trace_id not found in ctx")
        return
    end
    
    -- 收集日志信息
    local log_entry = {
        trace_id = trace_id,
        request_id = core.request.get_id(),
        timestamp = ngx.now() * 1000, -- 毫秒级时间戳
        client_ip = core.request.get_remote_addr(ctx),
        method = core.request.get_method(ctx),
        host = core.request.header(ctx, "host"),
        uri = core.request.get_uri(ctx),
        status_code = ctx.response.status,
        latency_ms = ctx.latency,
        user_agent = core.request.header(ctx, "user-agent"),
        -- 可选择性地记录部分请求/响应体,但要注意性能和安全
        -- request_body_snippet = get_snippet(core.request.get_body(ctx)),
        upstream_addr = ctx.upstream and ctx.upstream.addr or "N/A",
        service_id = ctx.service and ctx.service.id or "N/A",
        route_id = ctx.route and ctx.route.id or "N/A"
    }
    
    -- 序列化为 JSON
    local ok, msg_json = pcall(json.encode, log_entry)
    if not ok then
        core.log.error("hudi-logger: failed to encode log entry to JSON: ", msg_json)
        return
    }

    -- 初始化 Kafka producer
    -- 在生产环境中,这里的 producer 实例应该被缓存和复用,避免每次请求都创建新连接
    local bp, err = producer:new(conf.kafka_brokers, conf.kafka_producer_config)
    if not bp then
        core.log.error("hudi-logger: failed to create kafka producer: ", err)
        return
    }
    
    -- 发送消息到 Kafka
    -- 这里的 key 使用 trace_id 可以保证同一个 trace 的日志被发送到 Kafka 的同一个分区,
    -- 这对于后续 Flink 处理的有序性或状态一致性可能很重要。
    local bytes, err = bp:send(conf.kafka_topic, trace_id, msg_json)
    if not bytes then
        core.log.error("hudi-logger: failed to send message to kafka topic ",
            conf.kafka_topic, ": ", err)
        return
    }
    
    -- 注意:在真实高并发场景,`bp:send` 是同步阻塞操作,可能会影响性能。
    -- 优化方案是使用 ngx.timer.at 实现异步批量发送,这会显著增加插件复杂度。
    -- 这里为了演示清晰,采用同步方式。
end

return _M

将此文件放入 APISIX 的 apisix/plugins/ 目录下,并在 config.yaml 中启用它。

# in config.yaml
apisix:
  plugins:
    - ...
    - hudi-logger # 启用插件

# 在具体路由上配置
routes:
  - id: "my-backend-route"
    uri: "/api/v1/*"
    upstream:
      type: roundrobin
      nodes:
        "backend-service:8080": 1
    plugins:
      hudi-logger:
        kafka_brokers:
          - "kafka-broker-1:9092"
          - "kafka-broker-2:9092"
        kafka_topic: "apisix-raw-logs"
        trace_header_name: "X-Trace-Id"

这个插件实现了核心功能:确保每个请求都有一个唯一的 X-Trace-Id,将此 ID 传递给后端服务,并将包含此 ID 的详细日志发送到 Kafka。后端服务(如 Java, Python, Go)只需配置其 Sentry SDK,从请求头中读取 X-Trace-Id 并将其设置为 Sentry 事件的 trace_id 标签。这样,数据孤岛的第一座桥梁就搭建完成了。

数据进入 Kafka 后,我们需要一个 Flink 作业来消费这些 JSON 格式的日志,并以 Hudi 表的格式写入对象存储。这个过程不仅是数据搬运,还包括了 schema 定义、分区策略和写入模式的选择。

我们选择 Copy-on-Write (CoW) 模式,因为它为读密集型分析场景提供了更好的性能。分区键选择 dt (日期) 和 hour,这是一种常见的按时间分区策略,能有效裁剪查询范围。记录键(Record Key)自然是 trace_id,因为它唯一标识了一个请求事件。

以下是一个完整的 Flink SQL 作业示例,它使用 Flink SQL Client 或 Ververica Platform 提交。在生产中,这通常是一个打包好的 Java/Scala 应用。

-- Flink SQL 作业脚本

-- 设置作业参数
SET 'execution.checkpointing.interval' = '1min';
SET 'state.backend' = 'rocksdb';
-- ... 其他生产环境配置

-- 步骤 1: 创建 Kafka Source 表,用于读取 APISIX 的原始 JSON 日志
CREATE TABLE kafka_apisix_logs (
    `trace_id` STRING,
    `request_id` STRING,
    `timestamp` BIGINT,
    `client_ip` STRING,
    `method` STRING,
    `host` STRING,
    `uri` STRING,
    `status_code` INT,
    `latency_ms` BIGINT,
    `user_agent` STRING,
    `upstream_addr` STRING,
    `service_id` STRING,
    `route_id` STRING,
    -- 使用 Flink 的计算列从时间戳中提取分区字段
    `ts_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
    `dt` AS DATE_FORMAT(`ts_ltz`, 'yyyy-MM-dd'),
    `hour` AS DATE_FORMAT(`ts_ltz`, 'HH')
) WITH (
    'connector' = 'kafka',
    'topic' = 'apisix-raw-logs',
    'properties.bootstrap.servers' = 'kafka-broker-1:9092,kafka-broker-2:9092',
    'properties.group.id' = 'flink-hudi-ingestion-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- 步骤 2: 创建 Hudi Sink 表,定义数据湖中的最终存储格式
-- 这是一个 Copy-on-Write 表,适用于读多写少的分析场景
CREATE TABLE hudi_observability_lake (
    `trace_id` STRING PRIMARY KEY NOT ENFORCED, -- 记录键
    `request_id` STRING,
    `timestamp` BIGINT,
    `client_ip` STRING,
    `method` STRING,
    `host` STRING,
    `uri` STRING,
    `status_code` INT,
    `latency_ms` BIGINT,
    `user_agent` STRING,
    `upstream_addr` STRING,
    `service_id` STRING,
    `route_id` STRING,
    `ts_ltz` TIMESTAMP(3) WITH LOCAL TIME ZONE,
    `dt` STRING, -- 分区键 1
    `hour` STRING  -- 分区键 2
)
PARTITIONED BY (`dt`, `hour`)
WITH (
    'connector' = 'hudi',
    'path' = 's3://my-observability-lake/apisix_logs',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'upsert', -- 使用 upsert 保证数据幂等性
    'hoodie.datasource.write.recordkey.field' = 'trace_id',
    'hoodie.datasource.write.precombine.field' = 'timestamp', -- 预合并字段,当 trace_id 冲突时,保留时间戳最新的记录
    'partition.path.field' = 'dt,hour',
    'hoodie.datasource.write.hive_style_partitioning' = 'true', -- 使用 Hive 风格分区 (dt=.../hour=...)
    'compaction.async.enabled' = 'false', -- CoW 模式下不需要 compaction
    'write.tasks' = '4', -- 写入并行度
    'hive_sync.enable' = 'true',          -- 自动同步元数据到 Hive Metastore
    'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083',
    'hive_sync.db' = 'observability',
    'hive_sync.table' = 'apisix_logs'
);

-- 步骤 3: 启动 INSERT INTO 作业,将数据从 Kafka 流向 Hudi
INSERT INTO hudi_observability_lake
SELECT
    `trace_id`,
    `request_id`,
    `timestamp`,
    `client_ip`,
    `method`,
    `host`,
    `uri`,
    `status_code`,
    `latency_ms`,
    `user_agent`,
    `upstream_addr`,
    `service_id`,
    `route_id`,
    `ts_ltz`,
    `dt`,
    `hour`
FROM kafka_apisix_logs;

这个 Flink 作业一旦运行,就会持续地将 APISIX 的日志数据写入到 S3 上的 Hudi 表中。Hudi 会负责处理小文件合并、数据版本控制和事务性写入,最终在 S3 上形成一个按日、按小时分区的高度优化的 Parquet 文件集合。

三、数据分析与价值实现

数据进入 Hudi 表后,我们便拥有了一个强大的分析平台。通过 Spark SQL、Presto 或 Trino 等查询引擎,我们可以执行之前无法想象的复杂关联查询。

假设我们已经通过 Sentry 的 Webhook 或其他方式,将 Sentry 异常的核心信息(如 issue_id, project, message, trace_id)也同步到了另一个名为 sentry_issues 的 Hudi 表中。

现在,我们可以回答最初的那个棘手问题了。

查询场景: 找到导致特定 Sentry issue (issue_id = '12345') 的所有请求的客户端 IP 和 User-Agent 分布。

-- 使用 Spark SQL 或 Trino 查询
SELECT
    t1.client_ip,
    t1.user_agent,
    COUNT(*) as request_count
FROM
    observability.apisix_logs t1
JOIN
    observability.sentry_issues t2 ON t1.trace_id = t2.trace_id
WHERE
    t2.issue_id = '12345'
    AND t1.dt >= '2023-11-14' -- 利用分区裁剪,极大提升查询效率
GROUP BY
    t1.client_ip,
    t1.user_agent
ORDER BY
    request_count DESC
LIMIT 10;

这个查询的威力在于它直接将应用层异常(sentry_issues)和网关层流量(apisix_logs)通过 trace_id 关联起来。工程师不再需要在多个系统之间切换,只需一条 SQL 就能获得精准的上下文信息,从而将故障排查时间从数小时缩短到几分钟。

更进一步,我们可以进行更复杂的行为分析:

  • 分析高延迟请求SELECT * FROM apisix_logs WHERE latency_ms > 2000 AND dt = '...'
  • 识别异常状态码的来源SELECT client_ip, COUNT(*) FROM apisix_logs WHERE status_code >= 500 GROUP BY client_ip
  • 结合业务维度:如果 APISIX 日志中还包含了通过 JWT 解析出的 user_id,那么分析就可以深入到单个用户的行为层面。

方案的局限性与未来迭代路径

这套架构虽然强大,但也并非没有权衡。首先,它引入了更多的技术组件(Kafka, Flink),增加了系统的运维复杂性。数据从 APISIX 到 Hudi 可供查询存在分钟级的延迟,不适用于需要亚秒级响应的实时监控场景。

其次,APISIX Lua 插件的性能至关重要。在我们的例子中,同步发送 Kafka 消息在极端高并发下可能成为瓶颈。一个生产级的插件需要实现异步、批量的发送机制,例如使用 ngx.timer.at 将日志先暂存在 lua_shared_dict 中,再由一个后台 timer 定期批量发送。这需要更复杂的并发控制和内存管理。

未来的迭代方向可以集中在以下几点:

  1. 数据成本优化:对于旧数据,可以利用 Hudi 的 CLEAN 服务自动归档或删除,或者使用 S3 的生命周期策略将老的分区迁移到成本更低的存储层(如 Glacier)。
  2. Schema 演进:随着业务发展,日志的字段会发生变化。需要建立一套基于 Schema Registry 的机制来管理和演进 Flink 作业和 Hudi 表的 schema,避免不兼容的变更导致数据管道中断。
  3. 查询性能优化:除了分区,还可以利用 Hudi 的聚簇(Clustering)功能,根据常用的查询条件(如 user_id)对数据进行物理重排,进一步提升查询性能。
  4. 数据反哺:数据湖中的分析结果可以反哺给线上系统。例如,可以运行一个批处理作业,识别出恶意 IP,并将结果写入 Redis,供 APISIX 的动态防火墙插件实时读取和拦截。这形成了一个从数据采集、分析到决策执行的完整闭环。

  目录