The initial point of friction was clear: the business analytics team needed an API through which they could directly query the data lake stored on Google Cloud Storage (GCS). We chose Trino (the fork of Presto) as the query engine because it handles federated queries across data sources well. The first version of the API was crude: a synchronous FastAPI endpoint that accepted a SQL query, executed it directly via trino-python-client, and blocked until the results came back.

This worked acceptably for short queries over small data. But as query complexity grew, problems piled up fast. A query scanning several terabytes of Parquet files could run for 5 to 15 minutes, an unacceptable wait in the HTTP world. Load balancer and reverse proxy timeouts (typically 30-120 seconds) would cut the connection, so the client received a 504 Gateway Timeout while the Trino cluster kept burning substantial resources on a query nobody was waiting for. Worse, the synchronous FastAPI workers stayed blocked for the full duration and could not serve other requests, so the throughput of the whole service collapsed.

We needed a fundamentally different model: an asynchronous, non-blocking architecture that separates query submission from result retrieval. The goal is for the API call to return immediately with a query ID, which the client later uses to poll status and fetch the final results.
Architecture Decisions and Technology Selection

The key to this asynchronous query gateway is decoupling submission from execution. The flow was redesigned as:
- **Submit**: the client sends a SQL query via `POST /queries`. The service validates the request, generates a unique `query_id`, pushes the query job to a background processing system, and immediately returns the `query_id` to the client.
- **Execute**: a background worker picks up the job and runs the query through Trino, continuously updating the state of that `query_id` in a shared state store (`PENDING`, `RUNNING`, `SUCCEEDED`, `FAILED`).
- **Poll**: the client periodically checks the query status via `GET /queries/{query_id}/status`.
- **Fetch**: once the status becomes `SUCCEEDED`, the client retrieves the results via `GET /queries/{query_id}/results`. For large result sets, returning them inline in a JSON response is not viable; the right approach is to store the results in GCS and hand the client a time-limited signed URL for download.
```mermaid
sequenceDiagram
    participant Client
    participant Gateway as FastAPI Gateway
    participant Worker as Background Worker
    participant Redis as Redis (State Store)
    participant Trino
    participant GCS as GCS (Result Store)

    Client->>+Gateway: POST /queries (sql="...")
    Gateway->>Gateway: Generate query_id
    Gateway->>Redis: SET query_id:status PENDING
    Gateway->>Worker: Start job for query_id
    Gateway-->>-Client: 202 Accepted ({"query_id": "..."})

    Worker->>Redis: SET query_id:status RUNNING
    Worker->>+Trino: Execute Query
    Note right of Worker: Query execution in progress

    loop Poll Status
        Client->>+Gateway: GET /queries/{query_id}/status
        Gateway->>Redis: GET query_id:status
        Redis-->>Gateway: "RUNNING"
        Gateway-->>-Client: 200 OK ({"status": "RUNNING"})
    end

    Trino-->>-Worker: Query results
    Worker->>+GCS: Upload results.parquet
    GCS-->>-Worker: gcs_uri
    Worker->>Redis: SET query_id:status SUCCEEDED, result_uri: gcs_uri

    Client->>+Gateway: GET /queries/{query_id}/status
    Gateway->>Redis: GET query_id:status
    Redis-->>Gateway: "SUCCEEDED"
    Gateway-->>-Client: 200 OK ({"status": "SUCCEEDED"})
    Client->>+Gateway: GET /queries/{query_id}/results
    Gateway->>Redis: GET query_id:result_uri
    Redis-->>Gateway: gcs_uri
    Gateway->>GCS: Generate Signed URL for gcs_uri
    Gateway-->>-Client: 307 Temporary Redirect (Location: signed_url)
    Client->>+GCS: Download results.parquet
    GCS-->>-Client: File content
```
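From the client's perspective, the poll loop in this flow is just a backoff loop over the status endpoint. A minimal sketch of such a loop, where the `fetch_status` callable and the backoff parameters are hypothetical and not part of the service itself:

```python
import time


def poll_until_done(fetch_status, initial_interval=1.0, max_interval=30.0,
                    timeout=1800.0):
    """Poll fetch_status() until a terminal state, with exponential backoff.

    fetch_status is any zero-argument callable returning a status string,
    e.g. a wrapper around GET /queries/{query_id}/status.
    """
    terminal = {"SUCCEEDED", "FAILED", "CANCELLED"}
    deadline = time.monotonic() + timeout
    interval = initial_interval
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in terminal:
            return status
        time.sleep(interval)
        # Double the wait each round so long queries don't hammer the API.
        interval = min(interval * 2, max_interval)
    raise TimeoutError("query did not finish within the polling window")
```

Exponential backoff keeps short queries responsive while capping the request rate that very long queries impose on the gateway.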
With this architecture in place, the technology stack follows naturally:

- **Web framework**: we stay with **FastAPI**. Its built-in `BackgroundTasks` is a lightweight way to run work in the background without pulling in a heavyweight task queue (such as Celery), which is sufficient for this scenario.
- **State store**: **Redis** (Memorystore for Redis on GCP). Its high-performance key-value operations are a good fit for managing the transient state of query jobs.
- **Result store**: **Google Cloud Storage (GCS)**. Object storage is the natural home for large query results: cheap and scalable.
- **Deployment**: **Google Kubernetes Engine (GKE)**. Containerized deployment makes it easy to manage the application lifecycle and scale horizontally with load.
Core Code Implementation

We start with configuration and model definitions. In a real project, configuration must never be hardcoded. Pydantic's `BaseSettings` loads configuration from environment variables, which is essential for injecting config via ConfigMaps or Secrets on GKE.

`core/config.py`:
```python
import logging
from functools import lru_cache

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """
    Application settings loaded from environment variables.
    """
    PROJECT_ID: str = "your-gcp-project-id"

    # Redis (Memorystore) settings
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    QUERY_STATE_TTL_SECONDS: int = 3600 * 24  # State expires in 24 hours

    # Trino settings
    TRINO_HOST: str = "localhost"
    TRINO_PORT: int = 8080
    TRINO_USER: str = "trino-user"
    TRINO_CATALOG: str = "gcs"
    TRINO_SCHEMA: str = "default"

    # GCS settings
    GCS_RESULT_BUCKET: str = "your-trino-query-results-bucket"
    GCS_SIGNED_URL_EXPIRATION_SECONDS: int = 3600  # 1 hour

    LOG_LEVEL: str = "INFO"

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache()
def get_settings() -> Settings:
    """
    Returns the settings instance, cached for performance.
    """
    return Settings()


# Setup logging based on settings
logging.basicConfig(level=get_settings().LOG_LEVEL.upper())
logger = logging.getLogger(__name__)
```
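For local development, the same settings can be supplied through the `.env` file referenced above; on GKE they would instead come from a ConfigMap or Secret. The values below are placeholders, not real endpoints:

```ini
# .env -- local development only; never commit real credentials or hosts
REDIS_HOST=10.0.0.3
TRINO_HOST=trino-coordinator.data.svc.cluster.local
TRINO_PORT=8080
GCS_RESULT_BUCKET=your-trino-query-results-bucket
LOG_LEVEL=DEBUG
```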
Next come the API data models. Pydantic gives us type safety and request validation.

`api/models.py`:
```python
from enum import Enum

from pydantic import BaseModel, Field


class QueryStatus(str, Enum):
    """
    Enumeration for query job statuses.
    """
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


class QueryCreateRequest(BaseModel):
    """
    Request model for submitting a new query.
    """
    sql: str = Field(..., min_length=10, description="The SQL query to be executed by Trino.")


class QueryCreateResponse(BaseModel):
    """
    Response model after successfully submitting a query.
    """
    query_id: str = Field(..., description="The unique identifier for the submitted query.")


class QueryStatusResponse(BaseModel):
    """
    Response model for checking the status of a query.
    """
    query_id: str
    status: QueryStatus
    error_message: str | None = None
    created_at: str | None = None
    finished_at: str | None = None
```
The background task is the engine of the whole system. This function connects to Trino, executes the query, handles exceptions, and uploads the results to GCS. The error handling here has to be robust.

`services/trino_executor.py`:
```python
import datetime
import io
import logging

import pandas as pd
from google.cloud import storage
from trino.dbapi import connect
from trino.exceptions import TrinoQueryError

from api.models import QueryStatus
from core.config import get_settings
from services.state_manager import QueryStateData, update_query_state

logger = logging.getLogger(__name__)
settings = get_settings()


def execute_trino_query_and_store_results(query_id: str, sql: str):
    """
    The core background task function. It executes a Trino query,
    handles its lifecycle, and stores results in GCS.
    """
    logger.info(f"Starting background execution for query_id: {query_id}")
    try:
        update_query_state(query_id, QueryStateData(status=QueryStatus.RUNNING))
        conn = connect(
            host=settings.TRINO_HOST,
            port=settings.TRINO_PORT,
            user=settings.TRINO_USER,
            catalog=settings.TRINO_CATALOG,
            schema=settings.TRINO_SCHEMA,
        )
        cur = conn.cursor()
        logger.info(f"Executing SQL for query_id {query_id}: {sql[:200]}...")
        cur.execute(sql)
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]

        if not rows:
            # An empty result set is still a success; handle it gracefully.
            logger.warning(f"Query {query_id} returned no results.")
            df = pd.DataFrame(columns=columns)
        else:
            df = pd.DataFrame(rows, columns=columns)

        # Note: fetchall() buffers the full result set in worker memory,
        # which caps the result size this service can handle. For truly
        # large results, use Trino's `CREATE TABLE AS SELECT ... WITH
        # (format = 'PARQUET', external_location = '...')` and only
        # monitor the query status.
        result_gcs_path = f"results/{query_id}/result.parquet"
        upload_df_to_gcs(df, result_gcs_path)

        logger.info(f"Query {query_id} succeeded. Results stored at {result_gcs_path}")
        final_state = QueryStateData(
            status=QueryStatus.SUCCEEDED,
            result_gcs_path=f"gs://{settings.GCS_RESULT_BUCKET}/{result_gcs_path}",
            finished_at=datetime.datetime.utcnow().isoformat(),
        )
        update_query_state(query_id, final_state)
    except TrinoQueryError as e:
        logger.error(f"Trino query error for query_id {query_id}: {e}", exc_info=True)
        final_state = QueryStateData(
            status=QueryStatus.FAILED,
            error_message=f"Trino Error: {e.message}",
            finished_at=datetime.datetime.utcnow().isoformat(),
        )
        update_query_state(query_id, final_state)
    except Exception as e:
        # Catch-all for unexpected errors (e.g., GCS connection issues, memory errors)
        logger.critical(f"Unhandled exception for query_id {query_id}: {e}", exc_info=True)
        final_state = QueryStateData(
            status=QueryStatus.FAILED,
            error_message=f"Internal Server Error: {str(e)}",
            finished_at=datetime.datetime.utcnow().isoformat(),
        )
        update_query_state(query_id, final_state)


def upload_df_to_gcs(df: pd.DataFrame, gcs_path: str):
    """
    Uploads a pandas DataFrame as a Parquet file to GCS.
    """
    storage_client = storage.Client(project=settings.PROJECT_ID)
    bucket = storage_client.bucket(settings.GCS_RESULT_BUCKET)
    blob = bucket.blob(gcs_path)

    parquet_buffer = io.BytesIO()
    df.to_parquet(parquet_buffer, index=False)
    parquet_buffer.seek(0)

    blob.upload_from_file(parquet_buffer, content_type='application/octet-stream')
    logger.info(f"Successfully uploaded Parquet to gs://{settings.GCS_RESULT_BUCKET}/{gcs_path}")
```
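The comment in the executor mentions the `CREATE TABLE AS SELECT` escape hatch for results too large to buffer in worker memory. A sketch of how such a statement could be assembled; the `build_ctas_query` helper and its table-naming convention are illustrative, and the target catalog must support the Hive connector's `external_location` table property:

```python
def build_ctas_query(query_id: str, sql: str, bucket: str,
                     catalog: str = "gcs", schema: str = "default") -> str:
    """Wrap a SELECT so Trino writes the result as Parquet directly to GCS,
    keeping the full result set out of the gateway worker's memory.

    Assumes `sql` is a plain SELECT statement and `query_id` is a UUID.
    """
    # Unquoted Trino identifiers may not contain dashes, so normalize the UUID.
    table = f'{catalog}.{schema}."result_{query_id.replace("-", "_")}"'
    location = f"gs://{bucket}/results/{query_id}/"
    return (
        f"CREATE TABLE {table}\n"
        f"WITH (format = 'PARQUET', external_location = '{location}')\n"
        f"AS {sql}"
    )
```

The worker would then run this statement instead of the raw SELECT, record the external location as the result path, and never call `fetchall()` at all.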
The state management service talks to Redis and owns all state reads and writes.

`services/state_manager.py`:
```python
import datetime
import json

import redis
from pydantic import BaseModel, Field

from api.models import QueryStatus
from core.config import get_settings

settings = get_settings()

# In a real application, use a connection pool.
# For simplicity, we create a single client instance.
redis_client = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    decode_responses=True,
)


class QueryStateData(BaseModel):
    status: QueryStatus
    error_message: str | None = None
    result_gcs_path: str | None = None
    created_at: str = Field(default_factory=lambda: datetime.datetime.utcnow().isoformat())
    finished_at: str | None = None


def get_query_state(query_id: str) -> QueryStateData | None:
    """
    Retrieves the state of a query from Redis.
    """
    state_json = redis_client.get(f"query:{query_id}")
    if not state_json:
        return None
    return QueryStateData(**json.loads(state_json))


def create_initial_query_state(query_id: str) -> None:
    """
    Creates the initial PENDING state for a new query.
    """
    initial_state = QueryStateData(status=QueryStatus.PENDING)
    redis_client.set(
        f"query:{query_id}",
        initial_state.model_dump_json(),
        ex=settings.QUERY_STATE_TTL_SECONDS,
    )


def update_query_state(query_id: str, new_state_data: QueryStateData):
    """
    Updates the state of an existing query, merging with the existing state.
    """
    current_state = get_query_state(query_id)
    if not current_state:
        # Should not happen in the normal flow, but good for robustness.
        current_state = QueryStateData(status=QueryStatus.PENDING)
    # Merge updates: only fields explicitly set on new_state_data overwrite.
    updated_data = current_state.model_copy(update=new_state_data.model_dump(exclude_unset=True))
    redis_client.set(
        f"query:{query_id}",
        updated_data.model_dump_json(),
        ex=settings.QUERY_STATE_TTL_SECONDS,
    )
```
Finally, everything is wired together in the FastAPI application.

`main.py`:
```python
import datetime
import uuid

from fastapi import BackgroundTasks, FastAPI, HTTPException, Response
from google.cloud import storage

from api.models import (
    QueryCreateRequest,
    QueryCreateResponse,
    QueryStatus,
    QueryStatusResponse,
)
from core.config import get_settings
from services.state_manager import create_initial_query_state, get_query_state
from services.trino_executor import execute_trino_query_and_store_results

app = FastAPI(
    title="Asynchronous Trino Query Gateway",
    description="An API to run long-running Trino queries asynchronously.",
    version="1.0.0",
)
settings = get_settings()


@app.post("/queries", response_model=QueryCreateResponse, status_code=202)
async def submit_query(
    request: QueryCreateRequest,
    background_tasks: BackgroundTasks,
):
    """
    Submits a new query for asynchronous execution.
    """
    query_id = str(uuid.uuid4())
    create_initial_query_state(query_id)
    # This is the key part: the heavy lifting is done in the background.
    # The API call returns immediately.
    background_tasks.add_task(
        execute_trino_query_and_store_results,
        query_id,
        request.sql,
    )
    return QueryCreateResponse(query_id=query_id)


@app.get("/queries/{query_id}/status", response_model=QueryStatusResponse)
async def get_query_status(query_id: str):
    """
    Retrieves the current status of a submitted query.
    """
    state = get_query_state(query_id)
    if not state:
        raise HTTPException(status_code=404, detail="Query ID not found.")
    return QueryStatusResponse(query_id=query_id, **state.model_dump())


@app.get("/queries/{query_id}/results", status_code=307)
async def get_query_results(query_id: str):
    """
    Redirects the client to a signed URL for the query results in GCS.
    """
    state = get_query_state(query_id)
    if not state:
        raise HTTPException(status_code=404, detail="Query ID not found.")
    if state.status != QueryStatus.SUCCEEDED:
        raise HTTPException(
            status_code=400,
            detail=f"Query has not succeeded. Current status: {state.status}. "
                   f"Error: {state.error_message or 'None'}",
        )
    if not state.result_gcs_path:
        # This indicates a logic error in the executor.
        raise HTTPException(status_code=500, detail="Query succeeded but result path is missing.")

    try:
        storage_client = storage.Client(project=settings.PROJECT_ID)
        # Parse bucket and blob name from the gs:// path.
        bucket_name = settings.GCS_RESULT_BUCKET
        blob_name = state.result_gcs_path.replace(f"gs://{bucket_name}/", "")
        blob = storage_client.bucket(bucket_name).blob(blob_name)
        # Generate a signed URL so the client downloads directly from GCS.
        # This offloads bandwidth from our service.
        signed_url = blob.generate_signed_url(
            version="v4",
            expiration=datetime.timedelta(seconds=settings.GCS_SIGNED_URL_EXPIRATION_SECONDS),
            method="GET",
        )
        return Response(status_code=307, headers={"Location": signed_url})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate signed URL: {e}")
```
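One fragile spot in the results endpoint is recovering the blob name with a string `replace()` against the configured bucket: if the bucket setting ever changes between write and read, the path breaks silently. A small parser that derives both parts from the stored URI itself is safer; `parse_gcs_uri` is a hypothetical helper, not part of the google-cloud-storage API:

```python
def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs://bucket/path URI into (bucket, blob_name), failing
    loudly on anything that is not a complete GCS object URI."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a GCS URI: {uri!r}")
    bucket, _, blob_name = uri[len(prefix):].partition("/")
    if not bucket or not blob_name:
        raise ValueError(f"incomplete GCS URI: {uri!r}")
    return bucket, blob_name
```

With this, the handler could take both bucket and blob name from `state.result_gcs_path` alone instead of trusting `settings.GCS_RESULT_BUCKET` to still match.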
Deployment and Operational Considerations

- **Dockerfile**: build a production-grade Dockerfile with a multi-stage build to keep the image small, and run as a non-root user.
- **GKE Deployment**: the Kubernetes manifest must define resource `requests` and `limits` to keep Pods stable and schedulable. Configuring a `readinessProbe` and `livenessProbe` is essential so Kubernetes can automatically manage unhealthy instances.
- **Horizontal scaling**: since all state is centralized in Redis, the FastAPI application itself is stateless and scales out easily with a Kubernetes `HorizontalPodAutoscaler` (HPA).
- **Unit tests**: for `trino_executor`, use `pytest` and `unittest.mock` to stub the Trino and GCS clients, so the core logic (error handling, state transitions) can be tested without any external services. For the API layer, FastAPI's `TestClient` makes endpoint testing straightforward.
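As an illustration of that mocking approach, the sketch below tests a deliberately simplified stand-in for the executor core (the `run_query` function is illustrative, not code from this project):

```python
from unittest import mock


def run_query(conn_factory, sql):
    """Simplified executor core: connect, execute, fetch."""
    conn = conn_factory()
    cur = conn.cursor()
    cur.execute(sql)
    return cur.fetchall()


def test_run_query_uses_cursor():
    # MagicMock records every call, so no Trino cluster is needed.
    fake_conn = mock.MagicMock()
    fake_conn.cursor.return_value.fetchall.return_value = [(1, "a")]

    rows = run_query(lambda: fake_conn, "SELECT 1")

    assert rows == [(1, "a")]
    fake_conn.cursor.return_value.execute.assert_called_once_with("SELECT 1")
```

The same pattern extends to the real executor: patch `trino.dbapi.connect` and `google.cloud.storage.Client` at the module level and assert on the resulting state transitions.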
Limitations and Future Iterations

This architecture solves the original synchronous-blocking problem, but in production there is still room for improvement.

First, `BackgroundTasks` runs in the same process as the web service. A background task that consumes too much CPU or memory can still degrade API responsiveness. For heavier workloads, a more robust approach is a dedicated task queue system such as Celery with RabbitMQ or Redis as the broker, which gives true physical isolation of compute resources.

Second, the current polling model adds client-side complexity and can generate unnecessary network traffic. A more advanced alternative is WebSockets or Server-Sent Events (SSE): the client holds a persistent connection, and the server pushes a notification whenever the query state changes.

Finally, query management is still minimal. A `DELETE /queries/{query_id}` endpoint could attempt to cancel a query running in Trino (via Trino's API) and clean up the associated Redis and GCS resources, which matters for resource management. Better monitoring is also needed, for example Prometheus metrics for in-flight query count, success rate, and average execution time.