Incremental Loading in dlt
Strategies for efficiently loading only new or changed data.
Why Incremental?
| Approach | Speed | Cost | Latency |
|---|---|---|---|
| Full Reload | Slow | High | High |
| Incremental | Fast | Low | Low |
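Incremental loading requests only the records that are new or changed since the previous run, so the work per run grows with the volume of changes rather than with the size of the whole dataset.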
Common Strategies
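1. Timestamp (Last Modified)
Request only records whose modification timestamp is newer than the one saved in state on the previous run.

```python
import dlt

# merge + primary_key dedups rows the API sends again (assumes an `id` field)
@dlt.resource(write_disposition="merge", primary_key="id")
def fetch_records(api_key: str):
    # Source-scoped state that dlt persists between runs
    state = dlt.current.source_state()
    start_time = state.get("last_timestamp", "2024-01-01T00:00:00Z")
    # `api` is a placeholder for your API client
    records = api.get_records(modified_after=start_time)
    max_seen = start_time
    for record in records:
        yield record
        # Track the newest timestamp; the API may not return rows in order
        # (assumes ISO-8601 strings in one format, which sort correctly as text)
        max_seen = max(max_seen, record["modified_at"])
    # Save the cursor once, after every record has been yielded
    state["last_timestamp"] = max_seen
```

dlt also provides `dlt.sources.incremental`, a built-in helper that tracks this kind of last-value cursor in state for you.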
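The framework-free sketch below replaces `dlt.current.source_state()` with a plain dict and the API with an in-memory list, to show two details of the timestamp strategy: the cursor should be the maximum `modified_at` seen, because a source need not return rows in order, and an inclusive `>=` lower bound re-reads rows that share the last timestamp, which is harmless when the destination merges on a primary key. The helpers `extract` and `merge` and the `dest` dict (standing in for a merge-disposition table) are hypothetical illustrations, not dlt APIs.

```python
from typing import Any, Dict, List

def extract(rows: List[Dict[str, Any]], state: Dict[str, str]) -> List[Dict[str, Any]]:
    # Hypothetical extractor; ISO-8601 strings in one format compare correctly as text
    start = state.get("last_timestamp", "2024-01-01T00:00:00Z")
    # Inclusive bound: rows written at the same instant as the cursor are not lost
    batch = [r for r in rows if r["modified_at"] >= start]
    if batch:
        # Max, not last: the source may return rows in any order
        state["last_timestamp"] = max(r["modified_at"] for r in batch)
    return batch

def merge(dest: Dict[int, Dict[str, Any]], batch: List[Dict[str, Any]]) -> None:
    # Upsert on primary key, so re-read rows overwrite instead of duplicating
    for r in batch:
        dest[r["id"]] = r

rows = [
    {"id": 2, "modified_at": "2024-03-02T10:00:00Z"},
    {"id": 1, "modified_at": "2024-03-01T09:00:00Z"},  # out of order
]
state: Dict[str, str] = {}
dest: Dict[int, Dict[str, Any]] = {}
merge(dest, extract(rows, state))
print(state["last_timestamp"])  # 2024-03-02T10:00:00Z
merge(dest, extract(rows, state))  # re-reads id 2 only; merge dedups it
print(len(dest))  # 2
```

Had the cursor been set from the last row yielded, it would have moved back to `2024-03-01T09:00:00Z` and the second run would have re-read both rows.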
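2. Cursor (ID-based)
Request only records whose ID is greater than the highest ID already loaded. This suits append-only data with increasing IDs; it does not detect updates to existing rows.

```python
import dlt

@dlt.resource(write_disposition="append")
def fetch_records(api_key: str):
    state = dlt.current.source_state()
    last_id = state.get("last_id", 0)
    # `api` is a placeholder for your API client
    records = api.get_records(after_id=last_id)
    for record in records:
        yield record
        # Keep the highest ID seen rather than the last one yielded
        last_id = max(last_id, record["id"])
    state["last_id"] = last_id
```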
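An ID cursor pairs naturally with keyset pagination: ask for the next page of rows with IDs greater than the last one seen, and advance the cursor after each page. In this sketch `fetch_page` is a hypothetical stand-in for `api.get_records(after_id=...)`, backed by an in-memory list, and a plain dict plays the role of dlt state.

```python
from typing import Dict, Iterator, List

SOURCE = [{"id": i, "name": f"row-{i}"} for i in range(1, 8)]

def fetch_page(after_id: int, limit: int) -> List[Dict]:
    # Hypothetical API: rows with id > after_id, ordered by id, at most `limit`
    rows = sorted((r for r in SOURCE if r["id"] > after_id), key=lambda r: r["id"])
    return rows[:limit]

def read_new_rows(state: Dict[str, int], page_size: int = 3) -> Iterator[Dict]:
    last_id = state.get("last_id", 0)
    while True:
        page = fetch_page(after_id=last_id, limit=page_size)
        if not page:
            break  # no rows beyond the cursor: caught up
        yield from page
        # Pages are ID-ordered, so the last row holds the highest ID
        last_id = page[-1]["id"]
        state["last_id"] = last_id

state: Dict[str, int] = {}
print([r["id"] for r in read_new_rows(state)])  # [1, 2, 3, 4, 5, 6, 7]
SOURCE.append({"id": 8, "name": "row-8"})
print([r["id"] for r in read_new_rows(state)])  # [8]
```

Because each request is anchored on an ID rather than an offset, rows inserted between pages cannot shift the page boundaries and cause skips or duplicates.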
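3. Change Data Capture (CDC)
For databases that expose a write-ahead log (WAL) or another change feed, resume reading from the last processed log sequence number (LSN):

```python
import dlt

@dlt.resource
def fetch_changes():
    state = dlt.current.source_state()
    # "0/0" is the zero LSN: no changes processed yet
    lsn = state.get("lsn", "0/0")
    # `db.get_wal_changes` is a placeholder for your CDC reader
    changes = db.get_wal_changes(from_lsn=lsn)
    for change in changes:
        yield change
        # Changes arrive in log order, so the last one seen has the highest LSN
        lsn = change["lsn"]
    state["lsn"] = lsn
```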
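The `"0/0"` default follows PostgreSQL's textual LSN format: two hexadecimal numbers separated by a slash, holding the high and low 32 bits of a 64-bit log position. Such strings do not sort correctly as text (`"0/10"` sorts before `"0/9"`, although 0x10 > 0x9), so if you need to compare LSNs, for example to skip changes already processed, convert them to integers first. A sketch, assuming changes are dicts with an `lsn` key as in the example above; `lsn_to_int` and `unprocessed` are hypothetical helpers.

```python
from typing import Dict, List

def lsn_to_int(lsn: str) -> int:
    # "HI/LO" -> 64-bit integer: high 32 bits, then low 32 bits (both hex)
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def unprocessed(changes: List[Dict], state: Dict[str, str]) -> List[Dict]:
    # Keep only changes strictly after the stored LSN
    last = lsn_to_int(state.get("lsn", "0/0"))
    return [c for c in changes if lsn_to_int(c["lsn"]) > last]

print("0/10" < "0/9")                          # True: text order is wrong
print(lsn_to_int("0/10") > lsn_to_int("0/9"))  # True: numeric order is right
changes = [{"lsn": "0/9"}, {"lsn": "0/10"}, {"lsn": "1/0"}]
print(unprocessed(changes, {"lsn": "0/9"}))    # [{'lsn': '0/10'}, {'lsn': '1/0'}]
```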
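State Management
dlt keeps a state dictionary for each source, available inside a resource through `dlt.current.source_state()` (`dlt.current.resource_state()` gives state scoped to a single resource). dlt stores it with the pipeline and in the destination's `_dlt_pipeline_state` table, so the next run resumes from the saved values.

```python
from datetime import datetime, timezone

import dlt

# Call inside a resource: returns the (possibly empty) state dict for this source
state = dlt.current.source_state()
# Update during extraction; prefer JSON-friendly values such as ISO-8601 strings
state["last_run"] = datetime.now(timezone.utc).isoformat()
state["last_id"] = max_id  # highest ID seen in this run
# No explicit save: dlt persists the dict between runs
```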
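To make "persists between runs" concrete, here is a framework-free sketch of the same idea: state is loaded at the start of a run, updated during extraction, and written back once extraction has finished. The JSON file and the `load_state`, `save_state`, and `run` helpers are hypothetical; dlt does this bookkeeping itself. Note that the standard `json` module cannot encode `datetime` objects, one reason to keep state values as plain strings and numbers.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from typing import Dict, List

def load_state(path: str) -> Dict:
    # Missing file means first run: start from an empty state
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)

def save_state(path: str, state: Dict) -> None:
    with open(path, "w") as f:
        json.dump(state, f)

def run(path: str, rows: List[Dict]) -> List[Dict]:
    state = load_state(path)
    last_id = state.get("last_id", 0)
    new_rows = [r for r in rows if r["id"] > last_id]
    if new_rows:
        state["last_id"] = max(r["id"] for r in new_rows)
    # ISO-8601 string, since json.dump rejects datetime objects
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    # Persist only after extraction completed
    save_state(path, state)
    return new_rows

path = os.path.join(tempfile.mkdtemp(), "state.json")
rows = [{"id": 1}, {"id": 2}]
print(len(run(path, rows)))  # 2
rows.append({"id": 3})
print(len(run(path, rows)))  # 1
```

If `run` raised before `save_state`, the file would keep the previous cursor and the next run would retry the same rows rather than skip them.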
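Testing Incremental Loading
Keeping the cursor logic in a plain function that takes the state dict (here a hypothetical helper, `extract_new_records`) lets you test it without running a pipeline:

```python
def extract_new_records(rows, state):
    # Same ID-cursor logic as above, with a plain dict instead of dlt state
    last_id = state.get("last_id", 0)
    new_rows = [r for r in rows if r["id"] > last_id]
    if new_rows:
        state["last_id"] = max(r["id"] for r in new_rows)
    return new_rows

def test_incremental_loading():
    rows = [{"id": 1}, {"id": 2}]
    state = {}
    assert len(extract_new_records(rows, state)) == 2  # first run: everything
    assert extract_new_records(rows, state) == []  # second run: nothing new
    rows.append({"id": 3})
    assert extract_new_records(rows, state) == [{"id": 3}]  # only the new row
```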