利用 Clojure 构建从 HBase 到 Delta Lake 的异构 CDC 数据同步架构


一个常见的架构困境是,如何将一个为高并发、低延迟随机读写优化的在线事务处理(OLTP)系统的数据,高效、可靠地同步到一个为复杂分析查询优化的在线分析处理(OLAP)系统。我们的核心业务依赖于 HBase,它承载着每秒数十万次的点查和范围扫描,但其存储模型(LSM-Tree)和缺乏 SQL 支持,使得在其上直接进行大规模分析变得不切实际且危险。

最初的解决方案是夜间批量导出。通过 MapReduce 或 Spark 任务,将 HBase 表的快照导出为 Parquet 文件,加载到数据仓库。这种方法的弊端显而易见:数据延迟高达24小时,导出过程对线上 HBase 集群造成巨大的 I/O 压力,且无法捕获期间发生的数据删除或更新,导致数据一致性问题频发。

第二代方案尝试引入消息队列。应用层在写入 HBase 的同时,向 Kafka 发送一份数据。这种双写模式看似解决了延迟问题,却引入了更严重的分布式系统一致性问题。网络分区、应用崩溃或消息队列的短暂不可用,都可能导致 HBase 和 Kafka 的数据不一致。我们无法保证这两个异构系统写入的原子性,这在生产环境中是不可接受的。

最终的技术决策是构建一个基于变更数据捕获(Change Data Capture, CDC)的实时同步管道。其核心思想是直接消费数据源的事务日志,以保证捕获所有的数据变更,且对源系统侵入性最低。对于 HBase 而言,其内置的跨集群复制(Replication)机制,本质上就是一种基于预写日志(WAL)的 CDC 实现。我们可以通过实现一个自定义的 ReplicationEndpoint,将自己伪装成一个从集群,从而获得一个可靠、有序、完整的 HBase 变更事件流。

目标架构如下:

graph TD
    subgraph "生产 HBase 集群"
        HMaster[HMaster]
        RS1[RegionServer 1]
        RS2[RegionServer 2]
        RS3[RegionServer 3]
    end

    subgraph "CDC 处理层 (Clojure 应用)"
        direction LR
        CustomEndpoint[Custom ReplicationEndpoint]
        EventProcessor[事件处理器]
        SchemaManager[Schema 管理器]
        DeltaWriter[Delta Lake 写入器]
    end

    subgraph "数据湖 (S3/HDFS)"
        DeltaTable[Delta Lake Table]
    end
    
    RS1 -- WAL Edits --> CustomEndpoint
    RS2 -- WAL Edits --> CustomEndpoint
    RS3 -- WAL Edits --> CustomEndpoint
    
    CustomEndpoint -- "HBase WAL.Entry[]" --> EventProcessor
    EventProcessor -- "解析/转换" --> SchemaManager
    SchemaManager -- "携带 Schema" --> DeltaWriter
    DeltaWriter -- "MERGE INTO" --> DeltaTable
    
    style DeltaTable fill:#228B22,stroke:#333,stroke-width:2px,color:#fff

选择 Clojure 作为实现语言并非偶然。作为一门托管在 JVM 上的 Lisp 方言,它能无缝调用 HBase 和 Delta Lake 的原生 Java API。其函数式编程范式、对不可变数据结构的强调,以及强大的并发原语,非常适合构建可预测、易于推理的数据处理管道。相比于冗长的 Java 或复杂的 Scala,Clojure 能用更少的代码清晰地表达业务逻辑。

步骤一:配置 HBase 启用自定义复制

首先,HBase 必须启用复制功能。在 hbase-site.xml 中确保以下配置存在:

<property>
  <name>hbase.replication</name>
  <value>true</value>
</property>

接下来,我们需要将编译好的 Clojure 应用(打包成一个 uberjar)分发到 HBase 集群所有 RegionServer 节点的 classpath 下,通常是 $HBASE_HOME/lib/ 目录。

然后,通过 HBase Shell 添加一个复制对等体(peer),关键在于 ENDPOINT_CLASSNAME 参数,它指向我们用 Clojure 实现的自定义类。

# HBase Shell
# 'delta_lake_sink' 是我们为这个复制 peer 起的名字
# 'com.mycompany.cdc.HBaseClojureEndpoint' 是我们实现的 Java 接口的类名
add_peer 'delta_lake_sink', ENDPOINT_CLASSNAME => 'com.mycompany.cdc.HBaseClojureEndpoint'

最后,为需要同步的表启用复制。

# 为名为 'user_profiles' 的表启用复制
enable_table_replication 'user_profiles'

完成这些步骤后,HBase RegionServer 会加载我们的 Clojure 代码,并在 user_profiles 表有任何数据变更时,将相关的 WAL edits 推送到我们的 HBaseClojureEndpoint 中。

步骤二:实现 Clojure ReplicationEndpoint

Clojure 的核心是 com.mycompany.cdc.core 命名空间。它需要实现 HBase 的 BaseReplicationEndpoint 抽象类。Clojure 的 Java 互操作性通过 gen-classreify 提供了优雅的实现方式。

项目结构 deps.edn

{:paths ["src"]
 :deps {org.clojure/clojure {:mvn/version "1.11.1"}
        org.apache.hbase/hbase-server {:mvn/version "2.4.9"}
        io.delta/delta-core_2.12 {:mvn/version "2.1.0"}
        org.apache.spark/spark-sql_2.12 {:mvn/version "3.2.1"}
        org.apache.hadoop/hadoop-client {:mvn/version "3.2.2"}
        org.clojure/tools.logging {:mvn/version "1.2.4"}
        ch.qos.logback/logback-classic {:mvn/version "1.2.11"}}
 :aliases
 {:uberjar {:extra-deps {org.clojure/tools.build {:git/tag "v0.9.4" :git/sha "76b7445"}}
            :main-opts ["-m" "build"]}}}

核心实现 src/com/mycompany/cdc/core.clj:

(ns com.mycompany.cdc.core
  (:require [clojure.tools.logging :as log]
            [com.mycompany.cdc.delta :as delta]
            [com.mycompany.cdc.parser :as parser])
  (:import (org.apache.hadoop.conf.Configuration)
           (org.apache.hadoop.fs Path)
           (org.apache.hbase.replication BaseReplicationEndpoint)
           (org.apache.hbase.wal WAL$Entry)
           (java.util UUID List))
  (:gen-class
   :name com.mycompany.cdc.HBaseClojureEndpoint
   :extends org.apache.hbase.replication.BaseReplicationEndpoint
   :main false))

(defonce ^:private delta-writer-state (atom nil))
(defonce ^:private hbase-conf (atom nil))

(defn -start [this]
  (log/info "Starting Clojure HBase Replication Endpoint...")
  (try
    (let [conf (.getContext this)]
      (reset! hbase-conf conf)
      (let [delta-table-path (.get "hbase.cdc.delta.table.path" conf)]
        (if-not delta-table-path
          (throw (IllegalStateException. "Missing config: hbase.cdc.delta.table.path"))
          (do
            (log/info "Initializing Delta Lake writer for path:" delta-table-path)
            (reset! delta-writer-state (delta/init-delta-writer delta-table-path))))))
    (log/info "Clojure Endpoint started successfully.")
    (.superStart this))
  (catch Exception e
    (log/error e "Failed to start Clojure Endpoint")
    (.notifyFailed this e)))

(defn -stop [this]
  (log/info "Stopping Clojure HBase Replication Endpoint...")
  (try
    ;; 在生产环境中,这里可能需要更优雅的资源清理
    (reset! delta-writer-state nil)
    (log/info "Clojure Endpoint stopped.")
    (.superStop this))
  (catch Exception e
    (log/error e "Error while stopping Clojure Endpoint")
    (.notifyFailed this e)))

(defn -replicate [this ^BaseReplicationEndpoint$ReplicateContext context]
  (let [entries (.getEntries context)
        wal-group (.getWalGroup context)]
    (try
      (log/info (format "Received %d WAL entries from WAL group %s" (count entries) wal-group))
      
      ;; 核心处理逻辑
      (let [parsed-data (parser/parse-wal-entries entries)
            valid-ops (filter #(not (empty? (:operations %))) parsed-data)]
        
        (when (seq valid-ops)
          (log/info (format "Parsed %d valid operations to process." (count valid-ops)))
          (delta/write-to-delta! @delta-writer-state valid-ops)))
      
      ;; 成功处理后返回 true
      true)
    (catch Exception e
      (log/error e "Critical error during replication batch processing. WAL group:" wal-group)
      ;; 返回 false 会触发 HBase 的重试机制
      false)))

;; BaseReplicationEndpoint 需要的其他方法实现
(defn -getPeerUUID [this]
  (UUID/randomUUID))

(defn -canReplicateToSameCluster [this] false)

;; WalEntryFilter 的实现,可以用于过滤某些 WAL 条目
(defn -getWalEntryfilter [this] nil)

(defn -isReplicationObserver [this] false)

这个 core.clj 文件定义了一个 HBaseClojureEndpoint 类。

  • gen-class 宏负责生成一个继承自 BaseReplicationEndpoint 的 Java 类。
  • -start-stop 是生命周期方法。我们在 -start 中初始化 Delta Lake writer 的连接和状态。一个常见的错误是在这里做过多阻塞操作,这会延迟 RegionServer 的启动。
  • -replicate 是最核心的方法。HBase 会批量地将 WAL.Entry 对象推送过来。我们的任务就是解析这些二进制日志条目,转换为结构化数据,然后写入 Delta Lake。

步骤三:解析 HBase WAL 条目

HBase 的 WAL Entry 结构复杂,内含多个 KeyValue 对象,代表了对单元格(Cell)的原子操作。我们需要将这些底层的 KeyValue 对象转换为更易于处理的 Clojure map。

src/com/mycompany/cdc/parser.clj:

(ns com.mycompany.cdc.parser
  (:require [clojure.tools.logging :as log])
  (:import (org.apache.hadoop.hbase CellUtil)
           (org.apache.hadoop.hbase.util Bytes)
           (org.apache.hadoop.hbase.wal WAL$Entry)
           (org.apache.hadoop.hbase.KeyValue$Type)))

(defn- kv->map
  "将单个 HBase KeyValue 对象转换为 Clojure map。
   这是数据转换的核心,需要根据业务约定来解析。
   例如,这里的约定是列族 'd' 存储数据,列名就是字段名。"
  [kv]
  (try
    (let [row-key (Bytes/toString (CellUtil/cloneRow kv))
          family (Bytes/toString (CellUtil/cloneFamily kv))
          qualifier (Bytes/toString (CellUtil/cloneQualifier kv))
          value (Bytes/toString (CellUtil/cloneValue kv))
          op-type (KeyValue$Type/codeToType (.getTypeByte kv))]
      (when (= "d" family) ; 假设所有业务数据都在 'd' 列族下
        {:row-key row-key
         :op-type (if (= op-type KeyValue$Type/DeleteFamily) :delete-row :upsert)
         :column (keyword qualifier)
         :value value
         :timestamp (.getTimestamp kv)}))
    (catch Exception e
      (log/error e "Failed to parse KeyValue object.")
      nil)))

(defn- group-by-row-key
  "将同一行键的所有变更聚合在一起。"
  [kvs]
  (->> kvs
       (map kv->map)
       (remove nil?)
       (group-by :row-key)
       (map (fn [[row-key ops]]
              (let [delete-op (some #(when (= :delete-row (:op-type %)) %) ops)]
                (if delete-op
                  ;; 如果存在行删除标记,整个操作被视为删除
                  {:row-key row-key
                   :operation :delete
                   :timestamp (:timestamp delete-op)}
                  ;; 否则是更新/插入
                  {:row-key row-key
                   :operation :upsert
                   :timestamp (apply max (map :timestamp ops))
                   :columns (->> ops
                                 (map (juxt :column :value))
                                 (into {}))}))
              ))
       (vec)))

(defn parse-wal-entries
  "解析一批 WAL$Entry 对象。"
  [^java.util.List entries]
  (->> entries
       (mapcat (fn [^WAL$Entry entry]
                 (let [table-name (.toString (.getTableName (.getKey entry)))
                       kvs (.getEdit entry)]
                   ;; 我们只关心特定表的变更
                   (when (= table-name "user_profiles")
                     (group-by-row-key (.getCells kvs))))))
       (remove nil?)
       (vec)))

这里的解析逻辑是高度业务相关的。我们做了一个假设:所有业务数据都存储在名为 d 的列族下,列名即为字段名。kv->map 函数负责将底层的 KeyValue 转换为包含 row-key, column, value 等信息的 map。group-by-row-key 则将属于同一行的所有变更聚合起来,形成一个完整的行级操作(UPSERT 或 DELETE)。这是一个常见的错误来源:不正确地处理部分更新,导致目标表数据不一致。

步骤四:幂等地写入 Delta Lake

Delta Lake 的 MERGE 操作是实现数据同步的关键。它提供了原子的 UPSERTDELETE 功能,这保证了即使我们的 CDC 进程重试(例如,在失败后 HBase 重新推送同一批 WAL edits),最终数据状态也是正确的,即实现了幂等性。

src/com/mycompany/cdc/delta.clj:

(ns com.mycompany.cdc.delta
  (:require [clojure.tools.logging :as log])
  (:import (io.delta.tables DeltaTable)
           (org.apache.spark.sql SparkSession)))

(defn- get-spark-session []
  ;; 生产环境中,SparkSession 配置需要精细调优
  ;; 特别是 S3/HDFS 的访问凭证和连接参数
  (-> (SparkSession/builder)
      (.appName "HBase-CDC-to-Delta")
      (.master "local[*]") ; 嵌入式运行,真实项目应连接到 Spark 集群
      (.config "spark.sql.extensions" "io.delta.sql.DeltaSparkSessionExtension")
      (.config "spark.sql.catalog.spark_catalog" "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      (.getOrCreate)))

(defrecord DeltaWriter [spark-session table-path])

(defn init-delta-writer [table-path]
  (log/info "Creating Spark session and DeltaTable handle for" table-path)
  (->DeltaWriter (get-spark-session) table-path))

(defn- convert-to-dataframe
  "将 Clojure map 转换为 Spark DataFrame。"
  [{:keys [spark-session]} ops]
  (let [spark (.sparkContext spark-session)
        rows (map (fn [op]
                    ;; 将 map 转换为 Spark Row
                    ;; 这里需要一个固定的 schema 结构
                    (let [base-map {:__row_key (:row-key op)
                                  :__op_type (name (:operation op))
                                  :__timestamp_ms (:timestamp op)}]
                      (merge base-map (:columns op))))
                  ops)
        rdd (.parallelize spark rows)]
    (.createDataFrame spark-session rdd java.util.Map)))

(defn write-to-delta!
  "将解析后的操作写入 Delta Lake 表。"
  [{:keys [spark-session table-path] :as writer} parsed-ops]
  (when (seq parsed-ops)
    (try
      (let [delta-table (DeltaTable/forPath spark-session table-path)
            source-df (convert-to-dataframe writer parsed-ops)]
        
        (log/info (str "Merging " (.count source-df) " records into Delta table at " table-path))
        
        (-> delta-table
            (.as "target")
            (.merge source-df "target.__row_key = source.__row_key")
            
            ;; 处理删除操作
            (.whenMatched "source.__op_type = 'delete'")
            (.delete)
            
            ;; 处理更新/插入操作
            (.whenMatched "source.__op_type = 'upsert'")
            (.updateExpr {"name" "source.name", "email" "source.email", "age" "source.age"}) ; 手动指定列映射
            
            ;; 处理新插入操作
            (.whenNotMatched)
            (.insertExpr {"__row_key" "source.__row_key", "name" "source.name", "email" "source.email", "age" "source.age"})
            
            ;; 启用 Schema 自动演进,这是应对 HBase 动态增减列的关键
            (.withSchemaEvolution)
            (.execute))
        
        (log/info "Merge operation completed successfully."))
      (catch Exception e
        (log/error e "Failed to write batch to Delta Lake.")
        ;; 抛出异常,让上层 replication 循环进行重试
        (throw e)))))

这段代码的核心是 write-to-delta! 函数。它首先将 Clojure 数据结构转换为 Spark DataFrame,然后调用 delta-table.merge API。

  • merge(sourceDF, "target.primary_key = source.primary_key") 定义了合并的关联条件。
  • whenMatched("condition").delete() / updateExpr(...) 定义了匹配到记录时的操作。
  • whenNotMatched().insertExpr(...) 定义了未匹配到记录时的插入操作。
  • .withSchemaEvolution() 是一个至关重要的选项。它允许 Delta Lake 在源数据(来自 HBase 的新列)包含目标表所没有的列时,自动地、事务性地为目标表添加新列,从而优雅地处理了上游 Schema 的变更。

架构的局限性与未来展望

该架构虽然解决了数据同步的实时性和一致性问题,但并非没有缺点。

首先,性能瓶颈。HBase 的复制是单线程的,每个 RegionServer 对一个下游 peer 只有一个同步线程。如果单个表的写入量过大,可能会造成复制延迟。优化方案可以是实现更复杂的 ReplicationEndpoint,内部使用线程池来并行处理来自不同 WAL Group 的数据,但这会显著增加实现的复杂度。

其次,运维成本。这套系统涉及 HBase、Clojure 应用和 Delta Lake/Spark,任何一环出现问题都需要相应的专业知识来排查。日志、监控和告警必须做得非常完善,以确保数据管道的健康。例如,需要监控 HBase 的复制延迟指标 replication.ageOfLastShippedOp

最后,对事务的支持有限。HBase 的行级事务能被正确捕获,但跨行事务则不行。任何依赖于 HBase 客户端原子性 batch 操作的逻辑,在 Delta Lake 侧会表现为多个独立的行操作,这可能会破坏业务层面的事务完整性。

未来的迭代方向可能包括:引入一个持久化的消息队列(如 Kafka)作为 ReplicationEndpoint 和 Delta Lake writer 之间的缓冲区,以增强系统的解耦和削峰填谷能力;或者探索使用 Flink SQL 来替代自定义的 Clojure 代码进行流式处理和写入,以利用其更成熟的状态管理和容错机制。


  目录