CDC Incremental Data Loading with SQL Server, Apache Iceberg and Apache Spark

Learn how to create an incremental Apache Spark setup that polls data from SQL Server through its Change Data Capture (CDC) feature and saves it to Apache Iceberg.

Imagine you are working with Big Data, you have just learned that the Medallion architecture exists, and you start creating your bronze layer. Suddenly you find yourself syncing your entire database over and over again, making your pipelines run for hours instead of the usual minutes. Now you wonder: "There must be a more efficient way to handle incremental data loads, right?"

What initially looked like a walk in the park turned out to be a battle between giants, between ecosystems and between people, all resulting in picking something that simply works from a given viewpoint.

💡 I would dare to say that this is what "Architecture" formalizes: we are responsible for selecting components through the lens of a strategic viewpoint and deciding what is best for the business. For example, in my case, I tend to select components with a focus on separation of concerns and lock-in avoidance.

Let's dive deeper into this and see how we can find a future-proof, reliable and trusted solution that can handle our Big Data needs at scale. But before we get started, let's first go over the different components and why I picked them.

Creating our components (Tech Stack)

SQL Server

The first component we need is a SQL Server database that will hold our data. Besides just holding our data, it should also support a feature named "Change Data Capture" (CDC), which tracks the changes made to the database and lets us read them back. This is something we rely on heavily here!
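
As a minimal sketch of what enabling CDC involves, the helper below builds the two T-SQL statements that switch it on: `sys.sp_cdc_enable_db` for the database and `sys.sp_cdc_enable_table` for a single table. The helper name `build_enable_cdc_statements` and the `Customers` table are assumptions made for illustration, and the statements still have to be executed by a sufficiently privileged login on a server where SQL Server Agent is running.

```python
def build_enable_cdc_statements(schema_name: str, table_name: str) -> list[str]:
    """Return the T-SQL needed to enable CDC for a database and one table."""
    # These values are interpolated into the SQL text (they cannot be bound as
    # parameters here), so reject anything that is not a plain identifier.
    for name in (schema_name, table_name):
        if not name.isidentifier():
            raise ValueError(f"Unexpected identifier: {name!r}")

    return [
        # Enables CDC for the current database.
        "EXEC sys.sp_cdc_enable_db",
        # Enables CDC for one table; @role_name = NULL means no gating role.
        (
            "EXEC sys.sp_cdc_enable_table "
            f"@source_schema = N'{schema_name}', "
            f"@source_name = N'{table_name}', "
            "@role_name = NULL"
        ),
    ]


# "Customers" is a hypothetical table name used only for this example.
statements = build_enable_cdc_statements("dbo", "Customers")
for statement in statements:
    print(statement)
```

With the default settings, the capture instance created for this table is named `dbo_Customers`, which is the naming convention the rest of this article relies on.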

Apache Iceberg

Since we want to perform large-scale analytics on all our data (both structured and unstructured), we require a storage mechanism that can work efficiently with flat files. I chose Apache Iceberg here, as it was created for large-scale analytics on the "Lakehouse" and for data streaming, and it integrates with all the major platforms.

Many might wonder: why Iceberg and not Delta? Well, Delta is great as well, but it was created with a more commercially oriented approach, and more importantly, Iceberg is great at schema evolution and partitioning. Besides this, the community also seems keener on Iceberg, which is seeing a tremendous increase in users over time.

This doesn't mean that Delta is bad! But given the current state of the ecosystem, I would like to avoid being locked in or drawn into one specific ecosystem. This is also reflected by larger organizations such as Apple, Microsoft, AWS and others adopting Iceberg.

💡 Databricks recently acquired "Tabular", which was the core maintainer of Apache Iceberg (smart move). So the future of both Delta and Iceberg is uncertain, and time will tell what happens here.

Apache Iceberg should not be seen as a storage mechanism or database. Instead it's a metadata management layer that sits on top of the data files, which we can then query later on through a dedicated and optimized query engine.

Apache Spark

Finally, a query engine... now there are MANY out there, and it's changing all the time. What also became apparent is that many of them use the commercial open-source trick, drawing you in and eventually asking you to pay for performance (which is not always a bad thing), making the choice difficult.

Since the customer I am working for was already using Apache Spark, we decided to stick with it.

Personally, I think there are better alternatives popping up (hello Ray + Daft), but it currently doesn't make sense to switch and the other ecosystems still have to mature more.

Creating a CDC Implementation

Let's get started creating our actual CDC implementation. For this implementation we will have an orchestrator (Python) that fetches the changes and sinks them into the Iceberg-managed tables on local storage.

Full code: https://github.com/XavierGeerinck/PublicProjects/tree/master/Data/cdc-capture/cdc-spark

Setting up Spark

Let's create our local Spark cluster. To do so, download the docker-compose file from https://github.com/databricks/docker-spark-iceberg/blob/main/docker-compose.yml and spin it up:

# Download the Docker Compose file (use the raw URL, the "blob" URL returns an HTML page)
wget https://raw.githubusercontent.com/databricks/docker-spark-iceberg/main/docker-compose.yml

# Start Spark + Iceberg Cluster
docker-compose up -d
docker exec -it spark-iceberg pyspark-notebook

SQL Manager

The most difficult part I encountered is the actual SQL CDC Resource Manager, which connects to SQL Server, fetches the latest changes and hands them over so they can be merged into Iceberg. So let's create this first.

# Fetch SQL Server CDC changes from Remote and merge them into the local Iceberg table
# we use pyodbc for this (to avoid temporary views)
import pyodbc
import sqlalchemy as sa
import polars as pl
from urllib.parse import quote_plus
from contextlib import contextmanager

LSN_DEFAULT = "0x00000000000000000000"

class SQLResource:
    def __init__(self, host, db, username, password, last_lsn=None):
        self.host = host
        self.db = db
        self.username = username
        self.password = password

        self.engine = sa.create_engine(
            self.get_connection_string(host, db, username, password)
        )

    def get_connection_string(self, host, db, username, password):
        """Construct the connection string for SQLAlchemy."""
        pass_escaped = quote_plus(password)
        user_escaped = quote_plus(username)
        driver_escaped = quote_plus("ODBC Driver 18 for SQL Server")
        return f"mssql+pyodbc://{user_escaped}:{pass_escaped}@{host}/{db}?driver={driver_escaped}"

    @contextmanager
    def get_connection(self):
        """Get a database connection using context manager for automatic cleanup."""
        connection = self.engine.connect()

        try:
            yield connection
        finally:
            connection.close()


    def get_primary_key_columns(self, table_name: str) -> list[str]:
        """Get the primary key columns for a CDC-enabled table."""
        with self.get_connection() as connection:
            instance = self.get_capture_instance_name("dbo", table_name)

            query = sa.text("""
            SELECT column_name FROM cdc.index_columns WHERE object_id = (
                SELECT object_id FROM cdc.change_tables WHERE capture_instance = :capture_instance_name
            )
            """)

            result = connection.execute(query, {"capture_instance_name": instance})
            primary_key_columns = [row[0] for row in result]
            return primary_key_columns

    def is_cdc_enabled_for_database(self):
        """Check if CDC is enabled for the database."""
        with self.get_connection() as connection:
            query = sa.text("""
                SELECT is_cdc_enabled 
                FROM sys.databases 
                WHERE name = :db_name
            """)
            result = connection.execute(
                query, {"db_name": self.db}
            ).scalar()
            return bool(result)

    def is_cdc_enabled_for_table(self, schema_name, table_name):
        """Check if CDC is enabled for the specified table."""
        with self.get_connection() as connection:
            capture_instance_name = self.get_capture_instance_name(
                schema_name, table_name
            )

            query = sa.text("""
                SELECT 1
                FROM cdc.change_tables
                WHERE capture_instance = :capture_instance_name
            """)

            # Bind the capture instance name instead of interpolating it into the SQL
            result = connection.execute(
                query, {"capture_instance_name": capture_instance_name}
            ).scalar()

            return bool(result)

    def get_capture_instance_name(self, schema_name, table_name):
        """Get the default CDC capture instance name (<schema>_<table>) for a table."""
        return f"{schema_name}_{table_name}"

    def get_current_lsn(self):
        """Get the current maximum LSN from SQL Server using the native function."""
        with self.get_connection() as connection:
            query = sa.text("SELECT sys.fn_cdc_get_max_lsn()")
            return connection.execute(query).scalar()

    def get_min_lsn(self, capture_instance=None):
        """Get the minimum available LSN for a capture instance."""
        with self.get_connection() as connection:
            query = sa.text("SELECT sys.fn_cdc_get_min_lsn(:capture_instance)")
            return connection.execute(
                query, {"capture_instance": capture_instance}
            ).scalar()

    def hex_string_to_lsn(self, lsn_hex):
        """Convert a hexadecimal LSN string to binary for SQL Server functions."""
        with self.get_connection() as connection:
            if not lsn_hex or not isinstance(lsn_hex, str):
                # Return minimum LSN if input is invalid
                query = sa.text("SELECT sys.fn_cdc_get_min_lsn(NULL)")
                return connection.execute(query).scalar()

            if not lsn_hex.startswith("0x"):
                lsn_hex = f"0x{lsn_hex}"

            query = sa.text("SELECT CAST(:lsn_hex AS BINARY(10))")
            result = connection.execute(query, {"lsn_hex": lsn_hex}).scalar()

            if result is None:
                query = sa.text("SELECT sys.fn_cdc_get_min_lsn(NULL)")
                return connection.execute(query).scalar()

            return result

    def lsn_to_hex_string(self, lsn_bytes):
        """Convert a binary LSN to a hex string format."""
        if lsn_bytes is None:
            return LSN_DEFAULT

        return f"0x{lsn_bytes.hex().upper()}"

    def get_merge_predicate(self, table_name: str) -> str:
        """Uses the primary key columns to construct a predicate for merging.
        e.g., CustomerID and Email become: s.CustomerID = t.CustomerID AND s.Email = t.Email
        """
        primary_key_columns = self.get_primary_key_columns(table_name)
        if not primary_key_columns:
            raise ValueError(f"No primary key columns found for table {table_name}")

        # Construct the merge predicate
        merge_predicate = " AND ".join(
            [f"s.{col} = t.{col}" for col in primary_key_columns]
        )
        return merge_predicate

    def get_table_changes(
            self, table_name, last_lsn=None, schema_name="dbo", chunksize=10000
        ) -> tuple[pl.DataFrame, str]:
        """Get changes from a CDC-enabled table since the last LSN.
        Uses the native SQL Server CDC function fn_cdc_get_all_changes.

        Args:
            table_name (str): The name of the table to query.
            last_lsn (str, optional): The last processed LSN as a hex string. If None (or the default LSN), an error is raised because an initial full copy is required first.
            schema_name (str, optional): The schema name of the table. Defaults to 'dbo'.
            chunksize (int, optional): Intended batch size for reading; currently not applied. Defaults to 10000.

        Returns:
            tuple: A tuple containing the DataFrame of changes and the current LSN.
        """
        try:
            with self.get_connection() as connection:
                # Check if CDC is enabled for the database and table
                if not self.is_cdc_enabled_for_database():
                    raise ValueError(
                        f"CDC is not enabled for database {self.db}"
                    )

                if not self.is_cdc_enabled_for_table(schema_name, table_name):
                    raise ValueError(
                        f"CDC not enabled for table {schema_name}.{table_name}"
                    )

                # Get the capture instance name
                capture_instance = self.get_capture_instance_name(
                    schema_name, table_name
                )
                if not capture_instance:
                    raise ValueError(
                        f"Could not find CDC capture instance for {schema_name}.{table_name}"
                    )

                # Get current maximum LSN
                current_lsn = self.get_current_lsn()
                current_lsn_hex = self.lsn_to_hex_string(current_lsn)
                
                # If no last_lsn provided, we should first take a first copy of the table
                if last_lsn is None or last_lsn == LSN_DEFAULT:
                    raise ValueError(
                        f"Initial copy required for table {schema_name}.{table_name}"
                    )

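                # Pass both LSN bounds as hex strings; SQL Server converts them to BINARY(10).
                # Note: the range is inclusive, so changes at last_lsn are returned again.
                from_lsn_hex = last_lsn
                to_lsn_hex = current_lsn_hex

                # Use the native CDC function; the LSN bounds are bound as parameters,
                # the capture instance name is part of the function name itself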
                query = sa.text(f"""
                    DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10)
                    SET @from_lsn = CONVERT(BINARY(10), :from_lsn, 1)
                    SET @to_lsn = CONVERT(BINARY(10), :to_lsn, 1)
        
                    SELECT * FROM cdc.fn_cdc_get_all_changes_{capture_instance}(
                        @from_lsn, @to_lsn, 'all'
                    )
                """)

                # Read the full result set into a Polars DataFrame (chunksize is not applied here)
                changes_df = pl.read_database(
                    query,
                    connection,
                    execute_options={
                        "parameters": {"from_lsn": from_lsn_hex, "to_lsn": to_lsn_hex}
                    },
                )

                # Convert binary LSN to hex string for storage
                current_lsn_hex = self.lsn_to_hex_string(current_lsn)

                return changes_df, current_lsn_hex

        except Exception as e:
            raise RuntimeError(
                f"Database error when getting CDC changes: {str(e)}"
            ) from e
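
One detail worth illustrating: `cdc.fn_cdc_get_all_changes_<capture_instance>` treats both LSN bounds as inclusive, so passing the last processed LSN as the lower bound returns its changes a second time. SQL Server offers `sys.fn_cdc_increment_lsn` for this. The sketch below shows the same idea in plain Python, assuming that adding one to the 10-byte big-endian value is an acceptable stand-in for that function; the helper names are hypothetical and not part of the manager above.

```python
LSN_LENGTH = 10  # SQL Server LSNs are BINARY(10)


def hex_to_lsn(lsn_hex: str) -> bytes:
    """Parse a '0x...' hex string into the 10 raw LSN bytes."""
    digits = lsn_hex[2:] if lsn_hex.lower().startswith("0x") else lsn_hex
    lsn = bytes.fromhex(digits)
    if len(lsn) != LSN_LENGTH:
        raise ValueError(f"Expected {LSN_LENGTH} bytes, got {len(lsn)}")
    return lsn


def lsn_to_hex(lsn: bytes) -> str:
    """Format raw LSN bytes as an upper-case '0x...' hex string."""
    return f"0x{lsn.hex().upper()}"


def increment_lsn(lsn: bytes) -> bytes:
    """Return the next LSN by adding one to the big-endian integer value."""
    value = int.from_bytes(lsn, "big") + 1
    # to_bytes raises OverflowError if the result no longer fits in 10 bytes.
    return value.to_bytes(LSN_LENGTH, "big")


last_processed = hex_to_lsn("0x0000004400000D280005")
next_from = increment_lsn(last_processed)
print(lsn_to_hex(next_from))  # 0x0000004400000D280006
```

Because the merge further down is keyed on the primary key, re-reading the boundary LSN is usually harmless, but starting from the incremented value avoids the duplicate work.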

Connecting to SQL and Pulling Changes

Once we have the manager, let's now use it and get our changes:

# Connect to SQL Server
sql_resource = SQLResource(
    SQL_HOST,
    SQL_DATABASE,
    SQL_USER,
    SQL_PASS
)

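# Last LSN that was processed (hex string); hardcoded for this walkthrough
last_lsn = "0x0000004400000D280005"

# get_table_changes returns the changes (Polars DataFrame) and the current max LSN
changes, current_lsn = sql_resource.get_table_changes(
    table_name=TABLE_NAME_REMOTE,
    last_lsn=last_lsn, # todo: fetch this each time and save into metadata
    schema_name="dbo"
)

# Convert to a Spark DataFrame and make it available as a temporary view
changes_df = spark.createDataFrame(changes.to_pandas())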
changes_df.createOrReplaceTempView("changes")
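
The `todo` above hints at storing the last processed LSN between runs. A minimal sketch of such a watermark store is shown here, assuming a small JSON file on local disk is good enough; the file name `cdc_state.json`, the helper names and the `Customers` key are all hypothetical, and in production this state would more likely live in the orchestrator or in table metadata.

```python
import json
import os
import tempfile
from pathlib import Path

LSN_DEFAULT = "0x00000000000000000000"


def load_last_lsn(state_file: Path, table_name: str) -> str:
    """Return the stored LSN for a table, or LSN_DEFAULT when none is stored."""
    if not state_file.exists():
        return LSN_DEFAULT
    state = json.loads(state_file.read_text(encoding="utf-8"))
    return state.get(table_name, LSN_DEFAULT)


def save_last_lsn(state_file: Path, table_name: str, lsn_hex: str) -> None:
    """Store the LSN for a table, keeping the entries of other tables."""
    state = {}
    if state_file.exists():
        state = json.loads(state_file.read_text(encoding="utf-8"))
    state[table_name] = lsn_hex
    # Write to a temporary file first, then swap it in, so a crash halfway
    # through does not leave a truncated state file behind.
    tmp_file = state_file.with_suffix(".tmp")
    tmp_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
    os.replace(tmp_file, state_file)


state_file = Path(tempfile.mkdtemp()) / "cdc_state.json"
print(load_last_lsn(state_file, "Customers"))  # 0x00000000000000000000
save_last_lsn(state_file, "Customers", "0x0000004400000D280005")
print(load_last_lsn(state_file, "Customers"))  # 0x0000004400000D280005
```

The important design choice is the ordering: the new LSN should only be saved after the merge into Iceberg has succeeded, otherwise a failed run would silently skip changes.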

Verifying

We can verify everything so far by simply querying the changes:

# show() prints the rows itself and returns None, so no print() is needed
spark.sql("SELECT * FROM changes").show()

Merging the Changes

Next, we merge the changes into our Iceberg table:

# Merge the CDC changes into the iceberg table as merge
# we work with the __$operation column to determine the type of operation, which can have values:
# - Deleted (__$operation = 1),
# - Inserted (__$operation = 2)
# - Updated Before (__$operation = 3)
# - Updated After (__$operation = 4)
# https://iceberg.apache.org/docs/1.5.0/spark-writes/#merge-into
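# Build the ON clause from the primary key columns of the source table
merge_predicate = sql_resource.get_merge_predicate(TABLE_NAME_REMOTE)
print(f"Performing merge operation on '{TABLE_NAME_LOCAL}' with predicate '{merge_predicate}'...")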
spark.sql(f"""
MERGE INTO {TABLE_NAME_LOCAL} AS t
USING (SELECT * FROM changes) AS s
ON {merge_predicate}
WHEN MATCHED AND s.`__$operation` = 1 THEN DELETE
WHEN MATCHED AND s.`__$operation` IN (2, 4) THEN UPDATE SET *
-- Anything we can't match, we insert (except deletes of rows we never had)
WHEN NOT MATCHED AND s.`__$operation` IN (2, 4) THEN INSERT *
""")
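
A caveat with this merge: when the same row changed several times between two polls, the `changes` view holds several source rows for one target row, which `MERGE INTO` generally does not accept. One possible way around this is to keep only the latest change per primary key before merging. The sketch below shows the idea on plain Python dictionaries; the function name, the sample rows and the `CustomerID` key are assumptions for illustration, and on real data the same step would be done on the Polars or Spark DataFrame.

```python
def latest_change_per_key(changes: list[dict], key_columns: list[str]) -> list[dict]:
    """Keep only the most recent CDC change for every primary key."""
    # __$start_lsn orders transactions, __$seqval orders changes within one.
    ordered = sorted(changes, key=lambda row: (row["__$start_lsn"], row["__$seqval"]))
    latest: dict[tuple, dict] = {}
    for row in ordered:
        if row["__$operation"] == 3:
            continue  # skip "update before" images, they carry the old values
        key = tuple(row[col] for col in key_columns)
        latest[key] = row  # later changes overwrite earlier ones
    return list(latest.values())


# Hypothetical sample: customer 1 is inserted and updated, customer 2 is
# inserted and then deleted again within the same polling window.
sample = [
    {"__$start_lsn": b"\x01", "__$seqval": b"\x01", "__$operation": 2, "CustomerID": 1, "Name": "Ann"},
    {"__$start_lsn": b"\x02", "__$seqval": b"\x01", "__$operation": 4, "CustomerID": 1, "Name": "Anna"},
    {"__$start_lsn": b"\x01", "__$seqval": b"\x02", "__$operation": 2, "CustomerID": 2, "Name": "Bob"},
    {"__$start_lsn": b"\x03", "__$seqval": b"\x01", "__$operation": 1, "CustomerID": 2, "Name": "Bob"},
]

for row in latest_change_per_key(sample, ["CustomerID"]):
    print(row["CustomerID"], row["__$operation"], row["Name"])
```

After this step each key appears at most once, so every target row is matched by a single source row carrying its final state.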

Validating

Let's run the same query again on the Iceberg table, sort the results and compare them. We will now see that the records have changed as expected: old records have been removed and others updated. Also note that we are not merging in the __$ columns from the CDC!

Conclusion

All the above might seem complex, but in production most of this is abstracted away for us. The most difficult part is actually integrating the CDC SQL Manager into an orchestrator (e.g., Azure Data Factory) to pull our changes in batch (or switching over to a more streaming approach if we want to work in real time). We can then sync these changes through our Big Data query engine, such as Spark, into flat files.

Finally, what remains is to process these flat files towards a BI application, creating a ready-to-be-consumed data warehouse.

References

Here you can find some amazing references that I used to come to the conclusion above.

Apache Iceberg Won the Future — What’s Next for 2025?
RBAC, CDC, Materialized Views, and More: Everything You Need to Know About Apache Iceberg in 2025.
Delta Lake vs Apache Iceberg. The Lake House Squabble.
... the real deal.