Orchestration Integration
The ETL Watcher SDK integrates with orchestration frameworks such as Dagster and Airflow through the OrchestratedETL class.
Overview
The orchestration integration allows you to:
Use your existing ETL functions with orchestration frameworks
Automatically inject orchestration context into your ETL metadata
Maintain compatibility with both Dagster and Airflow
Track ETL execution metrics through the Watcher API
Basic Usage
The OrchestratedETL class provides a unified interface for running ETL pipelines with orchestration context:
```python
# WatcherContext is assumed to be exported by watcher; it is used only as a type hint
from watcher import OrchestratedETL, ETLResult, PipelineConfig, Pipeline, WatcherContext

# Define your ETL function (extract_data and transform_data stand for your own code)
def my_etl_function(watcher_context: WatcherContext, **kwargs) -> ETLResult:
    # Your ETL logic here
    data = extract_data()
    processed = transform_data(data)
    return ETLResult(
        completed_successfully=True,
        inserts=len(processed),
        total_rows=len(data),
        execution_metadata={"source": "database"},
    )

# Create orchestrated ETL
pipeline_config = PipelineConfig(
    pipeline=Pipeline(
        name="my_pipeline",
        pipeline_type_name="batch_etl",
    )
)

etl = OrchestratedETL(
    watcher_url="https://api.watcher.example.com",
    pipeline_config=pipeline_config,
)

# Run the ETL function through the SDK and print the result
result = etl.execute_etl(my_etl_function)
print(result)
```
Dagster Integration
For Dagster, you can use the OrchestratedETL class directly in your ops:
```python
from dagster import op, job, OpExecutionContext
from watcher import OrchestratedETL, ETLResult, PipelineConfig, Pipeline, WatcherContext

# pipeline_config is a PipelineConfig built as in Basic Usage

# Define your ETL function
def extract_and_transform(watcher_context: WatcherContext, **kwargs) -> ETLResult:
    # Your ETL logic
    data = extract_data()
    processed = transform_data(data)
    return ETLResult(
        completed_successfully=True,
        inserts=len(processed),
        total_rows=len(data),
    )

# Create Dagster op; passing the op context lets the SDK detect Dagster
@op
def my_etl_op(context: OpExecutionContext):
    etl = OrchestratedETL("https://api.watcher.com", pipeline_config)
    return etl.execute_etl(extract_and_transform, context)

# Wrap the op in a job so Dagster can run it
@job
def my_etl_job():
    my_etl_op()
```
Airflow Integration
For Airflow, you can use the OrchestratedETL class in your tasks:
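```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from watcher import OrchestratedETL, ETLResult, PipelineConfig, Pipeline, WatcherContext

# pipeline_config is a PipelineConfig built as in Basic Usage

# Define your ETL function
def extract_and_transform(watcher_context: WatcherContext, **kwargs) -> ETLResult:
    # Your ETL logic
    data = extract_data()
    processed = transform_data(data)
    return ETLResult(
        completed_successfully=True,
        inserts=len(processed),
        total_rows=len(data),
    )

# Create Airflow task; Airflow passes the task context as keyword arguments
def my_etl_task(**context):
    etl = OrchestratedETL("https://api.watcher.com", pipeline_config)
    return etl.execute_etl(extract_and_transform, context)

# Register the callable with a DAG (dag_id and start_date are illustrative;
# `schedule` requires Airflow 2.4+)
with DAG("my_etl_dag", start_date=datetime(2024, 1, 1), schedule=None, catchup=False) as dag:
    PythonOperator(task_id="my_etl_task", python_callable=my_etl_task)
```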
Advanced Usage
Watermark Management
The orchestration integration supports watermark management for incremental processing. Watermarks are automatically available in the watcher_context:
```python
def my_etl_function(watcher_context: WatcherContext, **kwargs) -> ETLResult:
    # Access the watermark window for incremental processing
    current_watermark = watcher_context.watermark
    next_watermark = watcher_context.next_watermark

    # Process only the data between the two watermarks
    data = extract_data_since(current_watermark, next_watermark)

    # The next watermark is set automatically by the SDK,
    # based on the pipeline configuration
    return ETLResult(
        completed_successfully=True,
        inserts=len(data),
        total_rows=len(data),
    )

# Execute the ETL with orchestration context: etl is an OrchestratedETL instance
# and orchestration_context is the Dagster or Airflow context object
result = etl.execute_etl(my_etl_function, orchestration_context)
```
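The example leaves extract_data_since to you. As a minimal sketch, assuming timestamp watermarks, a half-open window (watermark <= updated_at < next_watermark), and a watermark of None on the very first run, the function could filter rows like this. Row and SOURCE_ROWS are hypothetical stand-ins for a real table:

```python
from datetime import datetime
from typing import List, NamedTuple, Optional


class Row(NamedTuple):
    id: int
    updated_at: datetime


# Hypothetical source data; in a real pipeline this would be a database query.
SOURCE_ROWS: List[Row] = [
    Row(1, datetime(2024, 1, 1, 8)),
    Row(2, datetime(2024, 1, 1, 12)),
    Row(3, datetime(2024, 1, 2, 9)),
]


def extract_data_since(watermark: Optional[datetime], next_watermark: datetime) -> List[Row]:
    """Return rows with watermark <= updated_at < next_watermark.

    Assumptions: watermark is None on the first run (read everything before
    next_watermark), and the window is half-open so a row that lands exactly
    on a boundary is picked up by exactly one run.
    """
    return [
        row
        for row in SOURCE_ROWS
        if (watermark is None or row.updated_at >= watermark)
        and row.updated_at < next_watermark
    ]


# Two consecutive runs: the second run's watermark is the first run's next_watermark.
first_run = extract_data_since(None, datetime(2024, 1, 2))
second_run = extract_data_since(datetime(2024, 1, 2), datetime(2024, 1, 3))
print([row.id for row in first_run])   # [1, 2]
print([row.id for row in second_run])  # [3]
```

The half-open window is a design choice: consecutive runs share a boundary, so an inclusive upper bound would read a boundary row twice.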
Context Detection
The OrchestratedETL class automatically detects orchestration context from:
Dagster: OpExecutionContext objects with run_id and partition_key
Airflow: dictionary or object contexts with dag_id and task_id
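The rules above amount to duck typing on a few attribute or key names. The following is a hypothetical model of those rules: detect_orchestrator and _has_fields are illustrative names, not SDK functions, and the SDK's real checks may differ. It also shows the kind of mock contexts you can build for tests:

```python
from collections.abc import Mapping
from types import SimpleNamespace
from typing import Any, Optional


def _has_fields(context: Any, *names: str) -> bool:
    """True if every name is a key (for mappings) or an attribute (for objects)."""
    if isinstance(context, Mapping):
        return all(name in context for name in names)
    return all(hasattr(context, name) for name in names)


def detect_orchestrator(context: Any) -> Optional[str]:
    """Classify a context using the documented detection rules (hypothetical model)."""
    # Dagster: an object (not a dict) exposing run_id and partition_key
    if not isinstance(context, Mapping) and _has_fields(context, "run_id", "partition_key"):
        return "dagster"
    # Airflow: a dict or an object carrying dag_id and task_id
    if _has_fields(context, "dag_id", "task_id"):
        return "airflow"
    # Unrecognized context; per the Error Handling notes, the SDK warns here
    return None


# Mock contexts like these are also handy in unit tests.
dagster_like = SimpleNamespace(run_id="run-123", partition_key="2024-01-01")
airflow_like = {"dag_id": "etl_pipeline", "task_id": "extract_task"}
print(detect_orchestrator(dagster_like))  # dagster
print(detect_orchestrator(airflow_like))  # airflow
print(detect_orchestrator({"foo": 1}))    # None
```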
Metadata Injection
When you pass the orchestrator's context object to execute_etl, it is detected and injected into your ETL metadata automatically:
```python
from dagster import op, OpExecutionContext
from airflow.decorators import task
from watcher import OrchestratedETL

# pipeline_config and my_etl_function are defined as in the earlier examples

# Dagster context automatically detected
@op
def my_dagster_op(context: OpExecutionContext):
    etl = OrchestratedETL("https://api.watcher.com", pipeline_config)
    return etl.execute_etl(my_etl_function, context)

# Airflow context automatically detected
@task
def my_airflow_task(**context):
    etl = OrchestratedETL("https://api.watcher.com", pipeline_config)
    return etl.execute_etl(my_etl_function, context)
```
Error Handling
The orchestration integration handles errors as follows:
Invalid Return Types: Raises ValueError if the ETL function doesn't return an ETLResult
Unknown Contexts: Issues warnings for unrecognized orchestration contexts
API Failures: Propagates Watcher API errors to the caller
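As a minimal model of the return-type rule, assuming the SDK performs an isinstance check (which would explain why subclasses of ETLResult are accepted, as the Best Practices note), the check could look like this. ETLResult here is a local stand-in dataclass, and call_and_validate and AuditedETLResult are hypothetical names, not SDK code:

```python
from dataclasses import dataclass
from typing import Any, Callable


# Stand-in for watcher.ETLResult so the sketch runs on its own; in real code,
# import ETLResult from watcher instead.
@dataclass
class ETLResult:
    completed_successfully: bool


@dataclass
class AuditedETLResult(ETLResult):
    """Hypothetical subclass: still an ETLResult, so it passes the check."""
    rows_skipped: int = 0


def call_and_validate(etl_function: Callable[..., Any], **kwargs: Any) -> ETLResult:
    """Call an ETL function and raise ValueError unless it returns an ETLResult."""
    result = etl_function(**kwargs)
    if not isinstance(result, ETLResult):
        raise ValueError(
            f"ETL function must return ETLResult, got {type(result).__name__}"
        )
    return result


print(call_and_validate(lambda **_: AuditedETLResult(True, rows_skipped=2)))
try:
    call_and_validate(lambda **_: {"completed_successfully": True})
except ValueError as err:
    print(err)  # ETL function must return ETLResult, got dict
```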
Best Practices
Always Return ETLResult: Your ETL functions must return ETLResult or a subclass
Use Type Hints: Add type hints for better IDE support and error detection
Handle Errors: Implement proper error handling in your ETL functions
Test Integration: Test your orchestration integration with mock contexts
Monitor Metrics: Use the injected orchestration metadata for monitoring
Example: Complete Dagster Pipeline with Parent Execution Tracking
Track a parent execution that spans the entire workflow, with each stage recorded as a child execution:
```python
from dagster import op, job, OpExecutionContext
from watcher import OrchestratedETL, ETLResult, PipelineConfig, Pipeline

# extract_users, transform_users and load_users are your own ETL functions,
# written like the earlier examples and returning ETLResult

# Parent pipeline config
parent_config = PipelineConfig(
    pipeline=Pipeline(
        name="user_data_pipeline",
        pipeline_type_name="etl_workflow",
    )
)

# Separate pipeline configs for each stage
extract_config = PipelineConfig(
    pipeline=Pipeline(
        name="user_extract_pipeline",
        pipeline_type_name="extract",
    )
)
transform_config = PipelineConfig(
    pipeline=Pipeline(
        name="user_transform_pipeline",
        pipeline_type_name="transform",
    )
)
load_config = PipelineConfig(
    pipeline=Pipeline(
        name="user_load_pipeline",
        pipeline_type_name="load",
    )
)

# Dagster ops with parent execution tracking
@op
def start_parent_execution(context: OpExecutionContext):
    """Start the parent execution and return its ID."""
    etl = OrchestratedETL("https://api.watcher.com", parent_config)
    return etl.start_parent_execution()

@op
def extract_op(context: OpExecutionContext, parent_execution_id: int):
    etl = OrchestratedETL("https://api.watcher.com", extract_config)
    return etl.execute_etl(extract_users, context, parent_execution_id=parent_execution_id)

@op
def transform_op(context: OpExecutionContext, parent_execution_id: int, extract_result: ETLResult):
    # extract_result is taken as an input so this op runs after extract_op
    etl = OrchestratedETL("https://api.watcher.com", transform_config)
    return etl.execute_etl(transform_users, context, parent_execution_id=parent_execution_id)

@op
def load_op(context: OpExecutionContext, parent_execution_id: int, transform_result: ETLResult):
    # transform_result is taken as an input so this op runs after transform_op
    etl = OrchestratedETL("https://api.watcher.com", load_config)
    return etl.execute_etl(load_users, context, parent_execution_id=parent_execution_id)

@op
def end_parent_execution(context: OpExecutionContext, parent_execution_id: int, load_result: ETLResult):
    """End the parent execution after the last child op (load_op) has finished."""
    etl = OrchestratedETL("https://api.watcher.com", parent_config)
    summary_result = ETLResult(
        completed_successfully=True,
        execution_metadata={
            "run_id": context.run_id,
        },
    )
    etl.end_parent_execution(parent_execution_id, summary_result)
    return summary_result

@job
def user_data_pipeline():
    parent_id = start_parent_execution()
    extract_result = extract_op(parent_id)
    transform_result = transform_op(parent_id, extract_result)
    load_result = load_op(parent_id, transform_result)
    # load_result is passed in so the parent ends only after load_op completes
    end_parent_execution(parent_id, load_result)
```
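Dagster orders ops only by their input dependencies, so the op that ends the parent execution waits only for the ops it takes inputs from. The sketch below models the job's op graph with Python's standard graphlib.TopologicalSorter (a stand-in for the scheduler, not Dagster itself; runnable_after_start and the graph constants are hypothetical names) and lists which ops become runnable once the parent has started, with and without an input edge from load_op:

```python
from graphlib import TopologicalSorter
from typing import Dict, Set

# Each op maps to the set of ops it takes inputs from (its predecessors).
# Op names mirror the Dagster job above; this is a model, not Dagster.
WITHOUT_LOAD_EDGE: Dict[str, Set[str]] = {
    "extract_op": {"start_parent_execution"},
    "transform_op": {"start_parent_execution", "extract_op"},
    "load_op": {"start_parent_execution", "transform_op"},
    "end_parent_execution": {"start_parent_execution"},
}
WITH_LOAD_EDGE: Dict[str, Set[str]] = {
    **WITHOUT_LOAD_EDGE,
    "end_parent_execution": {"start_parent_execution", "load_op"},
}


def runnable_after_start(graph: Dict[str, Set[str]]) -> Set[str]:
    """Return the ops that become runnable once the parent execution has started."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    started = sorter.get_ready()  # only start_parent_execution has no inputs
    sorter.done(*started)
    return set(sorter.get_ready())


print(runnable_after_start(WITHOUT_LOAD_EDGE))
print(runnable_after_start(WITH_LOAD_EDGE))
```

Without the edge, end_parent_execution is runnable alongside extract_op and may close the parent before the children finish; taking load_op's output as an input removes that possibility.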
Example: Complete Airflow DAG with Parent Execution Tracking
Track a parent execution that spans the entire workflow, with each stage recorded as a child execution:
```python
from datetime import datetime

from airflow.decorators import dag, task
from watcher import OrchestratedETL, ETLResult

# parent_config, extract_config and transform_config are PipelineConfig objects
# built as in the Dagster example; extract_function and transform_function are
# your own ETL functions returning ETLResult

def dag_failure_callback(context):
    """DAG failure callback that ends the parent execution on failure."""
    parent_execution_id = context["task_instance"].xcom_pull(task_ids="start_parent_execution")
    if parent_execution_id is not None:
        etl = OrchestratedETL("https://api.watcher.com", parent_config)
        failure_result = ETLResult(
            completed_successfully=False
        )
        etl.end_parent_execution(parent_execution_id, failure_result)

# `schedule` requires Airflow 2.4+ (earlier versions use schedule_interval);
# the start date is illustrative
@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
)
def etl_pipeline():
    @task
    def start_parent_execution():
        """Start the parent execution and return its ID (pushed to XCom)."""
        etl = OrchestratedETL("https://api.watcher.com", parent_config)
        return etl.start_parent_execution()

    @task
    def extract_task(parent_execution_id, **context):
        etl = OrchestratedETL("https://api.watcher.com", extract_config)
        return etl.execute_etl(extract_function, context, parent_execution_id=parent_execution_id)

    @task
    def transform_task(parent_execution_id, **context):
        etl = OrchestratedETL("https://api.watcher.com", transform_config)
        return etl.execute_etl(transform_function, context, parent_execution_id=parent_execution_id)

    @task
    def end_parent_execution(parent_execution_id, **context):
        """End the parent execution after the last child task succeeds."""
        etl = OrchestratedETL("https://api.watcher.com", parent_config)
        summary_result = ETLResult(
            completed_successfully=True,
            execution_metadata={
                "execution_date": context["ds"],
                "dag_run_id": context["dag_run"].run_id,
            },
        )
        etl.end_parent_execution(parent_execution_id, summary_result)
        return summary_result

    parent_id = start_parent_execution()
    extract = extract_task(parent_id)
    transform = transform_task(parent_id)
    end = end_parent_execution(parent_id)
    # Enforce stage order: end_parent_execution runs only after transform_task
    parent_id >> extract >> transform >> end

etl_pipeline()
```
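The Airflow DAG relies on a failure callback so that the parent execution is closed even when a child task fails. The same invariant (every started parent is ended exactly once, and marked unsuccessful if any child raises) can be sketched in plain Python. RecordingTracker and run_workflow are hypothetical stand-ins that only record calls; they are not part of the SDK, and the documented end_parent_execution takes an ETLResult rather than a boolean:

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class RecordingTracker:
    """Hypothetical stand-in for the Watcher API; it only records calls."""
    next_id: int = 1
    ended: List[Tuple[int, bool]] = field(default_factory=list)

    def start_parent_execution(self) -> int:
        parent_id = self.next_id
        self.next_id += 1
        return parent_id

    def end_parent_execution(self, parent_id: int, completed_successfully: bool) -> None:
        self.ended.append((parent_id, completed_successfully))


def run_workflow(tracker: RecordingTracker, steps: List[Callable[[int], None]]) -> int:
    """Run child steps under one parent execution, always ending the parent."""
    parent_id = tracker.start_parent_execution()
    try:
        for step in steps:
            step(parent_id)  # each child step receives the parent execution ID
    except Exception:
        # Mirror the failure callback: close the parent as failed, then re-raise
        tracker.end_parent_execution(parent_id, completed_successfully=False)
        raise
    tracker.end_parent_execution(parent_id, completed_successfully=True)
    return parent_id


def failing_step(parent_id: int) -> None:
    raise RuntimeError(f"load failed under parent {parent_id}")


tracker = RecordingTracker()
run_workflow(tracker, [lambda pid: None, lambda pid: None])
try:
    run_workflow(tracker, [failing_step])
except RuntimeError:
    pass
print(tracker.ended)  # [(1, True), (2, False)]
```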