构建结合MySQL元数据过滤与TensorFlow重排名的Pinecone向量检索增强LLM生成管道


在构建一个依赖海量内部文档的知识库问答系统时,我们面临的第一个挑战不是LLM本身,而是检索的精准度。单纯的向量相似度检索(RAG)在处理混合了非结构化文本和结构化元数据的复杂查询时,表现得力不从心。例如,用户查询“查找一下去年第三季度由‘基础设施’团队发布的关于Kubernetes成本优化的技术方案”,一个标准的RAG系统几乎无法处理“去年第三季度”和“‘基础设施’团队发布”这类精确的元数据约束。这会返回大量语义相关但约束条件不符的噪声文档,严重污染提供给LLM的上下文,导致生成错误的答案。

我们的目标是构建一个能够同时理解语义相似性和精确元数据过滤的混合检索系统,并将检索到的候选文档通过一个更精细的模型进行重排序,最终将最优上下文提交给LLM。这需要一个能协同工作的技术栈,而不是单一的向量数据库能解决的问题。

架构决策:在不同检索范式间权衡

最初,我们评估了两种主流的混合检索实现方案。

方案A: 串行过滤-检索模式 (Filter-then-Fetch)

这是最直观的思路。它将检索过程分解为两个独立的串行步骤:

  1. 元数据过滤: 首先,根据查询中的结构化信息(如日期、作者、标签),在关系型数据库(MySQL)中执行一次SELECT查询,获取所有满足条件的document_id列表。
  2. 向量检索: 将上一步得到的document_id列表作为filter条件,传递给Pinecone的query接口。Pinecone会在这些指定的向量中执行相似度搜索。
sequenceDiagram
    participant User
    participant AppServer as 应用服务
    participant MySQL
    participant Pinecone

    User->>AppServer: 发起混合查询
    AppServer->>MySQL: 1. SELECT doc_id WHERE metadata matches
    MySQL-->>AppServer: 返回匹配的 doc_id 列表
    AppServer->>Pinecone: 2. Vector search with doc_id filter
    Pinecone-->>AppServer: 返回Top-K向量
    AppServer->>User: 整合结果

优势:

  • 逻辑清晰,实现简单。
  • 过滤精度极高,因为完全依赖MySQL的事务性和精确查询能力。

劣势:

  • 性能瓶颈: 这是该方案的致命伤。当MySQL返回的doc_id列表非常庞大时(例如,数万甚至数十万条),将其作为过滤器传递给Pinecone会导致严重的性能下降。Pinecone内部需要对这个巨大的ID列表进行处理,这几乎摧毁了其基于HNSW等索引实现的高性能近邻搜索优势。
  • 召回率问题: 如果用户的查询意图主要在语义上,而元数据条件只是一个弱约束,这种严格的“先过滤”可能会过早地淘汰掉许多潜在的优质结果。

在真实项目中,一个查询可能匹配成千上万份文档的元数据,这使得方案A在我们的延迟要求下(p99 < 500ms)完全不可行。

方案B: 独立检索-合并模式 (Fetch-then-Merge)

这个方案放弃了串行流程,改为并行执行两种检索,然后对结果进行合并。

  1. 并行检索:
    • 向量通道: 将用户查询的语义部分(例如“Kubernetes成本优化”)编码为向量,在Pinecone中进行无过滤的Top-K搜索。
    • 关键词/元数据通道: 提取查询中的关键词和元数据,在MySQL中进行FULLTEXT搜索或WHERE条件查询,获取另一组匹配的document_id
  2. 结果合并与重排: 将两个通道返回的document_id集合进行合并、去重,然后获取这些文档的完整内容。由于两个通道的排序依据完全不同,需要一个独立的重排(Rerank)阶段来对合并后的候选集进行统一打分和排序。
graph TD
    subgraph "应用服务 (Node.js)"
        A[接收混合查询] --> B{并行检索};
        B -- 语义部分 --> C[Pinecone 向量检索];
        B -- 元数据/关键词 --> D[MySQL 全文/字段检索];
        C --> E{结果合并与去重};
        D --> E;
        E --> F[候选文档获取];
        F --> G[TensorFlow 重排模型];
        G --> H[构建最终上下文];
        H --> I[调用 LLM];
    end
    I --> J[返回生成结果];

    style C fill:#d5f5e3
    style D fill:#d5f5e3
    style G fill:#f5e3d5

优势:

  • 高性能: 两个检索通道并行执行,充分利用了不同系统的优势。Pinecone的向量搜索不受元数据过滤影响,可以保持极低的延迟。
  • 高召回率: 确保了语义相关和元数据匹配的文档都能被召回,为后续的重排阶段提供了更丰富的候选集。

劣劣势:

  • 架构复杂度高: 需要实现结果合并、去重以及一个独立的重排服务。
  • 重排模型依赖: 系统的最终效果高度依赖于重排模型的质量。一个劣质的重排模型可能会毁掉优秀的召回结果。

最终选择:

我们最终选择了方案B。尽管它更复杂,但在生产环境中,性能和召回率的平衡至关重要。串行方案的性能瓶颈是架构级别的,难以优化,而并行方案的复杂性是工程级别的,可以通过引入重排模型和健壮的测试来解决。这种架构允许我们独立地优化向量检索、元数据检索和重排三个环节。

核心实现概览

我们的后端服务采用Node.js构建,它负责编排整个检索和生成流程。

1. 数据写入与同步

数据写入是整个系统的基础。每份文档都需要同时存入MySQL和Pinecone,并通过一个统一的uuid进行关联。

// src/services/ingestion.service.ts
import { Pinecone } from '@pinecone-database/pinecone';
import { TensorFlowEmbedder } from './embedding.service'; // 封装TensorFlow.js或调用Python服务
import { MySQLClient } from './mysql.client';

interface Document {
  uuid: string;
  content: string;
  author: string;
  team: 'infrastructure' | 'application' | 'data';
  createdAt: Date;
  tags: string[];
}

export class IngestionService {
  private pinecone: Pinecone;
  private embedder: TensorFlowEmbedder;
  private mysql: MySQLClient;

  constructor() {
    // 初始化客户端,配置从环境变量读取
    this.pinecone = new Pinecone({ apiKey: process.env.PINECONE_API_KEY! });
    this.embedder = new TensorFlowEmbedder(process.env.EMBEDDING_MODEL_URL!);
    this.mysql = new MySQLClient();
  }

  /**
   * 处理单个文档的写入,确保事务性或最终一致性
   * 在真实项目中,这里应该有重试和错误处理队列
   */
  public async ingestDocument(doc: Document): Promise<void> {
    try {
      // 1. 生成向量
      const vector = await this.embedder.createEmbedding(doc.content);

      // 2. 写入 Pinecone
      const index = this.pinecone.index('knowledge-base');
      await index.upsert([{
        id: doc.uuid,
        values: vector,
        metadata: {
          // Pinecone的metadata主要用于简单的后过滤,这里可以存一些冗余信息
          team: doc.team,
          createdAt: doc.createdAt.getTime(), // 存为timestamp
        },
      }]);

      // 3. 写入 MySQL, 存储完整信息和用于全文检索的内容
      // 注意:tags应存为关联表或JSON类型以便查询
      const query = `
        INSERT INTO documents (uuid, content, author, team, created_at, tags_json)
        VALUES (?, ?, ?, ?, ?, ?)
        ON DUPLICATE KEY UPDATE
        content = VALUES(content), author = VALUES(author), team = VALUES(team), created_at = VALUES(created_at), tags_json = VALUES(tags_json);
      `;
      await this.mysql.execute(query, [
        doc.uuid,
        doc.content,
        doc.author,
        doc.team,
        doc.createdAt,
        JSON.stringify(doc.tags),
      ]);

      console.log(`Successfully ingested document ${doc.uuid}`);

    } catch (error) {
      console.error(`Failed to ingest document ${doc.uuid}:`, error);
      // 生产级代码需要加入死信队列等机制
      throw error;
    }
  }
}

2. 混合检索与编排

这是系统的核心逻辑,由一个QueryOrchestrator服务来负责。

// src/services/orchestrator.service.ts
import { Pinecone } from '@pinecone-database/pinecone';
import { TensorFlowEmbedder } from './embedding.service';
import { MySQLClient, DocumentRecord } from './mysql.client';
import { Reranker } from './reranker.service'; // 调用TensorFlow Serving

interface HybridQuery {
  semanticQuery: string;
  team?: string;
  author?: string;
  dateFrom?: Date;
  dateTo?: Date;
}

export class QueryOrchestrator {
  // ... constructor for clients initialization

  public async query(query: HybridQuery): Promise<DocumentRecord[]> {
    // 1. 并行执行向量检索和元数据检索
    const [pineconeResults, mysqlResults] = await Promise.all([
      this.searchVector(query.semanticQuery),
      this.searchMetadata(query)
    ]);

    // 2. 合并和去重候选文档ID
    const candidateIds = this.mergeAndDeduplicate(
      pineconeResults.map(r => r.id),
      mysqlResults.map(r => r.uuid)
    );

    if (candidateIds.length === 0) {
      return [];
    }

    // 3. 从MySQL批量获取候选文档的完整内容
    const candidateDocs = await this.mysql.getDocumentsByIds(candidateIds);

    // 4. 使用TensorFlow模型进行重排序
    const rerankedDocs = await this.reranker.rerank(query.semanticQuery, candidateDocs);

    // 5. 返回排序后的Top-N文档用于构建LLM上下文
    return rerankedDocs.slice(0, 5);
  }

  private async searchVector(queryText: string): Promise<{id: string, score: number}[]> {
    const vector = await this.embedder.createEmbedding(queryText);
    const index = this.pinecone.index('knowledge-base');
    const results = await index.query({
      vector: vector,
      topK: 20, // 召回更多候选者给reranker
    });
    return results.matches?.map(match => ({ id: match.id, score: match.score || 0 })) || [];
  }

  private async searchMetadata(query: HybridQuery): Promise<DocumentRecord[]> {
    let sql = 'SELECT uuid FROM documents WHERE 1=1';
    const params: (string | Date)[] = [];

    if (query.team) {
      sql += ' AND team = ?';
      params.push(query.team);
    }
    if (query.author) {
      sql += ' AND author = ?';
      params.push(query.author);
    }
    if (query.dateFrom) {
      sql += ' AND created_at >= ?';
      params.push(query.dateFrom);
    }
    // ... 其他条件
    sql += ' LIMIT 20'; // 同样限制召回数量

    return this.mysql.query<DocumentRecord[]>(sql, params);
  }

  private mergeAndDeduplicate(arr1: string[], arr2: string[]): string[] {
    const combined = [...arr1, ...arr2];
    return [...new Set(combined)];
  }
}

3. 重排模型 (Reranker)

重排阶段我们使用了一个基于BERT的Cross-Encoder模型,通过TensorFlow Serving部署。它接收[query, document_content]对,并输出一个相关性分数。Node.js服务通过gRPC或REST API调用它。

// src/services/reranker.service.ts
import axios from 'axios';
import { DocumentRecord } from './mysql.client';

// 这是一个简化的客户端,用于与TensorFlow Serving交互
export class Reranker {
  private readonly servingUrl: string;

  constructor(servingUrl: string) {
    this.servingUrl = servingUrl;
    if (!this.servingUrl) {
      throw new Error('TENSORFLOW_SERVING_URL is not defined.');
    }
  }

  /**
   * 对候选文档列表进行重排序
   * @param query - 原始的用户查询
   * @param documents - 候选文档对象数组
   * @returns 按相关性分数降序排列的文档数组
   */
  public async rerank(query: string, documents: DocumentRecord[]): Promise<DocumentRecord[]> {
    // TensorFlow Serving的RESTful API期望特定的输入格式
    const instances = documents.map(doc => ({
      query: query,
      document: doc.content.substring(0, 512) // 截断以符合模型输入限制
    }));

    try {
      const response = await axios.post(this.servingUrl, { instances });
      const scores: number[] = response.data.predictions.map((p: any) => p[0]);

      const scoredDocuments = documents.map((doc, index) => ({
        ...doc,
        relevanceScore: scores[index],
      }));

      // 按重排分数降序排序
      scoredDocuments.sort((a, b) => b.relevanceScore - a.relevanceScore);
      
      return scoredDocuments;

    } catch (error) {
      console.error('Reranking request failed:', error);
      // 如果重排服务失败,应有降级策略,例如直接返回未排序的文档
      return documents;
    }
  }
}

4. 单元与集成测试 (Jest)

这套复杂的编排逻辑极易出错,必须有完善的测试覆盖。我们使用Jest,并对外部依赖(Pinecone, MySQL, TensorFlow Serving, LLM API)进行mock。

// src/services/orchestrator.service.test.ts
import { QueryOrchestrator } from './orchestrator.service';
import { Pinecone } from '@pinecone-database/pinecone';
import { MySQLClient } from '../mysql.client';
import { TensorFlowEmbedder } from '../embedding.service';
import { Reranker } from '../reranker.service';

// Mock所有外部依赖
jest.mock('@pinecone-database/pinecone');
jest.mock('../mysql.client');
jest.mock('../embedding.service');
jest.mock('../reranker.service');

describe('QueryOrchestrator', () => {
  let orchestrator: QueryOrchestrator;
  let mockPineconeIndex: any;
  let mockMysql: jest.Mocked<MySQLClient>;
  let mockEmbedder: jest.Mocked<TensorFlowEmbedder>;
  let mockReranker: jest.Mocked<Reranker>;

  beforeEach(() => {
    // 为每个测试重置mocks
    mockPineconeIndex = { query: jest.fn() };
    (Pinecone.prototype.index as jest.Mock).mockReturnValue(mockPineconeIndex);
    
    mockMysql = new MySQLClient() as jest.Mocked<MySQLClient>;
    mockEmbedder = new TensorFlowEmbedder('') as jest.Mocked<TensorFlowEmbedder>;
    mockReranker = new Reranker('') as jest.Mocked<Reranker>;

    orchestrator = new QueryOrchestrator();
    // 手动注入mock实例,在真实应用中会使用依赖注入容器
    (orchestrator as any).pinecone = new Pinecone({ apiKey: 'fake-key' });
    (orchestrator as any).mysql = mockMysql;
    (orchestrator as any).embedder = mockEmbedder;
    (orchestrator as any).reranker = mockReranker;
  });

  test('should correctly merge and rerank results from Pinecone and MySQL', async () => {
    // --- Arrange ---
    const query = { semanticQuery: 'test query', team: 'application' };

    // Mock Embedder
    mockEmbedder.createEmbedding.mockResolvedValue([0.1, 0.2, 0.3]);

    // Mock Pinecone: returns two unique docs
    mockPineconeIndex.query.mockResolvedValue({
      matches: [
        { id: 'pinecone-doc-1', score: 0.9 },
        { id: 'pinecone-doc-2', score: 0.8 },
      ],
    });

    // Mock MySQL: returns one unique and one overlapping doc
    mockMysql.query.mockResolvedValue([
      { uuid: 'mysql-doc-1' },
      { uuid: 'pinecone-doc-2' }, // Overlapping doc
    ]);

    // Mock fetching full documents
    const fullDocs = [
      { uuid: 'pinecone-doc-1', content: 'content 1', team: 'application' },
      { uuid: 'pinecone-doc-2', content: 'content 2', team: 'application' },
      { uuid: 'mysql-doc-1', content: 'content 3', team: 'application' },
    ];
    mockMysql.getDocumentsByIds.mockResolvedValue(fullDocs);

    // Mock Reranker: reverses the order
    mockReranker.rerank.mockImplementation(async (q, docs) => {
        return [...docs].reverse();
    });

    // --- Act ---
    const results = await orchestrator.query(query);

    // --- Assert ---
    // 1. 确认MySQL的`getDocumentsByIds`被以正确的、去重后的ID列表调用
    expect(mockMysql.getDocumentsByIds).toHaveBeenCalledWith(
      expect.arrayContaining(['pinecone-doc-1', 'pinecone-doc-2', 'mysql-doc-1'])
    );
    expect(mockMysql.getDocumentsByIds.mock.calls[0][0].length).toBe(3);

    // 2. 确认Reranker被调用
    expect(mockReranker.rerank).toHaveBeenCalledWith('test query', fullDocs);

    // 3. 确认最终结果是经过重排模型处理后的顺序
    expect(results.map(r => r.uuid)).toEqual([
      'mysql-doc-1',
      'pinecone-doc-2',
      'pinecone-doc-1',
    ]);
  });
});

架构的扩展性与局限性

这个架构虽然解决了最初的问题,但并非银弹。一个明显的局限是,重排阶段引入了额外的延迟。对于需要极低延迟的场景,可能需要一个更轻量级的、甚至无需调用外部服务的重排模型。此外,TensorFlow Serving集群的维护本身也是一项成本。

另一个挑战在于如何优化两个检索通道的召回数量(topK)。设置得太高会增加重排阶段的负载,太低则可能丢失重要文档。这需要基于真实查询日志进行持续的调优。

未来的迭代方向可以考虑引入一个学习排序(Learning to Rank, LTR)模型作为重排器,它可以融合向量相似度分数、MySQL的全文检索引擎(如BM25)分数以及其他业务特征,从而获得比单一Cross-Encoder更精准的排序结果。同时,可以探索使用更快的向量索引,或者将部分热点元数据冗余到Pinecone中,以实现更高效的过滤,但这又回到了对数据一致性和成本的权衡上。


  目录