Address Management
This guide shows how to manage addresses for your pipelines.
Address Lineage Configuration
Address lineage is an optional setting you can add to PipelineConfig. It stores the lineage for your pipeline as a list of source addresses and a list of target addresses, as in the following example.
from watcher import AddressLineage, Address, Pipeline, PipelineConfig

MY_PIPELINE_CONFIG = PipelineConfig(
    pipeline=Pipeline(
        name="my-pipeline",
        pipeline_type_name="extraction",
    ),
    default_watermark="2024-01-01",
    address_lineage=AddressLineage(
        source_addresses=[
            Address(
                name="source_db.source_schema.source_table",
                address_type_name="postgres",
                address_type_group_name="database",
            )
        ],
        target_addresses=[
            Address(
                name="target_db.target_schema.target_table",
                address_type_name="snowflake",
                address_type_group_name="warehouse",
            )
        ],
    ),
)
Loading Address Lineage
Address lineage is loaded automatically when the pipeline is synced, provided the load_lineage flag is True. The flag is set to True when the pipeline is first created and is set to False after a successful execution. To load the lineage again later, set the flag back to True through the Watcher framework API.
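from watcher import Watcher

# MY_PIPELINE_CONFIG is the PipelineConfig defined in the previous section.
watcher = Watcher("https://api.watcher.example.com")

# Syncing the pipeline loads its address lineage when load_lineage is True.
synced_config = watcher.sync_pipeline_config(MY_PIPELINE_CONFIG)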
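
The life cycle of the load_lineage flag described above can be illustrated with a small, self-contained model. This is only a sketch of the documented behavior, not Watcher's implementation: PipelineState, sync, and finish_execution are hypothetical names invented for illustration and are not part of the watcher package. Assigning the flag directly stands in for resetting it through the Watcher framework API.

```python
from dataclasses import dataclass


@dataclass
class PipelineState:
    """Hypothetical stand-in for the pipeline record kept by Watcher."""
    name: str
    load_lineage: bool = True  # True when the pipeline is first created
    lineage_loads: int = 0     # how many times lineage has been loaded


def sync(state: PipelineState) -> bool:
    """Load lineage only if the flag is set; return whether it was loaded."""
    if state.load_lineage:
        state.lineage_loads += 1
        return True
    return False


def finish_execution(state: PipelineState, succeeded: bool) -> None:
    """Clear the flag after a successful execution."""
    if succeeded:
        state.load_lineage = False


state = PipelineState(name="my-pipeline")
first_sync = sync(state)                 # flag is True, so lineage is loaded
finish_execution(state, succeeded=True)  # flag is set to False
second_sync = sync(state)                # flag is False, so lineage is skipped
state.load_lineage = True                # stands in for a reset through the API
third_sync = sync(state)                 # lineage is loaded again
```

In this model, changing the address lineage in PipelineConfig has no effect on later syncs until the flag is set back to True, which matches the behavior described in this section.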