
# Change Data Feed

> Query table changes over time using Change Data Feed (CDF) in Delta Sharing

## Overview

Since release 0.5.0, Delta Sharing supports querying [Change Data Feed](https://docs.databricks.com/delta/delta-change-data-feed.html) (CDF) on shared tables. CDF lets you track row-level changes (inserts, updates, and deletes) between table versions.

<Note>
  CDF must be enabled by the data provider on the original Delta table. Once enabled and shared through Delta Sharing, recipients can query the change feed just like they would with a regular Delta table.
</Note>

## Basic CDF Queries

To read change data from a Delta Sharing table, use the `readChangeFeed` option:

<Tabs>
  <Tab title="Scala">
    ```scala theme={null}
    val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

    val changesDF = spark.read
      .format("deltaSharing")
      .option("readChangeFeed", "true")
      .option("startingVersion", "3")
      .load(tablePath)

    changesDF.show()
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    table_path = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

    changes_df = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", "3") \
      .load(table_path)

    changes_df.show()
    ```
  </Tab>

  <Tab title="Java">
    ```java theme={null}
    String tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>";

    Dataset<Row> changesDF = spark.read()
      .format("deltaSharing")
      .option("readChangeFeed", "true")
      .option("startingVersion", "3")
      .load(tablePath);

    changesDF.show();
    ```
  </Tab>
</Tabs>
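The table path in the snippets above follows the pattern `<profile-file-path>#<share-name>.<schema-name>.<table-name>`. As a minimal sketch, a small helper can assemble that string from its parts. `build_table_path` is a hypothetical name used for illustration, not part of the Delta Sharing API.

```python
def build_table_path(profile_path: str, share: str, schema: str, table: str) -> str:
    """Assemble '<profile-file-path>#<share-name>.<schema-name>.<table-name>'."""
    parts = {"profile path": profile_path, "share": share, "schema": schema, "table": table}
    for name, value in parts.items():
        if not value:
            raise ValueError(f"{name} must not be empty")
    return f"{profile_path}#{share}.{schema}.{table}"


table_path = build_table_path("/opt/profiles/prod.share", "sales", "crm", "customers")
print(table_path)
```

The resulting string can be passed to `.load(...)` exactly like the hand-written `table_path` above.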

## Version Options

Control which versions to read using `startingVersion` and `endingVersion`:

### Starting Version

Specify the first version to include in the change feed:

```scala theme={null}
// Read changes from version 5 onwards
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "5")
  .load(tablePath)
```

### Ending Version

Specify the last version to include (inclusive):

```scala theme={null}
// Read changes from version 3 to version 10
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "3")
  .option("endingVersion", "10")
  .load(tablePath)
```

<AccordionGroup>
  <Accordion title="Single version changes">
    ```python theme={null}
    # Read changes from only version 5
    changes_df = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", "5") \
      .option("endingVersion", "5") \
      .load(table_path)
    ```
  </Accordion>

  <Accordion title="Range of versions">
    ```python theme={null}
    # Read changes from version 10 to version 20
    changes_df = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", "10") \
      .option("endingVersion", "20") \
      .load(table_path)
    ```
  </Accordion>

  <Accordion title="From version to latest">
    ```python theme={null}
    # Read changes from version 15 to the latest version
    changes_df = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", "15") \
      .load(table_path)
    ```
  </Accordion>
</AccordionGroup>
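The three patterns above differ only in which options are set. A minimal sketch of a helper that builds the option dictionary and rejects an inverted range before any query is sent could look like this. `cdf_read_options` is a hypothetical helper name, and the validation rules are this sketch's own choice.

```python
from typing import Dict, Optional


def cdf_read_options(starting_version: int, ending_version: Optional[int] = None) -> Dict[str, str]:
    """Build CDF reader options as strings, matching the examples above."""
    if starting_version < 0:
        raise ValueError("starting_version must be non-negative")
    options = {"readChangeFeed": "true", "startingVersion": str(starting_version)}
    if ending_version is not None:
        # endingVersion is inclusive, so start == end selects a single version
        if ending_version < starting_version:
            raise ValueError("ending_version must be >= starting_version")
        options["endingVersion"] = str(ending_version)
    return options


print(cdf_read_options(5, 5))   # single version
print(cdf_read_options(10, 20)) # range of versions
print(cdf_read_options(15))     # from version 15 to latest
```

With PySpark, the dictionary can be unpacked into the reader: `spark.read.format("deltaSharing").options(**cdf_read_options(10, 20)).load(table_path)`.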

## Change Data Schema

The change data feed includes additional metadata columns beyond the table's regular columns:

| Column              | Type      | Description                                                                  |
| ------------------- | --------- | ---------------------------------------------------------------------------- |
| `_change_type`      | String    | Type of change: `insert`, `update_preimage`, `update_postimage`, or `delete` |
| `_commit_version`   | Long      | The table version where this change occurred                                 |
| `_commit_timestamp` | Timestamp | When the change was committed                                                |

### Understanding Change Types

<Tabs>
  <Tab title="Insert">
    ```python theme={null}
    # Filter for only inserted rows
    inserts = changes_df.filter(changes_df._change_type == "insert")
    inserts.show()
    ```

    Represents rows newly added to the table.
  </Tab>

  <Tab title="Update">
    ```python theme={null}
    # Get before and after images for updates
    update_before = changes_df.filter(changes_df._change_type == "update_preimage")
    update_after = changes_df.filter(changes_df._change_type == "update_postimage")

    print("Before update:")
    update_before.show()

    print("After update:")
    update_after.show()
    ```

    * `update_preimage`: The row's state before the update
    * `update_postimage`: The row's state after the update
  </Tab>

  <Tab title="Delete">
    ```python theme={null}
    # Filter for deleted rows
    deletes = changes_df.filter(changes_df._change_type == "delete")
    deletes.show()
    ```

    Represents rows that were removed from the table.
  </Tab>
</Tabs>

## Complete Example

Here's a comprehensive example analyzing customer data changes:

```scala theme={null}
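import org.apache.spark.sql.functions._
import spark.implicits._ // enables the $"column" syntax; assumes a SparkSession named `spark`

val tablePath = "/opt/profiles/prod.share#sales.crm.customers"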

// Read changes from version 0 to version 5
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .load(tablePath)

// Analyze change types
println("\nChange type distribution:")
changesDF
  .groupBy("_change_type")
  .count()
  .orderBy(desc("count"))
  .show()

// Find customers who were updated
println("\nUpdated customers:")
changesDF
  .filter($"_change_type".startsWith("update"))
  .select("customer_id", "name", "_change_type", "_commit_version")
  .orderBy("customer_id", "_commit_version")
  .show()

// Track changes by version
println("\nChanges per version:")
changesDF
  .groupBy("_commit_version", "_change_type")
  .count()
  .orderBy("_commit_version")
  .show()
```

## Common Use Cases

### Incremental Data Processing

```python theme={null}
# Process only new and changed records since last run
last_processed_version = 10  # Retrieve from checkpoint/state store

changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", str(last_processed_version + 1)) \
  .load(table_path)

# Process inserts and updates
new_and_updated = changes_df.filter(
  changes_df._change_type.isin(["insert", "update_postimage"])
)

# Apply transformations and write to target
new_and_updated \
  .select("customer_id", "name", "email", "updated_at") \
  .write \
  .mode("append") \
  .parquet("/output/customer_updates")
```
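The `last_processed_version` value above has to survive between runs. The sketch below keeps it in a small JSON file, which is only one possible state store. The file layout, the function names, and the choice to start at version 0 on the first run are all assumptions of this example. If CDF was enabled after the table was created, the first run may need a later `initial_version`.

```python
import json
import os
import tempfile
from typing import Optional


def load_last_version(path: str) -> Optional[int]:
    """Return the last processed version, or None if no checkpoint exists yet."""
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as fh:
        return int(json.load(fh)["last_processed_version"])


def save_last_version(path: str, version: int) -> None:
    """Write to a temp file first, then rename, so a crash never leaves a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"last_processed_version": version}, fh)
    os.replace(tmp_path, path)


def next_starting_version(last_processed: Optional[int], initial_version: int = 0) -> int:
    """Resume one past the checkpoint; fall back to initial_version on the first run."""
    return initial_version if last_processed is None else last_processed + 1


checkpoint = os.path.join(tempfile.mkdtemp(), "cdf_checkpoint.json")
start = next_starting_version(load_last_version(checkpoint))
# ... read the change feed from `start`, process it, then record the highest
# _commit_version that was fully processed (12 is a made-up value here)
save_last_version(checkpoint, 12)
resume = next_starting_version(load_last_version(checkpoint))
print(start, resume)
```

Saving the checkpoint only after the output write succeeds means a failed run is retried from the same version rather than skipped.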

### Change Audit Trail

```scala theme={null}
import org.apache.spark.sql.functions._

// Create an audit log of all changes (changesDF is the change feed read earlier)
val auditLog = changesDF
  .withColumn("audit_timestamp", current_timestamp())
  .select(
    $"customer_id",
    $"_change_type".as("operation"),
    $"_commit_version".as("version"),
    $"_commit_timestamp".as("change_time"),
    to_date($"_commit_timestamp").as("change_date"),
    $"audit_timestamp",
    struct(col("*")).as("record_data")
  )

// Partition by date rather than by the raw timestamp to avoid one directory per commit
auditLog.write
  .mode("append")
  .partitionBy("change_date")
  .parquet("/audit/customer_changes")
```

### Data Synchronization

```python theme={null}
from pyspark.sql.functions import col, when

# Sync changes to a downstream system
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .option("endingVersion", "5") \
  .load(table_path)

# Categorize changes for different sync operations
synced_changes = changes_df.withColumn(
  "sync_action",
  when(col("_change_type") == "insert", "INSERT")
  .when(col("_change_type") == "update_postimage", "UPDATE")
  .when(col("_change_type") == "delete", "DELETE")
  .otherwise("SKIP")
)

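# Process each action type
for action in ["INSERT", "UPDATE", "DELETE"]:
    action_df = synced_changes.filter(col("sync_action") == action)
    # Send to the downstream system. Placeholder only: a real JDBC write needs
    # connection options such as url and dbtable, and UPDATE/DELETE actions need
    # logic beyond a plain append.
    action_df.write.format("jdbc").save()  # Example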
```
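To make the sync semantics concrete, the sketch below replays change rows into an in-memory dictionary standing in for the downstream system. The sample rows are hand-written, not real query output, and `apply_changes` is a hypothetical helper. It assumes each key changes at most once per commit, so ordering by `_commit_version` is sufficient.

```python
from typing import Dict, Iterable

META_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def apply_changes(target: Dict[int, dict], changes: Iterable[dict], key: str = "customer_id") -> Dict[int, dict]:
    """Replay change rows in commit order: upsert on insert/postimage, remove on delete."""
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change_type = row["_change_type"]
        record = {k: v for k, v in row.items() if k not in META_COLUMNS}
        if change_type in ("insert", "update_postimage"):
            target[row[key]] = record
        elif change_type == "delete":
            target.pop(row[key], None)
        # update_preimage rows carry the old values and are skipped (the "SKIP" action)
    return target


sample_changes = [
    {"customer_id": 1, "email": "a@example.com", "_change_type": "insert", "_commit_version": 1},
    {"customer_id": 2, "email": "b@example.com", "_change_type": "insert", "_commit_version": 1},
    {"customer_id": 1, "email": "a@example.com", "_change_type": "update_preimage", "_commit_version": 2},
    {"customer_id": 1, "email": "a@new.example.com", "_change_type": "update_postimage", "_commit_version": 2},
    {"customer_id": 2, "email": "b@example.com", "_change_type": "delete", "_commit_version": 3},
]

downstream = apply_changes({}, sample_changes)
print(downstream)
```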

### Tracking Field-Level Changes

```scala theme={null}
import org.apache.spark.sql.functions._

// Compare before and after values for updates
val updates = changesDF
  .filter($"_change_type".startsWith("update"))

val beforeUpdates = updates
  .filter($"_change_type" === "update_preimage")
  .withColumnRenamed("email", "old_email")
  .withColumnRenamed("status", "old_status")
  .select($"customer_id", $"_commit_version", $"old_email", $"old_status")

val afterUpdates = updates
  .filter($"_change_type" === "update_postimage")
  .withColumnRenamed("email", "new_email")
  .withColumnRenamed("status", "new_status")
  .select($"customer_id", $"_commit_version", $"new_email", $"new_status")

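// Join to see what changed. The null-safe operator <=> is used so that a change
// from or to NULL is still reported; plain =!= evaluates to NULL in that case
// and the row would be filtered out.
val fieldChanges = beforeUpdates
  .join(afterUpdates, Seq("customer_id", "_commit_version"))
  .filter(
    !($"old_email" <=> $"new_email") || !($"old_status" <=> $"new_status")
  )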

fieldChanges.show()
```
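The pairing logic of the join above can be sketched without Spark on a handful of rows. This version pairs preimage and postimage rows on `(customer_id, _commit_version)` and reports every column whose value differs. The sample rows and the `field_changes` name are illustrative assumptions.

```python
META_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def field_changes(changes, key="customer_id"):
    """Map (key, commit_version) -> {column: (old_value, new_value)} for each update."""
    before, after = {}, {}
    for row in changes:
        pair_id = (row[key], row["_commit_version"])
        if row["_change_type"] == "update_preimage":
            before[pair_id] = row
        elif row["_change_type"] == "update_postimage":
            after[pair_id] = row

    diffs = {}
    for pair_id in before.keys() & after.keys():
        old, new = before[pair_id], after[pair_id]
        changed = {
            column: (old[column], new.get(column))
            for column in old
            if column not in META_COLUMNS and old[column] != new.get(column)
        }
        if changed:
            diffs[pair_id] = changed
    return diffs


sample_updates = [
    {"customer_id": 1, "email": "a@example.com", "status": "active", "_change_type": "update_preimage", "_commit_version": 2},
    {"customer_id": 1, "email": "a@new.example.com", "status": "active", "_change_type": "update_postimage", "_commit_version": 2},
    {"customer_id": 2, "email": None, "status": "trial", "_change_type": "update_preimage", "_commit_version": 3},
    {"customer_id": 2, "email": "b@example.com", "status": "trial", "_change_type": "update_postimage", "_commit_version": 3},
    {"customer_id": 3, "email": "c@example.com", "status": "active", "_change_type": "insert", "_commit_version": 3},
]

diffs = field_changes(sample_updates)
print(diffs)
```

Python's `!=` treats `None` as an ordinary value, so the change from a missing email to a real one for customer 2 is reported.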

## Filtering Change Data

You can apply filters to focus on specific changes:

```python theme={null}
# Get only deleted premium customers
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .load(table_path)

deleted_premium = changes_df.filter(
  (changes_df._change_type == "delete") & 
  (changes_df.customer_tier == "premium")
)

deleted_premium.show()
```

## Performance Considerations

<AccordionGroup>
  <Accordion title="Query specific version ranges">
    ```scala theme={null}
    // Good: Query only needed versions
    val changesDF = spark.read
      .format("deltaSharing")
      .option("readChangeFeed", "true")
      .option("startingVersion", "10")
      .option("endingVersion", "15")
      .load(tablePath)
    ```

    Avoid reading the entire change history when you only need recent changes.
  </Accordion>

  <Accordion title="Filter by change type early">
    ```scala theme={null}
    // Apply change type filters early
    val insertsOnly = changesDF
      .filter($"_change_type" === "insert")
      .select("customer_id", "name", "_commit_version")
    ```

    Filter for specific change types before other operations to reduce data volume.
  </Accordion>

  <Accordion title="Process incrementally">
    ```python theme={null}
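    # Process changes in batches (example values; process_changes is your own function)
    start_version, end_version, batch_size = 0, 100, 10

    for version in range(start_version, end_version + 1, batch_size):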
        batch_start = version
        batch_end = min(version + batch_size - 1, end_version)
        
        changes = spark.read \
          .format("deltaSharing") \
          .option("readChangeFeed", "true") \
          .option("startingVersion", str(batch_start)) \
          .option("endingVersion", str(batch_end)) \
          .load(table_path)
        
        # Process batch
        process_changes(changes)
    ```
  </Accordion>
</AccordionGroup>
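The batch boundaries in the incremental loop above are easy to get wrong by one, since `endingVersion` is inclusive. A minimal sketch that isolates the arithmetic in a generator makes it testable on its own. `version_batches` is a hypothetical helper name.

```python
from typing import Iterator, Tuple


def version_batches(start_version: int, end_version: int, batch_size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (batch_start, batch_end) pairs covering start_version..end_version."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for batch_start in range(start_version, end_version + 1, batch_size):
        yield batch_start, min(batch_start + batch_size - 1, end_version)


batches = list(version_batches(0, 10, 4))
print(batches)  # [(0, 3), (4, 7), (8, 10)]
```

Each pair maps directly onto the `startingVersion` and `endingVersion` options of one read.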

## Handling Update Pairs

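Each update produces two rows that share the same `_commit_version`: an `update_preimage` and an `update_postimage`. If you only need the resulting row of each update, drop the preimages: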

```python theme={null}
from pyspark.sql.functions import col

# Read the change feed, then drop the preimage rows
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .load(table_path)

# Keep inserts, update_postimage (the row as it stands after each update), and deletes
final_state = changes_df.filter(
  col("_change_type").isin(["insert", "update_postimage", "delete"])
)

final_state.show()
```
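Dropping preimages still leaves one row per change, so a key that changed several times appears several times. To collapse the feed to the most recent change per key, one approach is to keep the row with the highest `_commit_version` for each key. The sketch below shows that logic on hand-written sample rows; `latest_change_per_key` is a hypothetical helper, and it assumes a key changes at most once per commit.

```python
def latest_change_per_key(changes, key="customer_id"):
    """Return {key: row} holding the most recent non-preimage change for each key."""
    latest = {}
    for row in changes:
        if row["_change_type"] == "update_preimage":
            continue  # preimages describe the old state only
        current = latest.get(row[key])
        if current is None or row["_commit_version"] >= current["_commit_version"]:
            latest[row[key]] = row
    return latest


sample_feed = [
    {"customer_id": 1, "name": "Ann", "_change_type": "insert", "_commit_version": 1},
    {"customer_id": 2, "name": "Bob", "_change_type": "insert", "_commit_version": 1},
    {"customer_id": 1, "name": "Ann", "_change_type": "update_preimage", "_commit_version": 2},
    {"customer_id": 1, "name": "Anne", "_change_type": "update_postimage", "_commit_version": 2},
    {"customer_id": 2, "name": "Bob", "_change_type": "delete", "_commit_version": 3},
]

latest = latest_change_per_key(sample_feed)
for customer_id, row in sorted(latest.items()):
    print(customer_id, row["_change_type"], row["_commit_version"])
```

A key whose latest change is `delete` no longer exists in the table; the remaining rows describe the current state of the keys touched in the queried range.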

## Error Handling

```scala theme={null}
try {
  val changesDF = spark.read
    .format("deltaSharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", "3")
    .option("endingVersion", "10")
    .load(tablePath)
  
  changesDF.show()
} catch {
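  // getMessage can be null, so wrap it in Option before matching.
  // Matching on message text is fragile: exact messages depend on the server and connector version.
  case e: Exception if Option(e.getMessage).exists(_.contains("Change data feed")) =>
    println("CDF is not enabled for this table. Contact the data provider.")
  case e: Exception if Option(e.getMessage).exists(_.contains("version")) =>
    println("Invalid version specified. Check available versions.")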
  case e: Exception =>
    println(s"Error reading change feed: ${e.getMessage}")
}
```

<Warning>
  **Common Issues:**

  * **CDF not enabled**: The data provider must enable CDF on the source table
  * **Invalid versions**: Ensure the specified versions exist in the table history
  * **Version gaps**: A version that did not modify any rows contributes no rows to the change feed, so `_commit_version` values in the result may skip numbers
</Warning>
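The same classification can be sketched in Python as a plain function over the error message. The substrings are the ones matched in the Scala example above; actual messages depend on the server and connector version, so treat both the substrings and the category names as assumptions. `classify_cdf_error` is a hypothetical helper.

```python
from typing import Optional


def classify_cdf_error(message: Optional[str]) -> str:
    """Map an error message to a coarse category; tolerates a missing message."""
    text = (message or "").lower()
    if "change data feed" in text:
        return "cdf_not_enabled"
    if "version" in text:
        return "invalid_version"
    return "unknown"


# The messages below are made up for illustration
print(classify_cdf_error("Change data feed is not enabled on this table"))
print(classify_cdf_error("Requested version 99 is not available"))
print(classify_cdf_error(None))
```

In a PySpark job this would typically be called as `classify_cdf_error(str(exc))` inside an `except Exception as exc:` block around the read.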

## Next Steps

<CardGroup cols={2}>
  <Card title="Streaming" icon="water" href="/spark/streaming">
    Stream live changes from shared tables
  </Card>

  <Card title="SQL Usage" icon="database" href="/spark/sql-usage">
    Query shared tables with SQL
  </Card>
</CardGroup>
