Execution Management
This guide will walk through execution management for your pipelines.
Logging Executions
You can log executions with the track_pipeline_execution decorator.
Each call to the decorated function is logged as an execution in the Watcher framework, and the call returns an ExecutionResult that includes the execution id.
Use the execution id to track the execution and retrieve its metrics.
The decorator requires pipeline_id and active as arguments.
You can also pass parent_execution_id, watermark, and next_watermark to be logged with the execution.
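To make the decorator's contract concrete, here is a minimal, self-contained sketch of a decorator factory that takes the same arguments. It is not the SDK's implementation: `track_execution`, `EXECUTION_LOG`, and the returned dict are hypothetical stand-ins that record each call in memory instead of calling the Watcher API.

```python
import functools
import itertools
from typing import Any, Callable, Optional

# Hypothetical in-memory stand-in for the executions the Watcher API would store
EXECUTION_LOG: list = []
_execution_ids = itertools.count(1)


def track_execution(pipeline_id: int, active: bool,
                    parent_execution_id: Optional[int] = None,
                    watermark: Any = None, next_watermark: Any = None):
    """Illustrative tracking decorator; NOT the watcher SDK's implementation."""
    def decorator(func: Callable[..., Any]):
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> dict:
            execution_id = next(_execution_ids)
            # Log the same arguments the real decorator is documented to record
            EXECUTION_LOG.append({
                "execution_id": execution_id,
                "pipeline_id": pipeline_id,
                "active": active,  # skipping inactive pipelines is omitted here
                "parent_execution_id": parent_execution_id,
                "watermark": watermark,
                "next_watermark": next_watermark,
            })
            result = func(*args, **kwargs)
            # Hand back the execution id together with the function's own result
            return {"execution_id": execution_id, "result": result}
        return wrapper
    return decorator


@track_execution(pipeline_id=123, active=True,
                 watermark="2024-01-01", next_watermark="2024-01-02")
def my_pipeline() -> dict:
    return {"completed_successfully": True, "inserts": 100, "total_rows": 100}


output = my_pipeline()
```

The real decorator returns an ExecutionResult object rather than a dict; the dict here only keeps the sketch dependency-free.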
Exception Handling
The decorator handles exceptions automatically:
Unexpected exceptions are caught automatically, logged as failures in the Watcher API, and then re-raised.
For business-logic failures that do not raise an exception, return an ETLResult with `completed_successfully=False`.
Warning
Do not catch exceptions and return an ETLResult instead: this silences the error. Let the exception propagate (or re-raise it), and the decorator will automatically mark the execution as failed.
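The two failure paths can be contrasted with a hypothetical `run_tracked` helper (not part of the SDK) that records a status in a list. An exception is recorded as a failure and then re-raised, while a business-logic failure is reported by returning `completed_successfully=False`, so nothing is silenced in either case.

```python
from typing import Callable, List

# Hypothetical stand-in for the statuses the Watcher API would record
statuses: List[str] = []


def run_tracked(func: Callable[[], dict]) -> dict:
    """Mimic the documented behaviour: log exceptions as failures, then re-raise."""
    try:
        result = func()
    except Exception:
        statuses.append("failed")  # logged as a failure...
        raise                      # ...then re-raised, so the caller still sees it
    statuses.append("succeeded" if result["completed_successfully"] else "failed")
    return result


def good_pipeline() -> dict:
    return {"completed_successfully": True, "inserts": 100}


def bad_data_pipeline() -> dict:
    # Business-logic failure: nothing is raised, so report it in the result
    return {"completed_successfully": False,
            "execution_metadata": {"error": "Data quality issues"}}


def crashing_pipeline() -> dict:
    raise ValueError("source table missing")


run_tracked(good_pipeline)
run_tracked(bad_data_pipeline)
try:
    run_tracked(crashing_pipeline)
except ValueError as exc:
    caught = exc  # the original exception reaches the caller unchanged
```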
Example Usage
from watcher import Watcher, WatcherContext, ETLResult
watcher = Watcher("https://api.watcher.example.com")
synced_config = watcher.sync_pipeline_config(MY_ETL_PIPELINE_CONFIG)
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def etl_pipeline():
    try:
        # Your ETL logic here
        if some_condition_fails:
            # Business-logic failure (no exception): report it explicitly
            return ETLResult(completed_successfully=False, execution_metadata={"error": "Data quality issues"})
        return ETLResult(completed_successfully=True, inserts=100, total_rows=100)
    except Exception:
        # If you catch an exception (for example, to add logging), re-raise it
        # so the decorator can mark the execution as failed
        raise
# Access results
result = etl_pipeline()
A minimal pipeline with no failure branch:
from watcher import Watcher, WatcherContext, ETLResult
watcher = Watcher("https://api.watcher.example.com")
synced_config = watcher.sync_pipeline_config(MY_ETL_PIPELINE_CONFIG)
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():
    # Work here
    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )
my_pipeline()
Custom ETL Results
You can extend ETLResult with custom fields to return additional data from your pipeline:
from typing import Optional
from watcher import ETLResult
class CustomETLResult(ETLResult):
    data_quality_score: Optional[float] = None
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():
    # ... do work ...
    return CustomETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
        data_quality_score=0.95
    )
# Access custom fields
output = my_pipeline()
print(f"Quality score: {output.result.data_quality_score}")
Note
Custom fields are only accessible in your application code. Only the standard ETLResult fields (completed_successfully, inserts, updates, etc.) are sent to the Watcher API.
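One way to picture the Note above: the sketch below uses stdlib dataclasses as a stand-in for the SDK's pydantic models and filters a result down to the fields declared on the base class before building a payload. The `api_payload` helper is hypothetical; how the SDK actually selects the fields it sends is not documented in this guide.

```python
from dataclasses import asdict, dataclass, fields
from typing import Optional


@dataclass
class ETLResult:
    # Stand-in mirroring a subset of the documented standard fields
    completed_successfully: bool
    inserts: Optional[int] = None
    total_rows: Optional[int] = None


@dataclass
class CustomETLResult(ETLResult):
    data_quality_score: Optional[float] = None  # custom, application-only field


def api_payload(result: ETLResult) -> dict:
    """Hypothetical helper: keep only the fields declared on the base ETLResult."""
    standard = {f.name for f in fields(ETLResult)}
    return {k: v for k, v in asdict(result).items() if k in standard}


result = CustomETLResult(completed_successfully=True, inserts=100,
                         total_rows=100, data_quality_score=0.95)
payload = api_payload(result)
```

The custom field stays on the object your code receives, but it is absent from the payload.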
ETL Results
ETLResult is the class your pipeline function must return when it uses the track_pipeline_execution decorator. It contains the pipeline metrics that are logged to the Watcher framework.
from typing import Optional
from pydantic import BaseModel, Field
class ETLResult(BaseModel):
    completed_successfully: bool
    inserts: Optional[int] = Field(default=None, ge=0)
    updates: Optional[int] = Field(default=None, ge=0)
    soft_deletes: Optional[int] = Field(default=None, ge=0)
    total_rows: Optional[int] = Field(default=None, ge=0)
    execution_metadata: Optional[dict] = None
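`Field(default=None, ge=0)` makes each count optional but, when provided, requires it to be zero or greater; pydantic rejects a negative value at construction time. The stdlib sketch below reproduces that rule with a dataclass so it runs without extra dependencies. `CountsOnly` is a hypothetical name, and the real model raises pydantic's `ValidationError` rather than `ValueError`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CountsOnly:
    """Hypothetical stand-in for ETLResult's count fields and their ge=0 rule."""
    completed_successfully: bool
    inserts: Optional[int] = None
    updates: Optional[int] = None
    soft_deletes: Optional[int] = None
    total_rows: Optional[int] = None

    def __post_init__(self) -> None:
        for name in ("inserts", "updates", "soft_deletes", "total_rows"):
            value = getattr(self, name)
            # None means "not reported"; negative counts are rejected
            if value is not None and value < 0:
                raise ValueError(f"{name} must be >= 0, got {value}")


ok = CountsOnly(completed_successfully=True, inserts=100, total_rows=100)
try:
    CountsOnly(completed_successfully=True, inserts=-1)
    rejected = False
except ValueError:
    rejected = True
```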
Code Example:
from watcher import ETLResult
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline():
    # Work here
    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )
Execution Results
ExecutionResult is the class returned when you call a decorated pipeline function. It wraps the ETLResult your function returns and adds the execution id, so the caller always has access to the id.
from watcher import ETLResult, ExecutionResult
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline() -> ETLResult:
    # Work here; the decorator wraps this return value in an ExecutionResult
    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )
output: ExecutionResult = my_pipeline()
print(output.execution_id)
print(output.result)
print(output.result.inserts)
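The wrapper relationship can be sketched with a generic dataclass: the decorated call returns an object carrying `execution_id` next to the untouched `result`, which is why a custom subclass keeps its extra fields. `ExecutionResultSketch`, `ETLResultSketch`, and `wrap` are hypothetical illustrations, not the SDK's class definitions.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class ETLResultSketch:
    completed_successfully: bool
    inserts: Optional[int] = None
    total_rows: Optional[int] = None


@dataclass
class ExecutionResultSketch(Generic[T]):
    execution_id: int
    result: T  # the pipeline's own return value, passed through unchanged


def wrap(execution_id: int, result: T) -> ExecutionResultSketch[T]:
    # What a tracking decorator would hand back to the caller
    return ExecutionResultSketch(execution_id=execution_id, result=result)


output = wrap(42, ETLResultSketch(completed_successfully=True, inserts=100, total_rows=100))
```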
Hierarchical Executions
For parent-child execution relationships, you have two options:
Option 1: track_child_pipeline_execution() method (Recommended)
from watcher import Watcher, ETLResult, WatcherContext
watcher = Watcher(api_key="your-api-key", base_url="https://api.example.com")
def process_ticker_data(ticker: str, watcher_context: WatcherContext) -> ETLResult:
    # Child function that processes individual ticker data
    print(f"Processing {ticker} with watermark: {watcher_context.watermark}")
    return ETLResult(
        completed_successfully=True,
        total_rows=100,
        inserts=100,
        execution_metadata={"ticker": ticker, "batch_id": "123"}
    )
@watcher.track_pipeline_execution(pipeline_id=123, active=True)
def main_etl_pipeline(watcher_context: WatcherContext) -> ETLResult:
    tickers = ["AAPL", "GOOGL", "MSFT"]
    total_processed = 0
    for ticker in tickers:
        # Calls the function and tracks the call as a child execution
        child_result = watcher.track_child_pipeline_execution(
            pipeline_id=456,
            active=True,
            parent_execution_id=watcher_context.execution_id,
            func=process_ticker_data,
            ticker=ticker,
            watermark=watcher_context.watermark,
            next_watermark=watcher_context.next_watermark
        )
        total_processed += child_result.result.total_rows
        print(f"Child execution {child_result.execution_id} processed {child_result.result.total_rows} rows")
    return ETLResult(completed_successfully=True, total_rows=total_processed)
Note
The function that is called by track_child_pipeline_execution
must return an ETLResult or a model that inherits from ETLResult.
It also must not be decorated with track_pipeline_execution.
Option 2: Nested decorators (for small functions)
@watcher.track_pipeline_execution(pipeline_id=parent_pipeline_id, active=True)
def parent_pipeline(watcher_context: WatcherContext):
    # Parent logic here
    @watcher.track_pipeline_execution(
        pipeline_id=child_pipeline_id,
        active=True,
        parent_execution_id=watcher_context.execution_id
    )
    def child_pipeline():
        # Child logic here
        return ETLResult(completed_successfully=True, total_rows=100)
    child_pipeline()  # Call the child
    return ETLResult(completed_successfully=True, total_rows=200)
Method Parameters (Option 1):
pipeline_id (int) - ID of the child pipeline (from child config)
active (bool) - Whether the pipeline is active (from child config)
parent_execution_id (int) - ID of the parent execution (from watcher_context)
func (callable) - Function to execute as the child execution
*args, **kwargs - Arguments to pass to the function
watermark (optional) - Watermark to pass to the child execution (from child config)
next_watermark (optional) - Next watermark to pass to the child execution (from child config)
Key Features:
Automatic WatcherContext injection - If your function has a watcher_context parameter, it gets injected automatically
Full ETLResult logging - Captures and logs all metrics (inserts, updates, total_rows, etc.)
Error handling - Automatically marks the child execution as failed if an exception occurs
Watermark support - Child functions can access watermarks via watcher_context
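Automatic context injection can be implemented by inspecting the callee's signature. The sketch below shows that technique with `inspect.signature`; `Context` and `call_with_context` are hypothetical names, and the SDK may implement injection differently.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Context:
    """Hypothetical stand-in for WatcherContext."""
    execution_id: int
    pipeline_id: int
    watermark: Optional[str] = None
    next_watermark: Optional[str] = None


def call_with_context(func: Callable[..., Any], context: Context,
                      *args: Any, **kwargs: Any) -> Any:
    # Pass the context only if the function declares a `watcher_context` parameter
    if "watcher_context" in inspect.signature(func).parameters:
        kwargs["watcher_context"] = context
    return func(*args, **kwargs)


def process_ticker_data(ticker: str, watcher_context: Context) -> str:
    return f"{ticker}@{watcher_context.watermark}"


def no_context(ticker: str) -> str:
    return ticker


ctx = Context(execution_id=7, pipeline_id=456, watermark="2024-01-01")
with_ctx = call_with_context(process_ticker_data, ctx, "AAPL")
without_ctx = call_with_context(no_context, ctx, "MSFT")
```

Functions without a `watcher_context` parameter are called unchanged, which is why the parameter name matters.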
Watcher Execution Context
WatcherContext is passed to your pipeline function by the decorator. It holds the execution id, pipeline id, watermark, and next watermark for the current execution. To receive it, declare a parameter named watcher_context in your function.
from watcher import ETLResult, WatcherContext
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline(watcher_context: WatcherContext):
    # Work here
    print(watcher_context.execution_id)
    print(watcher_context.pipeline_id)
    print(watcher_context.watermark)
    print(watcher_context.next_watermark)
    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )
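A common use of `watermark` and `next_watermark` is to bound an incremental load to the half-open window from `watermark` (inclusive) to `next_watermark` (exclusive). This guide does not prescribe those semantics; the sketch assumes ISO date strings and filters an in-memory list of rows on a hypothetical `updated_at` field.

```python
from datetime import date
from typing import Iterable, List


def rows_in_window(rows: Iterable[dict], watermark: str, next_watermark: str) -> List[dict]:
    """Select rows with watermark <= updated_at < next_watermark (assumed semantics)."""
    start = date.fromisoformat(watermark)
    end = date.fromisoformat(next_watermark)
    return [r for r in rows if start <= date.fromisoformat(r["updated_at"]) < end]


rows = [
    {"id": 1, "updated_at": "2023-12-31"},
    {"id": 2, "updated_at": "2024-01-01"},
    {"id": 3, "updated_at": "2024-01-02"},
]
# Inside a pipeline these bounds would come from watcher_context
batch = rows_in_window(rows, "2024-01-01", "2024-01-02")
```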
Active Flag
Set a pipeline's active flag to False to skip its execution. You normally control this flag in the Watcher framework rather than in code: the decorator receives it from the Watcher API, for example through synced_config.pipeline.active.
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active)
def my_pipeline(watcher_context: WatcherContext):
    # The function body is skipped when active is False
    return ETLResult(
        completed_successfully=True,
        inserts=100,
        total_rows=100,
    )
my_pipeline()
Note
This is useful for pausing a pipeline from Watcher without changing its code.
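The skip behaviour can be pictured as a guard at the top of the wrapper. In this hypothetical sketch (`skip_if_inactive` is not an SDK name) a skipped call returns `None` and never runs the function body; what the SDK actually returns for a skipped execution is not stated in this guide.

```python
import functools
from typing import Any, Callable, List, Optional

calls: List[str] = []


def skip_if_inactive(active: bool):
    """Sketch: run the wrapped function only when active is True."""
    def decorator(func: Callable[..., Any]):
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            if not active:
                return None  # assumed return value for a skipped run
            return func(*args, **kwargs)
        return wrapper
    return decorator


@skip_if_inactive(active=False)
def paused_pipeline() -> str:
    calls.append("paused ran")
    return "done"


@skip_if_inactive(active=True)
def live_pipeline() -> str:
    calls.append("live ran")
    return "done"


skipped = paused_pipeline()
ran = live_pipeline()
```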
Manual Execution Management
For advanced use cases, especially when integrating with orchestration frameworks like Airflow or Dagster, you can manage the execution lifecycle manually with the following methods:
start_pipeline_execution()
Starts a new pipeline execution and returns the execution ID. This is useful when you need to start an execution without using the decorator pattern.
from watcher import Watcher
import pendulum
watcher = Watcher("https://api.watcher.example.com")
# Start execution with minimal parameters
execution_id = watcher.start_pipeline_execution(pipeline_id=123)
# Start execution with all parameters
execution_id = watcher.start_pipeline_execution(
    pipeline_id=123,
    start_date=pendulum.now(),
    watermark="2024-01-01",  # Associated with run, only metadata
    next_watermark="2024-01-02",  # Associated with run, only metadata
    parent_execution_id=456
)
Parameters:
pipeline_id (int, required) - The ID of the pipeline to execute
start_date (DateTime, optional) - Start date for the execution
watermark (str | int | DateTime | Date, optional) - Watermark value for the execution
next_watermark (str | int | DateTime | Date, optional) - Next watermark value
parent_execution_id (int, optional) - ID of the parent execution for hierarchical tracking
Returns:
int - The ID of the started execution
end_pipeline_execution()
Ends a pipeline execution with the provided metrics and status. This is useful when you need to manually end an execution that was started with start_pipeline_execution().
from watcher import Watcher
import pendulum
watcher = Watcher("https://api.watcher.example.com")
# End execution with minimal parameters
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=True
)
# End execution with all metrics
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=True,
    end_date=pendulum.now(),
    inserts=100,
    updates=50,
    soft_deletes=10,
    total_rows=1000,
    execution_metadata={"source": "database", "batch_id": "123"}
)
# Mark execution as failed
watcher.end_pipeline_execution(
    execution_id=789,
    completed_successfully=False,
    execution_metadata={"error": "Data quality check failed"}
)
Parameters:
execution_id (int, required) - The ID of the execution to end
completed_successfully (bool, required) - Whether the execution completed successfully
end_date (DateTime, optional) - End date for the execution
inserts (int, optional) - Number of rows inserted
updates (int, optional) - Number of rows updated
soft_deletes (int, optional) - Number of rows soft deleted
total_rows (int, optional) - Total number of rows processed
execution_metadata (dict, optional) - Additional metadata for the execution
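When you manage the lifecycle by hand, pair every start with an end, including on failure. The sketch below shows that pattern with `StubWatcher`, a hypothetical in-memory class whose two methods share their names with the documented ones and accept a subset of their parameters; `run_manually` is likewise illustrative. In real code you would call the methods on your `Watcher` instance.

```python
import itertools
from typing import Any, Callable, Dict


class StubWatcher:
    """Hypothetical in-memory stand-in exposing two of the documented method names."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.ended: Dict[int, dict] = {}

    def start_pipeline_execution(self, pipeline_id: int, **kwargs: Any) -> int:
        # The stub ignores pipeline_id and kwargs; it only hands out new ids
        return next(self._ids)

    def end_pipeline_execution(self, execution_id: int,
                               completed_successfully: bool, **metrics: Any) -> None:
        self.ended[execution_id] = {"completed_successfully": completed_successfully, **metrics}


def run_manually(watcher: StubWatcher, pipeline_id: int, work: Callable[[], int]) -> int:
    execution_id = watcher.start_pipeline_execution(pipeline_id=pipeline_id)
    try:
        rows = work()
    except Exception as exc:
        # Close the execution as failed, then let the error propagate
        watcher.end_pipeline_execution(
            execution_id=execution_id,
            completed_successfully=False,
            execution_metadata={"error": str(exc)},
        )
        raise
    watcher.end_pipeline_execution(
        execution_id=execution_id, completed_successfully=True,
        inserts=rows, total_rows=rows,
    )
    return execution_id


def failing_work() -> int:
    raise RuntimeError("upstream timeout")


stub = StubWatcher()
ok_id = run_manually(stub, 123, lambda: 100)
try:
    run_manually(stub, 123, failing_work)
except RuntimeError:
    pass  # the execution was already closed as failed
```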
update_pipeline_next_watermark()
Updates the next watermark for a pipeline. This is useful for manually updating watermarks outside of the normal execution flow.
from watcher import Watcher
import pendulum
watcher = Watcher("https://api.watcher.example.com")
# Update with string watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark="2024-01-02"
)
# Update with integer watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark=20240102
)
# Update with DateTime watermark
watcher.update_pipeline_next_watermark(
    pipeline_id=123,
    next_watermark=pendulum.datetime(2024, 1, 2, 10)
)
Parameters:
pipeline_id (int, required) - The ID of the pipeline to update
next_watermark (str | int | DateTime | Date, required) - The new next watermark value
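A common pattern, assumed here rather than required by this guide, is to advance the watermark only after a successful run so that a failed window is retried. The sketch uses `StubWatcher`, a hypothetical in-memory class with a method named like the documented `update_pipeline_next_watermark`, plus an illustrative `advance_after_run` helper, and it assumes daily windows expressed as ISO date strings.

```python
from datetime import date, timedelta
from typing import Dict, Union


class StubWatcher:
    """Hypothetical in-memory stand-in; records the latest next_watermark per pipeline."""

    def __init__(self) -> None:
        self.next_watermarks: Dict[int, Union[str, int]] = {}

    def update_pipeline_next_watermark(self, pipeline_id: int,
                                       next_watermark: Union[str, int]) -> None:
        self.next_watermarks[pipeline_id] = next_watermark


def advance_after_run(watcher: StubWatcher, pipeline_id: int,
                      next_watermark: str, succeeded: bool) -> None:
    # Move the window forward by one day only when the run succeeded
    if succeeded:
        following = date.fromisoformat(next_watermark) + timedelta(days=1)
        watcher.update_pipeline_next_watermark(
            pipeline_id=pipeline_id, next_watermark=following.isoformat()
        )


stub = StubWatcher()
advance_after_run(stub, 123, "2024-01-02", succeeded=False)
before = dict(stub.next_watermarks)
advance_after_run(stub, 123, "2024-01-02", succeeded=True)
```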
Use Cases:
These manual methods are particularly useful for:
Orchestration Framework Integration: When integrating with Airflow, Dagster, or other orchestration tools where you need fine-grained control over execution lifecycle
Parent-Child Execution Tracking: When managing parent executions that span multiple child tasks
Custom Error Handling: When you need custom error handling logic before completing an execution
Watermark Management: When you need to update watermarks after a pipeline runs rather than before