Watermark Management
This guide will show you how to manage watermarks for your pipelines.
Incrementing Watermarks
Watermarks are used to track the progress of your pipelines.
You can increment the watermark by setting next_watermark on your PipelineConfig before syncing it with the Watcher framework.
import pendulum
from watcher import Watcher, Pipeline, PipelineConfig, AddressLineage, Address

watcher = Watcher("https://api.watcher.example.com")

MY_ETL_PIPELINE_CONFIG = PipelineConfig(
    pipeline=Pipeline(
        name="my-etl-pipeline",
        pipeline_type_name="extraction",
    ),
    default_watermark="2024-01-01",
    address_lineage=AddressLineage(
        source_addresses=[
            Address(
                name="source_db.source_schema.source_table",
                address_type_name="postgres",
                address_type_group_name="database",
            )
        ],
        target_addresses=[
            Address(
                name="target_db.target_schema.target_table",
                address_type_name="snowflake",
                address_type_group_name="warehouse",
            )
        ],
    ),
)

MY_ETL_PIPELINE_CONFIG.next_watermark = pendulum.now("UTC").date().to_date_string()

synced_config = watcher.sync_pipeline_config(MY_ETL_PIPELINE_CONFIG)
print(f"Pipeline synced! New Watermark: {synced_config.watermark}")
On the first run, the watermark is the default_watermark. On later runs, it is the next_watermark that was set for the previous run.
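To make this run-to-run progression concrete, here is a toy, in-memory model of it. ToyWatermarkStore and its sync() and complete_run() methods are hypothetical and not part of the Watcher API, and the assumption that the watermark advances only after a run completes is ours. Treat it as a sketch of the behavior described above, not of how Watcher actually stores state.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ToyWatermarkStore:
    """Hypothetical in-memory stand-in for per-pipeline watermark state."""

    default_watermark: str
    watermark: Optional[str] = None
    next_watermark: Optional[str] = None

    def sync(self, next_watermark: str) -> Tuple[str, str]:
        # First run: nothing stored yet, so fall back to the default watermark.
        if self.watermark is None:
            self.watermark = self.default_watermark
        self.next_watermark = next_watermark
        return self.watermark, self.next_watermark

    def complete_run(self) -> None:
        # Assumption: after a successful run, this run's upper bound
        # becomes the lower bound of the next run.
        self.watermark = self.next_watermark


store = ToyWatermarkStore(default_watermark="2024-01-01")
print(store.sync(next_watermark="2024-01-05"))  # ('2024-01-01', '2024-01-05')
store.complete_run()
print(store.sync(next_watermark="2024-01-10"))  # ('2024-01-05', '2024-01-10')
```

The second run starts exactly where the first one ended, which is what lets each run process only new data.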
If you cannot determine the next watermark before the pipeline executes, you can set the next_watermark value from within the pipeline run, once it is known, by calling the update_pipeline_next_watermark() method.
Note
Pay attention to whether each window boundary is inclusive or exclusive, so that consecutive incremental windows do not overlap.
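One common way to avoid overlap is to treat each window as half-open, [watermark, next_watermark): include the lower bound and exclude the upper bound. The sketch below uses plain Python dates and hypothetical helper names (in_half_open_window, in_closed_window) to check that two consecutive half-open windows cover every day exactly once, while closed windows count the shared boundary day twice.

```python
from datetime import date, timedelta


def in_half_open_window(value: date, watermark: date, next_watermark: date) -> bool:
    # Include the lower bound, exclude the upper bound: [watermark, next_watermark)
    return watermark <= value < next_watermark


def in_closed_window(value: date, watermark: date, next_watermark: date) -> bool:
    # Include both bounds: [watermark, next_watermark]
    return watermark <= value <= next_watermark


# Two consecutive runs: the second run's watermark is the first run's next_watermark.
runs = [
    (date(2024, 1, 1), date(2024, 1, 5)),
    (date(2024, 1, 5), date(2024, 1, 10)),
]
days = [date(2024, 1, 1) + timedelta(days=i) for i in range(9)]  # Jan 1 to Jan 9

half_open_counts = {d: sum(in_half_open_window(d, lo, hi) for lo, hi in runs) for d in days}
closed_overlap = [d for d in days if sum(in_closed_window(d, lo, hi) for lo, hi in runs) > 1]

print(all(count == 1 for count in half_open_counts.values()))  # True
print(closed_overlap)  # [datetime.date(2024, 1, 5)]
```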
Accessing Watermarks
You can access the watermarks for your pipelines by using the WatcherContext.
import pendulum
from watcher import Watcher, PipelineConfig, WatcherContext, ETLResult

watcher = Watcher("https://api.watcher.example.com")

# MY_ETL_PIPELINE_CONFIG is the PipelineConfig defined in the previous example.
MY_ETL_PIPELINE_CONFIG.next_watermark = pendulum.now("UTC").date().to_date_string()
synced_config = watcher.sync_pipeline_config(MY_ETL_PIPELINE_CONFIG)

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active,
    watermark=synced_config.watermark,
    next_watermark=synced_config.next_watermark,
)
def etl_pipeline(watcher_context: WatcherContext):
    print(f"Watermark: {watcher_context.watermark}")
    print(f"Next Watermark: {watcher_context.next_watermark}")

    # Half-open window: include rows at the watermark, exclude rows at next_watermark.
    query = f"""
    SELECT
        *
    FROM Table_A
    WHERE date_column < '{watcher_context.next_watermark}'
      AND date_column >= '{watcher_context.watermark}'
    """
    # Execute `query` against your source here.

    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
        execution_metadata={"partition": "2025-01-01"},
    )

# The decorator supplies watcher_context, so the function is called without arguments.
etl_pipeline()
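The example above formats the watermarks directly into the SQL string. An alternative is to bind them as query parameters so the database driver handles quoting. This sketch uses Python's built-in sqlite3 module as a stand-in for your real source; Table_A and date_column come from the example, while the sample rows and the hard-coded watermark strings are illustrative stand-ins for watcher_context.watermark and watcher_context.next_watermark. Placeholder syntax varies by driver (sqlite3 uses ?, psycopg uses %s).

```python
import sqlite3

# Stand-ins for watcher_context.watermark and watcher_context.next_watermark (strings).
watermark = "2024-01-01"
next_watermark = "2024-01-05"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Table_A (id INTEGER, date_column TEXT)")
conn.executemany(
    "INSERT INTO Table_A VALUES (?, ?)",
    [(1, "2023-12-31"), (2, "2024-01-01"), (3, "2024-01-04"), (4, "2024-01-05")],
)

# Bind the watermarks as parameters instead of formatting them into the SQL text.
rows = conn.execute(
    """
    SELECT id, date_column
    FROM Table_A
    WHERE date_column >= ?
      AND date_column < ?
    ORDER BY id
    """,
    (watermark, next_watermark),
).fetchall()
conn.close()

print(rows)  # [(2, '2024-01-01'), (3, '2024-01-04')]
```

Comparing these values as text works here only because ISO 8601 date strings sort in chronological order; the row dated 2024-01-05 is left for the next run.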
Watermark Data Type
Watermarks are stored as strings in the Watcher framework to allow for flexibility. Know which data type your code expects, and cast the value after reading it from the WatcherContext.
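For example, a date, timestamp, or integer watermark could be cast with the standard library as shown below. The parse_* helper names are hypothetical, and treating naive timestamps as UTC is an assumption you should match to your own data; if you already use pendulum, pendulum.parse() is an alternative.

```python
from datetime import date, datetime, timedelta, timezone


def parse_date_watermark(value: str) -> date:
    # ISO 8601 calendar date, e.g. "2024-01-01"
    return date.fromisoformat(value)


def parse_timestamp_watermark(value: str) -> datetime:
    # ISO 8601 timestamp; naive values are treated as UTC (an assumption).
    parsed = datetime.fromisoformat(value)
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)


def parse_int_watermark(value: str) -> int:
    # Numeric watermarks, e.g. an auto-increment ID or an offset.
    return int(value)


print(parse_date_watermark("2024-01-01") + timedelta(days=1))  # 2024-01-02
print(parse_timestamp_watermark("2024-01-01T12:00:00").isoformat())  # 2024-01-01T12:00:00+00:00
print(parse_int_watermark("1500") + 1)  # 1501
```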