Building a Custom Kong Lua Plugin to Protect AI Services, with Integration Tests in Vitest


Providing a unified, high-performance API entry point for the data science team's many Python model services is the core architectural challenge at hand. These model services (typically built on FastAPI or Flask) are computationally expensive and maintained by different teams, so security practices, input validation, and access-control policies vary wildly. Implementing these non-functional requirements directly in the Python application layer not only multiplies redundant code, but also introduces a significant performance bottleneck under high concurrency because of Python's Global Interpreter Lock (GIL).

Option A: Security and Validation in the Application Layer

Implementing authentication and validation logic inside each AI service is the most direct approach.

  • Pros:

    • Cohesive logic: security code lives next to the business logic, and developers work in a single codebase.
    • Full control: data scientists can tailor validation rules to each model's exact needs.
  • Cons:

    • Inconsistent policy: different services may adopt different auth libraries, error formats, and validation logic, creating management silos.
    • Performance overhead: every request must be handled by a Python process, so even invalid requests consume expensive compute, especially under DDoS attacks or API abuse.
    • Development burden: data scientists are forced to worry about infrastructure-level security instead of their core task, model development.
    • Stack lock-in: the security logic is deeply coupled to Python; if model services built in other languages (Go, Rust) are introduced later, the whole thing must be rewritten.

Option B: Centralized Handling at the Gateway Layer

Handle these cross-cutting concerns uniformly at the API gateway (e.g., Kong). A custom plugin performs authentication, authorization, and preliminary validation before a request ever reaches the upstream AI service.

  • Pros:

    • Centralized management: all security policy is configured and updated at the gateway, decoupled from the backend services.
    • High performance: Kong is built on OpenResty (Nginx + LuaJIT), whose event-driven, non-blocking I/O model handles massive concurrency at very low latency; LuaJIT-compiled code far outperforms CPython.
    • Language-agnostic: the gateway's protection is transparent to upstream services, whatever their stack.
    • Fail fast: invalid requests are rejected at the gateway and never touch, let alone consume, backend AI compute.
  • Cons:

    • Technical barrier: requires Kong plugin development, i.e., Lua programming and the OpenResty API.
    • Testing complexity: infrastructure code is tested differently from application code and needs a full integration environment containing the gateway and mock backends.

Decision and Rationale

We chose Option B. Despite the up-front learning cost, moving the security boundary forward to the API gateway is, from the long-term perspective of maintainability, performance, and security, the only architecture that makes sense. The centralized governance and effective protection of backend services it provides are unmatched by the application-layer approach. The key is to overcome its main weakness, testing complexity, with a reliable automated test pipeline that keeps the plugin's quality and behavior in check. We will use Vitest, a modern JavaScript/TypeScript test framework, together with Docker Compose to orchestrate a complete, self-contained integration test environment.

Core Implementation Overview

The overall architecture is as follows: a client request first passes through Kong, where our custom plugin ai-auth-validator is triggered. The plugin has three responsibilities: verify the JWT, issue a sub-request to an internal auth service for permission metadata, and validate the request body against a JSON Schema.

sequenceDiagram
    participant Client
    participant Kong Gateway
    participant Custom Plugin as ai-auth-validator
    participant Auth Service
    participant AI Model Service

    Client->>Kong Gateway: POST /v1/models/predict (Header: X-Auth-Token, Body: {...})
    Kong Gateway->>Custom Plugin: execute access phase
    Custom Plugin->>Custom Plugin: 1. Extract and decode JWT
    alt JWT valid
        Custom Plugin->>Auth Service: Sub-request to /verify with user_id
        Auth Service-->>Custom Plugin: { "permissions": ["predict:model_a"] }
        Custom Plugin->>Custom Plugin: 2. Check permissions
        Custom Plugin->>Custom Plugin: 3. Validate request body against schema
        alt Permissions & Schema OK
            Custom Plugin-->>Kong Gateway: Allow request
            Kong Gateway->>AI Model Service: Proxy request
            AI Model Service-->>Kong Gateway: Model response
            Kong Gateway-->>Client: Model response
        else Permissions or Schema Invalid
            Custom Plugin-->>Kong Gateway: Deny with 403/400
            Kong Gateway-->>Client: Error response
        end
    else JWT invalid
        Custom Plugin-->>Kong Gateway: Deny with 401
        Kong Gateway-->>Client: Error response
    end

1. A Mock AI Model Service (Python/FastAPI)

This is a simple service with a strict input model. Our goal is to block requests that do not conform to this model at the Kong layer.

model_service/main.py:

# model_service/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = FastAPI()

class ModelInput(BaseModel):
    request_id: str
    feature_vector: list[float] = Field(..., min_length=10, max_length=10)
    context: dict[str, str]

@app.post("/v1/models/predict")
def predict(data: ModelInput):
    """
    Simulate a slow AI model prediction.
    """
    logging.info(f"Received prediction request: {data.request_id}")
    
    # Simulate an expensive computation
    try:
        # In a real project this is where model inference runs
        time.sleep(0.5)
        prediction_score = sum(data.feature_vector) / len(data.feature_vector)
        logging.info(f"Prediction successful for {data.request_id}")
        return {"request_id": data.request_id, "prediction": prediction_score}
    except Exception as e:
        logging.error(f"Prediction failed for {data.request_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal model error")

@app.get("/health")
def health_check():
    return {"status": "ok"}

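To make the contract explicit before moving to the gateway: the constraints the Pydantic model enforces can be restated as a plain predicate. This is a hypothetical, stdlib-only helper for illustration; in the actual design the gateway enforces the equivalent rules via the JSON Schema configured on the plugin.

```python
# Hypothetical helper, not part of the service: exactly what the gateway
# must reject before a request ever consumes Python worker time.
def validate_model_input(payload: dict) -> list[str]:
    """Return a list of violations; empty means the payload is acceptable."""
    errors = []
    if not isinstance(payload.get("request_id"), str):
        errors.append("request_id must be a string")
    vec = payload.get("feature_vector")
    if not (isinstance(vec, list) and len(vec) == 10 and
            all(isinstance(x, (int, float)) and not isinstance(x, bool)
                for x in vec)):
        errors.append("feature_vector must be a list of exactly 10 numbers")
    ctx = payload.get("context")
    if not (isinstance(ctx, dict) and
            all(isinstance(k, str) and isinstance(v, str)
                for k, v in ctx.items())):
        errors.append("context must be a mapping of strings to strings")
    return errors

# A well-formed payload passes; a nine-element vector does not:
ok_payload = {"request_id": "r1", "feature_vector": [0.1] * 10,
              "context": {"source": "demo"}}
bad_payload = {"request_id": "r1", "feature_vector": [0.1] * 9,
               "context": {}}
```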
2. The Custom Kong Plugin (Lua)

This is the heart of the architecture. The plugin is split into schema.lua (configuration definition) and handler.lua (the logic).

kong/plugins/ai-auth-validator/schema.lua:

-- kong/plugins/ai-auth-validator/schema.lua
local typedefs = require "kong.db.schema.typedefs"

-- Plugin configuration options, settable through the Kong Admin API
return {
  name = "ai-auth-validator",
  fields = {
    { protocols = typedefs.protocols_http },
    {
      config = {
        type = "record",
        fields = {
          { auth_service_url = { type = "string", required = true, default = "http://auth-service:8000/verify" }, },
          { token_header_name = { type = "string", required = true, default = "X-Auth-Token" }, },
          { request_body_schema = { type = "string", required = true }, }, -- JSON Schema as a string
          { cache_ttl = { type = "number", required = true, default = 300 }, }, -- auth result cache TTL in seconds
        },
      },
    },
  },
}

kong/plugins/ai-auth-validator/handler.lua:

-- kong/plugins/ai-auth-validator/handler.lua
-- Kong 3.x handlers are plain tables; the old kong.plugins.base_plugin
-- class no longer exists and must not be required.
local cjson = require "cjson.safe"
local http = require "resty.http" -- lua-resty-http ships with Kong
-- lua-resty-ljsonschema; install via LuaRocks if absent from the image
local ljsonschema = require "resty.ljsonschema"

local AIAuthValidatorHandler = {
  PRIORITY = 1000, -- run early, before most other plugins
  VERSION = "0.1.0",
}

-- Cache loader: invoked by kong.cache on a miss. Performs the sub-request
-- to the auth service and returns the decoded result for caching.
local function fetch_auth_data(auth_service_url, token)
  local httpc = http.new()
  local res, err = httpc:request_uri(auth_service_url, {
    method = "POST",
    headers = { ["Content-Type"] = "application/json" },
    body = cjson.encode({ token = token }),
  })

  if not res then
    return nil, "auth sub-request failed: " .. (err or "unknown error")
  end
  if res.status ~= 200 then
    return nil, "auth service returned status " .. res.status
  end

  return cjson.decode(res.body)
end

-- Core logic runs in the access phase
function AIAuthValidatorHandler:access(conf)
  local token = kong.request.get_header(conf.token_header_name)

  -- 1. Require a token
  if not token then
    return kong.response.exit(401, { message = "Authentication token is missing" })
  end

  -- 2. Look up the verification result in Kong's cache. On a miss the
  --    loader performs the sub-request, and the result is cached for
  --    conf.cache_ttl seconds. The cache key is the raw token here; a
  --    real deployment should hash it first.
  local cache_key = "ai_auth:" .. token
  local auth_data, err = kong.cache:get(cache_key, { ttl = conf.cache_ttl },
                                        fetch_auth_data, conf.auth_service_url, token)
  if err or not auth_data then
    kong.log.err("Auth lookup failed: ", err)
    return kong.response.exit(401, { message = "Invalid token or auth service unavailable" })
  end

  -- Finer-grained authorization decisions could be made on auth_data here;
  -- for now a successful verification is enough.
  kong.log.info("Token validation successful for user: ", auth_data.user_id)

  -- 3. Validate the request body
  local body_str, body_err = kong.request.get_raw_body()
  if body_err then
    return kong.response.exit(400, { message = "Failed to read request body" })
  end
  if not body_str or body_str == "" then
    return kong.response.exit(400, { message = "Request body is empty" })
  end

  local body_json, json_err = cjson.decode(body_str)
  if json_err then
    return kong.response.exit(400, { message = "Invalid JSON format in request body" })
  end

  -- Parse the operator-supplied JSON Schema from the plugin config
  local schema, schema_err = cjson.decode(conf.request_body_schema)
  if schema_err then
    kong.log.err("Failed to parse JSON schema from plugin config: ", schema_err)
    return kong.response.exit(500, { message = "Internal server error: Invalid schema configuration" })
  end

  -- generate_validator compiles the schema into a validator function; in
  -- production, cache the compiled validator instead of rebuilding it on
  -- every request.
  local validator = ljsonschema.generate_validator(schema)
  local ok, validation_err = validator(body_json)

  if not ok then
    -- validation_err is a single human-readable string
    return kong.response.exit(400, {
      message = "Request body validation failed",
      errors = { validation_err },
    })
  end

  kong.log.info("Request body validation successful")
end

return AIAuthValidatorHandler

Integration Testing with Vitest

The core idea is a docker-compose.yml that reproduces a miniature version of production: Kong with our plugin, the model service, and a mock auth service. The Vitest suite then acts as a client, sending HTTP requests to the configured Kong endpoint and asserting that it behaves correctly.

docker-compose.test.yml:

# docker-compose.test.yml
version: '3.8'

services:
  kong-db:
    image: postgres:13
    environment:
      - POSTGRES_USER=kong
      - POSTGRES_DB=kong
      - POSTGRES_PASSWORD=kong
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "kong"]
      interval: 5s
      timeout: 5s
      retries: 5

  kong-migration:
    image: kong:3.4
    depends_on:
      kong-db:
        condition: service_healthy
    environment:
      - KONG_DATABASE=postgres
      - KONG_PG_HOST=kong-db
      - KONG_PG_USER=kong
      - KONG_PG_PASSWORD=kong
    command: "kong migrations bootstrap"

  kong:
    image: kong:3.4
    depends_on:
      kong-migration:
        condition: service_completed_successfully
      model-service:
        condition: service_healthy
      auth-service:
        condition: service_healthy
    environment:
      - KONG_DATABASE=postgres
      - KONG_PG_HOST=kong-db
      - KONG_PG_USER=kong
      - KONG_PG_PASSWORD=kong
      - KONG_DNS_RESOLVER=127.0.0.11 # Use Docker's internal DNS
      - KONG_ADMIN_LISTEN=0.0.0.0:8001
      - KONG_PROXY_LISTEN=0.0.0.0:8000
      - KONG_PLUGINS=bundled,ai-auth-validator
      - KONG_LUA_PACKAGE_PATH=/usr/local/custom_plugins/?.lua;;
    volumes:
      # Mount the plugin source into the Kong container; with
      # KONG_LUA_PACKAGE_PATH=/usr/local/custom_plugins/?.lua the module
      # kong.plugins.ai-auth-validator.handler then resolves correctly
      - ./kong:/usr/local/custom_plugins/kong
    ports:
      - "8000:8000"
      - "8001:8001"

  model-service:
    build:
      context: ./model_service
    ports:
      - "8080:80" # Expose on host for direct debugging if needed
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost/health"]
      interval: 5s
      timeout: 2s
      retries: 10

  auth-service:
    build:
      context: ./auth_service # A simple mock service
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost/health"]
      interval: 5s
      timeout: 2s
      retries: 10

Note: auth_service is a simple mock Python server that returns a 200 for “valid-token” and 401 for anything else.
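The mock itself is not shown here; a minimal stdlib-only sketch could look like the following. The endpoint paths, payload shape, and `permissions` field are assumptions matching the sequence diagram above.

```python
# auth_service/main.py -- hypothetical stdlib-only mock auth service
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

VALID_TOKEN = "valid-token"

class MockAuthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Health endpoint used by the docker-compose healthcheck
        if self.path == "/health":
            self._send(200, {"status": "ok"})
        else:
            self._send(404, {"message": "not found"})

    def do_POST(self):
        if self.path != "/verify":
            self._send(404, {"message": "not found"})
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length) or b"{}")
        except json.JSONDecodeError:
            self._send(400, {"message": "invalid JSON"})
            return
        if payload.get("token") == VALID_TOKEN:
            self._send(200, {"user_id": "user-1",
                             "permissions": ["predict:model_a"]})
        else:
            self._send(401, {"message": "invalid token"})

    def _send(self, status, body):
        data = json.dumps(body).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, *args):  # keep container logs quiet
        pass

def main(port: int = 80):
    # Invoked by the container's CMD
    HTTPServer(("0.0.0.0", port), MockAuthHandler).serve_forever()
```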

integration.test.ts:

// integration.test.ts
import { describe, it, expect, beforeAll } from 'vitest';
import axios from 'axios';
import { execSync } from 'child_process';

const KONG_PROXY_URL = 'http://localhost:8000';
const KONG_ADMIN_URL = 'http://localhost:8001';

// The JSON Schema for the AI model service's input
const AI_MODEL_SCHEMA = {
  type: "object",
  properties: {
    request_id: { type: "string" },
    feature_vector: {
      type: "array",
      items: { type: "number" },
      minItems: 10,
      maxItems: 10,
    },
    context: { type: "object" },
  },
  required: ["request_id", "feature_vector", "context"], // keep in sync with the Pydantic model, which also requires context
};

// Helper that configures the service, route, and plugin through the Kong Admin API
async function setupKong() {
  // 1. Create a Service pointing at model-service
  await axios.post(`${KONG_ADMIN_URL}/services`, {
    name: 'ai-model-service',
    url: 'http://model-service', // Docker internal DNS
  }).catch(() => console.log('Service already exists'));

  // 2. Create a Route on that Service
  await axios.post(`${KONG_ADMIN_URL}/services/ai-model-service/routes`, {
    name: 'ai-model-route',
    paths: ['/v1/models/predict'],
    methods: ['POST'],
    strip_path: false, // keep the full path when proxying to the upstream
  }).catch(() => console.log('Route already exists'));

  // 3. Enable and configure the custom plugin on the Route
  await axios.post(`${KONG_ADMIN_URL}/routes/ai-model-route/plugins`, {
    name: 'ai-auth-validator',
    config: {
      auth_service_url: 'http://auth-service/verify',
      token_header_name: 'X-Auth-Token',
      // the JSON Schema is passed as a string
      request_body_schema: JSON.stringify(AI_MODEL_SCHEMA),
      cache_ttl: 5, // short cache TTL for tests
    },
  }).catch(() => console.log('Plugin already enabled'));
}

describe('Kong AI Auth Validator Plugin Integration Test', () => {

  // Before any test runs, bring up the docker-compose environment and configure Kong
  beforeAll(async () => {
    console.log('Starting Docker Compose environment for testing...');
    // -d runs detached; --wait blocks until all health checks pass
    execSync('docker-compose -f docker-compose.test.yml up -d --build --wait', { stdio: 'inherit' });
    console.log('Docker environment is ready. Configuring Kong...');
    await new Promise(resolve => setTimeout(resolve, 5000)); // give the Kong Admin API time to come up
    await setupKong();
    console.log('Kong configured. Starting tests.');
  }, 60000); // generous timeout for image builds and startup

  it('should allow request with valid token and valid body', async () => {
    const validBody = {
      request_id: "test-123",
      feature_vector: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
      context: { "source": "test" }
    };

    const response = await axios.post(`${KONG_PROXY_URL}/v1/models/predict`, validBody, {
      headers: {
        'X-Auth-Token': 'valid-token',
        'Content-Type': 'application/json'
      }
    });

    expect(response.status).toBe(200);
    expect(response.data).toHaveProperty('prediction');
    expect(response.data.request_id).toBe('test-123');
  });

  it('should reject request with missing token', async () => {
    const validBody = {
      request_id: "test-456",
      feature_vector: Array(10).fill(0.5)
    };

    // validateStatus keeps axios from throwing on 4xx, so the rejection can
    // be asserted on directly and an unexpected 200 fails the test
    const response = await axios.post(`${KONG_PROXY_URL}/v1/models/predict`, validBody, {
      validateStatus: () => true
    });

    expect(response.status).toBe(401);
    expect(response.data.message).toBe('Authentication token is missing');
  });

  it('should reject request with invalid token', async () => {
    const validBody = {
      request_id: "test-789",
      feature_vector: Array(10).fill(0.5)
    };

    const response = await axios.post(`${KONG_PROXY_URL}/v1/models/predict`, validBody, {
      headers: { 'X-Auth-Token': 'invalid-token' },
      validateStatus: () => true
    });

    expect(response.status).toBe(401);
    expect(response.data.message).toContain('Invalid token');
  });

  it('should reject request with invalid body schema (wrong data type)', async () => {
    const invalidBody = {
      request_id: "test-abc",
      // feature_vector must be an array of numbers; strings are invalid
      feature_vector: ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
    };

    const response = await axios.post(`${KONG_PROXY_URL}/v1/models/predict`, invalidBody, {
      headers: { 'X-Auth-Token': 'valid-token' },
      validateStatus: () => true
    });

    expect(response.status).toBe(400);
    expect(response.data.message).toBe('Request body validation failed');
    // the exact wording depends on the JSON Schema library, so only assert
    // that error details are reported
    expect(response.data.errors.length).toBeGreaterThan(0);
  });

  it('should reject request with invalid body schema (missing required field)', async () => {
    const invalidBody = {
      // request_id is missing
      feature_vector: Array(10).fill(0.1),
    };

    const response = await axios.post(`${KONG_PROXY_URL}/v1/models/predict`, invalidBody, {
      headers: { 'X-Auth-Token': 'valid-token' },
      validateStatus: () => true
    });

    expect(response.status).toBe(400);
    expect(response.data.message).toBe('Request body validation failed');
    expect(response.data.errors.length).toBeGreaterThan(0);
  });

});

Extensibility and Limitations of the Architecture

This design successfully extracts authentication and validation out of the application services and into a high-performance, centrally managed gateway layer. Combining Docker and Vitest gives this critical infrastructure component reliable automated integration tests that pin down its behavior.

The approach has limitations, though. First, while Lua performs superbly, its ecosystem and developer community are smaller than Go's or Python's, which can make hiring and maintenance harder. Second, the JSON Schema is statically configured on each plugin instance; when models iterate quickly and input structures change often, manually updating plugin configuration becomes an operational bottleneck.

One future iteration is to have the plugin dynamically pull the latest validation rules from a central schema registry (a Schema Registry proper, or a simple KV store). Another is to explore Kong's Go plugin development kit (PDK), using Go's strong typing and rich libraries to build more complex plugin logic while fitting better into mainstream DevOps toolchains.
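As a rough sketch of the schema-registry idea (in Python for brevity; the plugin itself would implement the same pattern in Lua, and the registry endpoint and response shape are assumptions): a TTL cache sits in front of the registry so the gateway picks up new validation rules without configuration changes or a registry round-trip per request.

```python
# Illustrative sketch, not production code: a TTL cache over a schema
# registry; the fetch function (e.g. an HTTP GET) is injected.
import json
import time
from typing import Callable

class SchemaCache:
    def __init__(self, fetch: Callable[[str], str], ttl_seconds: float = 300):
        self._fetch = fetch          # e.g. HTTP GET against the registry
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    def get(self, model_name: str) -> dict:
        now = time.monotonic()
        cached = self._entries.get(model_name)
        if cached and now - cached[0] < self._ttl:
            return cached[1]         # still fresh: no registry round-trip
        schema = json.loads(self._fetch(model_name))
        self._entries[model_name] = (now, schema)
        return schema

# Usage with a stand-in fetcher that records its calls:
calls = []
def fake_fetch(name: str) -> str:
    calls.append(name)
    return json.dumps({"type": "object", "title": name})

cache = SchemaCache(fake_fetch, ttl_seconds=60)
cache.get("model_a")
cache.get("model_a")   # served from the cache; fetcher called only once
```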

