基于 FastAPI 和 Trino 构建 GCP 上的可伸缩异步数据查询网关


最初的矛盾点非常明确:业务分析团队需要一个API,通过它能直接查询存储在Google Cloud Storage (GCS)上的数据湖。我们选择Trino (Presto的分支) 作为查询引擎,因为它能很好地处理跨数据源的联邦查询。第一个版本的API实现简单粗暴——一个同步的FastAPI端点,接收SQL查询,直接调用trino-python-client执行,然后等待结果返回。

这个方案在查询简短、数据量小的情况下工作得尚可。但很快,随着查询复杂度的增加,问题开始集中爆发。一个需要扫描数TB Parquet文件的查询可能要执行5到15分钟。在HTTP世界里,这是一个无法接受的等待时间。负载均衡器或反向代理的超时设置(通常是30-120秒)会直接切断连接,导致客户端收到504 Gateway Timeout错误,而后台的Trino集群实际上还在消耗大量资源执行这个“无人认领”的查询。更糟糕的是,FastAPI的同步工作进程被长时间阻塞,无法处理其他请求,整个服务的吞吐量急剧下降。

我们需要一个完全不同的模型:一个异步的、非阻塞的查询提交与结果获取分离的架构。目标是让API调用立即返回,给予客户端一个查询ID,客户端可以稍后通过这个ID来轮询状态并获取最终结果。

架构决策与技术选型

要实现这个异步查询网关,核心在于解耦“提交”和“执行”。整个流程被重新设计为:

  1. 提交 (Submit): 客户端通过 POST /queries 发送SQL查询。服务验证请求,生成一个唯一的 query_id,将查询任务推送到一个后台处理系统,并立即向客户端返回 query_id
  2. 执行 (Execute): 后台工作进程接收到任务,开始通过Trino执行查询。同时,它会持续更新该 query_id 在一个共享状态存储中的状态(如 PENDING, RUNNING, SUCCEEDED, FAILED)。
  3. 轮询 (Poll): 客户端使用 GET /queries/{query_id}/status 定期检查查询状态。
  4. 获取 (Fetch): 当状态变为 SUCCEEDED 后,客户端通过 GET /queries/{query_id}/results 获取结果。对于大数据量的结果,直接在JSON响应中返回是不可行的,正确的做法是将结果存储在GCS中,并提供一个有时效性的签名URL供客户端下载。
sequenceDiagram
    participant Client
    participant FastAPI Gateway
    participant Background Worker
    participant Redis (State Store)
    participant Trino
    participant GCS (Result Store)

    Client->>+FastAPI Gateway: POST /queries (sql="...")
    FastAPI Gateway->>FastAPI Gateway: Generate query_id
    FastAPI Gateway->>Redis: SET query_id:status PENDING
    FastAPI Gateway->>Background Worker: Start job for query_id
    FastAPI Gateway-->>-Client: 202 Accepted ({"query_id": "..."})

    loop Poll Status
        Client->>+FastAPI Gateway: GET /queries/{query_id}/status
        FastAPI Gateway->>Redis: GET query_id:status
        Redis-->>FastAPI Gateway: "RUNNING"
        FastAPI Gateway-->>-Client: 200 OK ({"status": "RUNNING"})
    end

    Note right of Background Worker: Query execution in progress
    Background Worker->>Redis: SET query_id:status RUNNING
    Background Worker->>+Trino: Execute Query
    Trino-->>-Background Worker: Query results
    Background Worker->>+GCS: Upload results.parquet
    GCS-->>-Background Worker: gcs_uri
    Background Worker->>Redis: SET query_id:status SUCCEEDED, result_uri: gcs_uri
    
    Client->>+FastAPI Gateway: GET /queries/{query_id}/status
    FastAPI Gateway->>Redis: GET query_id:status
    Redis-->>FastAPI Gateway: "SUCCEEDED"
    FastAPI Gateway-->>-Client: 200 OK ({"status": "SUCCEEDED"})

    Client->>+FastAPI Gateway: GET /queries/{query_id}/results
    FastAPI Gateway->>Redis: GET query_id:result_uri
    Redis-->>FastAPI Gateway: gcs_uri
    FastAPI Gateway->>GCS: Generate Signed URL for gcs_uri
    FastAPI Gateway-->>-Client: 302 Found (Location: signed_url)
    Client->>+GCS: Download results.parquet
    GCS-->>-Client: File content

基于此架构,技术栈也变得清晰:

  • Web框架: 继续使用 FastAPI,其内置的 BackgroundTask 是实现后台任务执行的轻量级方案,无需引入重量级的任务队列(如Celery),对于这个场景足够了。
  • 状态存储: Redis (在GCP上使用Memorystore for Redis)。它的高性能键值存储特性非常适合管理查询任务的瞬时状态。
  • 结果存储: **Google Cloud Storage (GCS)**。对象存储是存放大规模查询结果的理想选择,成本低廉且可扩展。
  • 部署环境: **Google Kubernetes Engine (GKE)**。容器化部署可以轻松管理应用的生命周期,并根据负载进行水平扩展。

核心代码实现

我们从配置和模型定义开始。在真实项目中,配置绝不能硬编码。使用Pydantic的BaseSettings可以方便地从环境变量中加载配置,这对于在GKE中通过ConfigMap或Secrets注入配置至关重要。

core/config.py:

import logging
from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """
    Application settings loaded from environment variables.
    """
    PROJECT_ID: str = "your-gcp-project-id"
    
    # Redis (Memorystore) settings
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    QUERY_STATE_TTL_SECONDS: int = 3600 * 24 # State expires in 24 hours

    # Trino settings
    TRINO_HOST: str = "localhost"
    TRINO_PORT: int = 8080
    TRINO_USER: str = "trino-user"
    TRINO_CATALOG: str = "gcs"
    TRINO_SCHEMA: str = "default"

    # GCS settings
    GCS_RESULT_BUCKET: str = "your-trino-query-results-bucket"
    GCS_SIGNED_URL_EXPIRATION_SECONDS: int = 3600 # 1 hour

    LOG_LEVEL: str = "INFO"

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

@lru_cache()
def get_settings() -> Settings:
    """
    Returns the settings instance, cached for performance.
    """
    return Settings()

# Setup logging based on settings
logging.basicConfig(level=get_settings().LOG_LEVEL.upper())
logger = logging.getLogger(__name__)

接下来是API的数据模型。使用Pydantic可以确保类型安全和数据验证。

api/models.py:

from enum import Enum
from pydantic import BaseModel, Field

class QueryStatus(str, Enum):
    """
    Enumeration for query job statuses.
    """
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

class QueryCreateRequest(BaseModel):
    """
    Request model for submitting a new query.
    """
    sql: str = Field(..., min_length=10, description="The SQL query to be executed by Trino.")

class QueryCreateResponse(BaseModel):
    """
    Response model after successfully submitting a query.
    """
    query_id: str = Field(..., description="The unique identifier for the submitted query.")

class QueryStatusResponse(BaseModel):
    """
    Response model for checking the status of a query.
    """
    query_id: str
    status: QueryStatus
    error_message: str | None = None
    created_at: str | None = None
    finished_at: str | None = None

后台任务是整个系统的引擎。这个函数将负责连接Trino,执行查询,处理异常,并将结果上传到GCS。这里的错误处理必须非常健壮。

services/trino_executor.py:

import uuid
import datetime
import logging
import io
import pandas as pd
from trino.dbapi import connect
from trino.exceptions import TrinoQueryError
from google.cloud import storage

from core.config import get_settings
from services.state_manager import update_query_state, QueryStateData, get_query_state
from api.models import QueryStatus

logger = logging.getLogger(__name__)
settings = get_settings()

def execute_trino_query_and_store_results(query_id: str, sql: str):
    """
    The core background task function. It executes a Trino query,
    handles its lifecycle, and stores results in GCS.
    """
    start_time = datetime.datetime.utcnow()
    logger.info(f"Starting background execution for query_id: {query_id}")
    
    try:
        update_query_state(query_id, QueryStateData(status=QueryStatus.RUNNING))

        conn = connect(
            host=settings.TRINO_HOST,
            port=settings.TRINO_PORT,
            user=settings.TRINO_USER,
            catalog=settings.TRINO_CATALOG,
            schema=settings.TRINO_SCHEMA,
        )
        cur = conn.cursor()
        
        logger.info(f"Executing SQL for query_id {query_id}: {sql[:200]}...")
        cur.execute(sql)
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        
        if not rows:
            logger.warning(f"Query {query_id} returned no results.")
            # Even with no results, it's a success. Handle this case gracefully.
            df = pd.DataFrame(columns=columns)
        else:
            df = pd.DataFrame(rows, columns=columns)
        
        # A common mistake is to handle large results in memory.
        # For truly large scale, one should use Trino's `CREATE TABLE AS SELECT ... WITH (format = 'PARQUET', external_location = '...')`
        # and just monitor the query status. Here, we stream results to GCS.
        result_gcs_path = f"results/{query_id}/result.parquet"
        upload_df_to_gcs(df, result_gcs_path)
        
        logger.info(f"Query {query_id} succeeded. Results stored at {result_gcs_path}")
        
        final_state = QueryStateData(
            status=QueryStatus.SUCCEEDED,
            result_gcs_path=f"gs://{settings.GCS_RESULT_BUCKET}/{result_gcs_path}",
            finished_at=datetime.datetime.utcnow().isoformat()
        )
        update_query_state(query_id, final_state)

    except TrinoQueryError as e:
        logger.error(f"Trino query error for query_id {query_id}: {e}", exc_info=True)
        final_state = QueryStateData(
            status=QueryStatus.FAILED,
            error_message=f"Trino Error: {e.message}",
            finished_at=datetime.datetime.utcnow().isoformat()
        )
        update_query_state(query_id, final_state)
    except Exception as e:
        # Catch-all for unexpected errors (e.g., GCS connection issues, memory errors)
        logger.critical(f"Unhandled exception for query_id {query_id}: {e}", exc_info=True)
        final_state = QueryStateData(
            status=QueryStatus.FAILED,
            error_message=f"Internal Server Error: {str(e)}",
            finished_at=datetime.datetime.utcnow().isoformat()
        )
        update_query_state(query_id, final_state)

def upload_df_to_gcs(df: pd.DataFrame, gcs_path: str):
    """
    Uploads a pandas DataFrame as a Parquet file to GCS.
    """
    storage_client = storage.Client(project=settings.PROJECT_ID)
    bucket = storage_client.bucket(settings.GCS_RESULT_BUCKET)
    blob = bucket.blob(gcs_path)

    parquet_buffer = io.BytesIO()
    df.to_parquet(parquet_buffer, index=False)
    parquet_buffer.seek(0)
    
    blob.upload_from_file(parquet_buffer, content_type='application/octet-stream')
    logger.info(f"Successfully uploaded Parquet to gs://{settings.GCS_RESULT_BUCKET}/{gcs_path}")

状态管理服务与Redis交互,负责所有状态的读写。

services/state_manager.py:

import json
import redis
import datetime
from pydantic import BaseModel, Field

from core.config import get_settings
from api.models import QueryStatus

settings = get_settings()

# In a real application, use a connection pool.
# For simplicity, we create a single client instance.
redis_client = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    decode_responses=True
)

class QueryStateData(BaseModel):
    status: QueryStatus
    error_message: str | None = None
    result_gcs_path: str | None = None
    created_at: str = Field(default_factory=lambda: datetime.datetime.utcnow().isoformat())
    finished_at: str | None = None

def get_query_state(query_id: str) -> QueryStateData | None:
    """
    Retrieves the state of a query from Redis.
    """
    state_json = redis_client.get(f"query:{query_id}")
    if not state_json:
        return None
    return QueryStateData(**json.loads(state_json))

def create_initial_query_state(query_id: str) -> None:
    """
    Creates the initial PENDING state for a new query.
    """
    initial_state = QueryStateData(status=QueryStatus.PENDING)
    redis_client.set(
        f"query:{query_id}",
        initial_state.model_dump_json(),
        ex=settings.QUERY_STATE_TTL_SECONDS
    )

def update_query_state(query_id: str, new_state_data: QueryStateData):
    """
    Updates the state of an existing query. It merges with the existing state.
    """
    current_state = get_query_state(query_id)
    if not current_state:
        # Should not happen in normal flow, but good for robustness
        current_state = QueryStateData(status=QueryStatus.PENDING)

    # Merge updates
    updated_data = current_state.model_copy(update=new_state_data.model_dump(exclude_unset=True))
    
    redis_client.set(
        f"query:{query_id}",
        updated_data.model_dump_json(),
        ex=settings.QUERY_STATE_TTL_SECONDS
    )

最后,将所有组件在FastAPI应用中组装起来。

main.py:

import uuid
import datetime
from fastapi import FastAPI, BackgroundTasks, HTTPException, Response
from google.cloud import storage

from api.models import (
    QueryCreateRequest, QueryCreateResponse, 
    QueryStatusResponse, QueryStatus
)
from services.state_manager import get_query_state, create_initial_query_state
from services.trino_executor import execute_trino_query_and_store_results
from core.config import get_settings

app = FastAPI(
    title="Asynchronous Trino Query Gateway",
    description="An API to run long-running Trino queries asynchronously.",
    version="1.0.0"
)
settings = get_settings()

@app.post("/queries", response_model=QueryCreateResponse, status_code=202)
async def submit_query(
    request: QueryCreateRequest,
    background_tasks: BackgroundTasks
):
    """
    Submits a new query for asynchronous execution.
    """
    query_id = str(uuid.uuid4())
    create_initial_query_state(query_id)
    
    # This is the key part: the heavy lifting is done in the background.
    # The API call returns immediately.
    background_tasks.add_task(
        execute_trino_query_and_store_results, 
        query_id, 
        request.sql
    )
    
    return QueryCreateResponse(query_id=query_id)

@app.get("/queries/{query_id}/status", response_model=QueryStatusResponse)
async def get_query_status(query_id: str):
    """
    Retrieves the current status of a submitted query.
    """
    state = get_query_state(query_id)
    if not state:
        raise HTTPException(status_code=404, detail="Query ID not found.")
    
    return QueryStatusResponse(query_id=query_id, **state.model_dump())

@app.get("/queries/{query_id}/results", status_code=307)
async def get_query_results(query_id: str):
    """
    Provides a redirect to the query results stored in GCS.
    """
    state = get_query_state(query_id)
    if not state:
        raise HTTPException(status_code=404, detail="Query ID not found.")
    
    if state.status != QueryStatus.SUCCEEDED:
        raise HTTPException(
            status_code=400,
            detail=f"Query has not succeeded. Current status: {state.status}. "
                   f"Error: {state.error_message or 'None'}"
        )
    
    if not state.result_gcs_path:
        # This indicates a logic error in the executor
        raise HTTPException(status_code=500, detail="Query succeeded but result path is missing.")

    try:
        storage_client = storage.Client(project=settings.PROJECT_ID)
        # Parse bucket and blob name from gs:// path
        bucket_name = settings.GCS_RESULT_BUCKET
        blob_name = state.result_gcs_path.replace(f"gs://{bucket_name}/", "")
        
        blob = storage_client.bucket(bucket_name).blob(blob_name)
        
        # Generate a signed URL for the client to download directly.
        # This offloads bandwidth from our service.
        signed_url = blob.generate_signed_url(
            version="v4",
            expiration=datetime.timedelta(seconds=settings.GCS_SIGNED_URL_EXPIRATION_SECONDS),
            method="GET",
        )
        
        return Response(status_code=307, headers={"Location": signed_url})

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate signed URL: {e}")

部署与运维考量

  • Dockerfile: 创建一个生产级的Dockerfile,使用多阶段构建来减小镜像体积,并以非root用户运行。
  • GKE Deployment: 在Kubernetes部署清单中,必须定义资源请求和限制(requests and limits),以确保Pod的稳定性和资源调度。配置readinessProbelivenessProbe至关重要,它们能帮助Kubernetes自动管理不健康的服务实例。
  • 水平扩展: 由于状态集中存储在Redis中,FastAPI应用本身是无状态的,可以通过Kubernetes的HorizontalPodAutoscaler (HPA)轻松实现水平扩展。
  • 单元测试: 对于trino_executor,需要使用pytestunittest.mock来模拟Trino和GCS的客户端,从而在不依赖外部服务的情况下测试其核心逻辑,比如错误处理和状态转换。对于API层,FastAPI的TestClient提供了测试端点的便捷方式。

局限性与未来迭代路径

这个架构解决了最初的同步阻塞问题,但在生产环境中,它仍然存在一些可以改进的地方。

首先,BackgroundTask 是在与Web服务相同的进程中运行的。如果一个后台任务消耗了过多的CPU或内存,它仍然可能影响API的响应能力。对于负载更高、任务更重的场景,一个更稳健的方案是使用专用的任务队列系统,如Celery配合RabbitMQ或Redis作为Broker。这将实现计算资源的物理隔离。

其次,当前的轮询模型增加了客户端的复杂性,并可能产生不必要的网络流量。一个更高级的替代方案是使用WebSockets或Server-Sent Events (SSE)。客户端可以建立一个持久连接,当查询状态更新时,服务端可以主动推送通知。

最后,查询管理功能相对简单。可以增加一个DELETE /queries/{query_id}端点来尝试取消正在Trino中运行的查询(通过Trino的API),并清理Redis和GCS中的相关资源,这对于资源管理非常重要。同时,需要建立更完善的监控,例如使用Prometheus监控进行中的查询数量、成功率和平均执行时间。


  目录