Execution Management

This guide will walk through execution management for your pipelines.

Logging Executions

You can log executions by using the track_pipeline_execution decorator. This will log the execution to the Watcher framework and return the execution id. This execution id can be used to track the execution and retrieve the metrics.

The track_pipeline_execution decorator requires the pipeline id and active status as arguments. You can also pass in the parent_execution_id, watermark, and next_watermark as arguments to be logged.

Exception Handling

The decorator handles exceptions automatically:

  • Unexpected exceptions are automatically caught, logged as failures in the Watcher API, and then re-raised

  • Set `completed_successfully=False` for business logic failures (not exceptions)

Warning

Do not catch exceptions and return ETLResult. This silences the error, make sure to raise the exception and the decorator will automatically mark the execution as failed.

Example Usage

from watcher import Watcher, WatcherContext

watcher = Watcher("https://api.watcher.example.com")

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def etl_pipeline():
    try:
        # Your ETL logic here
        if some_condition_fails:
            return ETLResult(completed_successfully=False, execution_metadata={"error": "Data quality issues"})
        return ETLResult(completed_successfully=True, inserts=100, total_rows=100)
    except Exception as e:
        # Don't catch exceptions - let them bubble up to the decorator
        # The decorator will automatically mark the execution as failed
        raise e

# Access results
result = etl_pipeline()
from watcher import Watcher, WatcherContext

watcher = Watcher("https://api.watcher.example.com")

synced_config = watcher.sync_pipeline_config(MY_ETL_PIPELINE_CONFIG)

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():


    # Work here

    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )

my_pipeline()

Custom ETL Results

You can extend ETLResult with custom fields to return additional data from your pipeline:

from pydantic import BaseModel
from watcher import ETLResult

class CustomETLResult(ETLResult):
    data_quality_score: Optional[float] = None

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():

    # ... do work ...

    return CustomETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
        data_quality_score=0.95
    )

# Access custom fields
output = my_pipeline()
print(f"Quality score: {output.result.data_quality_score}")

Note

Custom fields are only accessible in your application code. Only the standard ETLResult fields (completed_successfully, inserts, updates, etc.) are sent to the Watcher API.

ETL Results

The ETLResult is a class that is required to be returned from your pipeline function if using the track_pipeline_execution decorator. It contains the metrics for your pipeline that are logged to the Watcher framework.

class ETLResult(BaseModel):
    completed_successfully: bool
    inserts: Optional[int] = Field(default=None, ge=0)
    updates: Optional[int] = Field(default=None, ge=0)
    soft_deletes: Optional[int] = Field(default=None, ge=0)
    total_rows: Optional[int] = Field(default=None, ge=0)
    execution_metadata: Optional[dict] = None

Code Example:

from watcher import ETLResult

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():


    # Work here

    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )

Execution Results

The ExecutionResults is a class that is returned from your pipeline function. This wraps around the ETLResult class and adds the execution id. This is to ensure access to the execution id for any usage.

from watcher import ExecutionResult

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline() -> ExecutionResult:

    # Work here

    return ETLResult(
            completed_successfully=True,
            inserts=100,
            total_rows=100,
        )

output = my_pipeline()
print(output.execution_id)
print(output.result)
print(output.result.inserts)

Hierarchical Executions

For parent-child execution relationships, you have two options:

Option 1: track_child_pipeline_execution() method (Recommended)

from etl_watcher_sdk import Watcher, ETLResult, WatcherContext

watcher = Watcher(api_key="your-api-key", base_url="https://api.example.com")

def process_ticker_data(ticker: str, watcher_context: WatcherContext) -> ETLResult:
    # Child function that processes individual ticker data
    print(f"Processing {ticker} with watermark: {watcher_context.watermark}")

    return ETLResult(
        completed_successfully=True,
        total_rows=100,
        inserts=100,
        execution_metadata={"ticker": ticker, "batch_id": "123"}
    )

@watcher.track_pipeline_execution(pipeline_id=123)
def main_etl_pipeline(watcher_context: WatcherContext) -> ETLResult:
    tickers = ["AAPL", "GOOGL", "MSFT"]
    total_processed = 0

    for ticker in tickers:
        # One line - calls function and tracks as child execution
        child_result = watcher.track_child_pipeline_execution(
            pipeline_id=456,
            active=True,
            parent_execution_id=watcher_context.execution_id,
            func=process_ticker_data,
            ticker=ticker,
            watermark=watcher_context.watermark,
            next_watermark=watcher_context.next_watermark
        )

        total_processed += child_result.result.total_rows
        print(f"Child execution {child_result.execution_id} processed {child_result.result.total_rows} rows")

    return ETLResult(completed_successfully=True, total_rows=total_processed)

Note

The function that is called by track_child_pipeline_execution must return an ETLResult or a model that inherits from ETLResult. It also must not be decorated with track_pipeline_execution.

Option 2: Nested decorators (for small functions)

@watcher.track_pipeline_execution(pipeline_id=parent_pipeline_id, active=True)
def parent_pipeline(watcher_context: WatcherContext):
    # Parent logic here

    @watcher.track_pipeline_execution(
        pipeline_id=child_pipeline_id,
        active=True,
        parent_execution_id=watcher_context.execution_id
    )
    def child_pipeline():
        # Child logic here
        return ETLResult(completed_successfully=True, total_rows=100)

    child_pipeline()  # Call the child
    return ETLResult(completed_successfully=True, total_rows=200)

Method Parameters (Option 1):

  • pipeline_id (int) - ID of the child pipeline (from child config)

  • active (bool) - Whether the pipeline is active (from child config)

  • parent_execution_id (int) - ID of the parent execution (from watcher_context)

  • func (callable) - Function to execute as child execution

  • *args, **kwargs - Arguments to pass to the function

  • watermark (optional) - Watermark to pass to the child execution (from child config)

  • next_watermark (optional) - Next watermark to pass to the child execution (from child config)

Key Features:

  • Automatic WatcherContext injection - If your function has a watcher_context parameter, it gets injected automatically

  • Full ETLResult logging - Captures and logs all metrics (inserts, updates, total_rows, etc.)

  • Error handling - Automatically marks as failed if exception occurs

  • Watermark support - Child functions can access watermarks via watcher_context

Watcher Execution Context

The WatcherContext is a class that is passed to your pipeline function. It contains the execution id, pipeline id, watermark, and next watermark variables. Your function must have watcher_context as a parameter if using the WatcherContext.

from watcher import WatcherContext

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline(watcher_context: WatcherContext):

    # Work here

    print(watcher_context.execution_id)
    print(watcher_context.pipeline_id)
    print(watcher_context.watermark)
    print(watcher_context.next_watermark)

    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )

Active Flag

You can set a Pipeline’s active flag to False to skip the execution. This is normally triggered through the Watcher framework directly as the active flag is received from the Watcher API.

@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline(watcher_context: WatcherContext):

    # Function IS SKIPPED if active is False

    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )

my_pipeline()

Note

This can be a useful functionality to use in your pipelines to skip executions if needed.

Manual Execution Management

For advanced use cases, especially when integrating with orchestration frameworks like Airflow or Dagster, you can manually manage execution lifecycle using the following methods:

start_pipeline_execution()

Starts a new pipeline execution and returns the execution ID. This is useful when you need to start an execution without using the decorator pattern.

from watcher import Watcher
import pendulum

watcher = Watcher("https://api.watcher.example.com")

# Start execution with minimal parameters
execution_id = watcher.start_pipeline_execution(pipeline_id=123)

# Start execution with all parameters
execution_id = watcher.start_pipeline_execution(
    pipeline_id=123,
    start_date=pendulum.now(),
    watermark="2024-01-01",  # Associated with run, only metadata
    next_watermark="2024-01-02",  # Associated with run, only metadata
    parent_execution_id=456
)

Parameters:

  • pipeline_id (int, required) - The ID of the pipeline to execute

  • start_date (DateTime, optional) - Start date for the execution

  • watermark (str | int | DateTime | Date, optional) - Watermark value for the execution

  • next_watermark (str | int | DateTime | Date, optional) - Next watermark value

  • parent_execution_id (int, optional) - ID of the parent execution for hierarchical tracking

Returns:

  • int - The execution ID of the started execution

end_pipeline_execution()

Ends a pipeline execution with the provided metrics and status. This is useful when you need to manually end an execution that was started with start_pipeline_execution().

from watcher import Watcher
import pendulum

watcher = Watcher("https://api.watcher.example.com")

# End execution with minimal parameters
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=True
)

# End execution with all metrics
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=True,
    end_date=pendulum.now(),
    inserts=100,
    updates=50,
    soft_deletes=10,
    total_rows=1000,
    execution_metadata={"source": "database", "batch_id": "123"}
)

# Mark execution as failed
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=False,
    execution_metadata={"error": "Data quality check failed"}
)

Parameters:

  • execution_id (int, required) - The ID of the execution to end

  • completed_successfully (bool, required) - Whether the execution completed successfully

  • end_date (DateTime, optional) - End date for the execution

  • inserts (int, optional) - Number of rows inserted

  • updates (int, optional) - Number of rows updated

  • soft_deletes (int, optional) - Number of rows soft deleted

  • total_rows (int, optional) - Total number of rows processed

  • execution_metadata (dict, optional) - Additional metadata for the execution

update_pipeline_next_watermark()

Updates the next watermark for a pipeline. This is useful for manually updating watermarks outside of the normal execution flow.

from watcher import Watcher
import pendulum

watcher = Watcher("https://api.watcher.example.com")

# Update with string watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark="2024-01-02"
)

# Update with integer watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark=20240102
)

# Update with DateTime watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark="2024-01-02T10:00:00"
)

Parameters:

  • pipeline_id (int, required) - The ID of the pipeline to update

  • next_watermark (str | int | DateTime | Date, required) - The new next watermark value

Use Cases:

These manual methods are particularly useful for:

  • Orchestration Framework Integration: When integrating with Airflow, Dagster, or other orchestration tools where you need fine-grained control over execution lifecycle

  • Parent-Child Execution Tracking: When managing parent executions that span multiple child tasks

  • Custom Error Handling: When you need custom error handling logic before completing an execution

  • Watermark Management: When you need to update watermarks after a pipeline runs rather than before