
Apache Airflow at Scale: DAG Patterns, Best Practices, and Monitoring

Tags: Airflow, Data Engineering, ETL, Workflow, Python

Build production-ready data pipelines with Apache Airflow. Learn DAG design patterns, error handling, monitoring strategies, and scaling techniques for reliable data workflows.

Apache Airflow orchestrates complex data workflows, but building production-grade DAGs requires understanding patterns, error handling, and monitoring. This guide shares lessons from running Airflow at scale.

DAG Design Patterns

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

# ✅ Good: Reusable, testable DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1)
}

with DAG(
    'etl_user_analytics',
    default_args=default_args,
    description='Daily user analytics ETL',
    schedule='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill on deploy
    max_active_runs=1,  # Prevent overlapping runs
    tags=['analytics', 'daily']
) as dag:

    # 1. Extract phase
    extract_users = PythonOperator(
        task_id='extract_users',
        python_callable=extract_users_from_db,
        op_kwargs={'date': '{{ ds }}'}  # Templated date
    )

    extract_events = PythonOperator(
        task_id='extract_events',
        python_callable=extract_events_from_s3,
        op_kwargs={'date': '{{ ds }}'}
    )

    # 2. Transform phase (runs after extracts)
    transform_data = PythonOperator(
        task_id='transform_data',
        python_callable=transform_user_events,
        op_kwargs={
            'users_path': '{{ ti.xcom_pull(task_ids="extract_users") }}',
            'events_path': '{{ ti.xcom_pull(task_ids="extract_events") }}'
        }
    )

    # 3. Load phase
    load_to_warehouse = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_redshift,
        op_kwargs={'data_path': '{{ ti.xcom_pull(task_ids="transform_data") }}'}
    )

    # 4. Data quality check
    validate_data = PythonOperator(
        task_id='validate_data',
        python_callable=run_data_quality_checks
    )

    # Define dependencies
    [extract_users, extract_events] >> transform_data >> load_to_warehouse >> validate_data
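
The callables referenced above aren't shown in this snippet. Below is a minimal sketch of two of them; the bucket, paths, and column names are illustrative, and pandas reading staged Parquet directly from S3 assumes s3fs is installed. The key point it demonstrates: whatever a PythonOperator's callable returns is pushed to XCom, which is exactly what the downstream ti.xcom_pull(...) templates read.

import pandas as pd

def extract_users_from_db(date: str) -> str:
    """Extract one day of users and stage it; the returned path becomes this task's XCom."""
    # ... query the source database for `date` and build a DataFrame `df` ...
    output_path = f"s3://my-staging-bucket/users/{date}.parquet"  # hypothetical bucket
    # df.to_parquet(output_path)
    return output_path

def transform_user_events(users_path: str, events_path: str) -> str:
    """Join staged users with staged events and write the result back to staging."""
    users = pd.read_parquet(users_path)
    events = pd.read_parquet(events_path)
    joined = events.merge(users, on="user_id", how="left")  # assumes a user_id key
    output_path = events_path.replace("/events/", "/joined/")
    joined.to_parquet(output_path)
    return output_path  # consumed by load_to_warehouse via XCom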

Error Handling and Retries

import requests

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.trigger_rule import TriggerRule

def extract_with_error_handling(**context):
    """Extract with proper error handling"""
    try:
        # Attempt extraction
        data = fetch_data_from_api()

        # Data quality check
        if not data or len(data) == 0:
            raise AirflowException("No data returned from API")

        # Stage the data; the return value is pushed to XCom
        return save_to_s3(data)

    except requests.Timeout:
        # Retriable error - will retry based on DAG retries
        raise
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            # Skip this task - data doesn't exist yet
            raise AirflowSkipException(f"Data not found: {e}")
        elif e.response.status_code >= 500:
            # Server error - retry
            raise
        else:
            # Client error - don't retry
            raise AirflowException(f"API error: {e}")

# Cleanup task that runs even if pipeline fails
cleanup_task = PythonOperator(
    task_id='cleanup_temp_files',
    python_callable=cleanup_temp_files,
    trigger_rule=TriggerRule.ALL_DONE  # Run regardless of upstream status
)

# Notification task for failures
def send_failure_notification(**context):
    """Send Slack notification on failure"""
    dag_id = context['dag'].dag_id
    task_id = context['task'].task_id
    execution_date = context['execution_date']
    log_url = context['task_instance'].log_url

    send_slack_message(
        f"❌ Task {task_id} in DAG {dag_id} failed\n"
        f"Execution: {execution_date}\n"
        f"Logs: {log_url}"
    )

failure_alert = PythonOperator(
    task_id='failure_alert',
    python_callable=send_failure_notification,
    trigger_rule=TriggerRule.ONE_FAILED  # Fires if any upstream task failed
)

# Wire the cleanup and alert tasks into the pipeline
[extract_users, extract_events] >> transform_data >> [load_to_warehouse, cleanup_task]
load_to_warehouse >> [validate_data, failure_alert]
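
A dedicated ONE_FAILED task is one option; the other common pattern is to attach the notification as an on_failure_callback, which Airflow invokes with the failing task's context once retries are exhausted. A minimal sketch reusing the function above:

def notify_on_failure(context):
    """Task-level failure callback; Airflow passes the failing task's context dict."""
    send_failure_notification(**context)

# Applied via default_args so every task in the DAG inherits it
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'on_failure_callback': notify_on_failure,
}

Unlike the ONE_FAILED alert task, the callback reports the context of the task that actually failed rather than the alert task's own context.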

Dynamic DAGs and TaskGroups

from datetime import datetime

from airflow.decorators import task, task_group
from airflow.models import Variable

# ✅ Dynamic DAGs based on configuration
TABLES = Variable.get("tables_to_sync", deserialize_json=True)
# ["users", "orders", "products", "reviews"]

with DAG('dynamic_table_sync', ...) as dag:

    @task_group(group_id="extract_tables")
    def extract_all_tables():
        for table in TABLES:
            @task(task_id=f"extract_{table}")
            def extract_table(table_name: str):
                return extract_from_postgres(table_name)

            extract_table(table)

    @task_group(group_id="transform_tables")
    def transform_all_tables():
        for table in TABLES:
            @task(task_id=f"transform_{table}")
            def transform_table(table_name: str):
                return transform_data(table_name)

            transform_table(table)

    extract_all_tables() >> transform_all_tables()

# Using the TaskFlow API (cleaner syntax)
from airflow.decorators import dag

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow']
)
def modern_etl():

    @task
    def extract():
        """Extract from source"""
        return fetch_data()

    @task
    def transform(data: list):
        """Transform data"""
        return process_data(data)

    @task
    def load(data: list):
        """Load to warehouse"""
        save_to_warehouse(data)

    # Automatic dependency management
    load(transform(extract()))

dag_instance = modern_etl()
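
On Airflow 2.3+, the same per-table fan-out can also be written with dynamic task mapping, which creates one mapped task instance per input at runtime instead of generating tasks in a parse-time loop. A minimal sketch, reusing the extract_from_postgres helper from above (the DAG name is illustrative):

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['dynamic'])
def mapped_table_sync():

    @task
    def list_tables() -> list[str]:
        # Could also come from Variable.get("tables_to_sync", deserialize_json=True)
        return ["users", "orders", "products", "reviews"]

    @task
    def extract_table(table_name: str):
        return extract_from_postgres(table_name)

    # One mapped task instance per table, expanded at runtime
    extract_table.expand(table_name=list_tables())

mapped_table_sync()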

Monitoring and Alerting

# 1. Data quality checks
from great_expectations.core import ExpectationSuite
from great_expectations_provider.operators.great_expectations import \
    GreatExpectationsOperator

validate_data = GreatExpectationsOperator(
    task_id="validate_user_data",
    expectation_suite_name="user_data_suite",
    data_context_root_dir="/path/to/great_expectations",
    fail_task_on_validation_failure=True
    # depending on the provider version, a checkpoint or batch definition is also required
)

# 2. SLA monitoring
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called by the scheduler when a task misses its SLA"""
    send_sla_alert(dag.dag_id, task_list)

with DAG(
    'time_sensitive_pipeline',
    default_args={
        'sla': timedelta(hours=2)  # Each task must finish within 2 hours
    },
    sla_miss_callback=sla_miss_callback,  # DAG-level argument, not a default_arg
    ...
) as dag:
    pass

# 3. Custom metrics (requires StatsD metrics to be enabled in the [metrics] config section)
from airflow.stats import Stats

def track_metrics(**context):
    # Record execution time
    duration = context['task_instance'].duration
    Stats.timing('airflow.task.duration', duration)

    # Record row counts
    row_count = get_row_count()
    Stats.gauge('airflow.pipeline.rows_processed', row_count)

# 4. Logs to external systems
import logging
from pythonjsonlogger import jsonlogger

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)

def log_pipeline_metrics(**context):
    logger.info('Pipeline completed', extra={
        'dag_id': context['dag'].dag_id,
        'execution_date': str(context['execution_date']),
        'rows_processed': get_row_count(),
        'duration_seconds': context['task_instance'].duration
    })
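
For lighter-weight validation, the run_data_quality_checks callable from the first DAG could be a hand-rolled SQL check when Great Expectations is more than you need. A hypothetical sketch against a Postgres/Redshift connection (the connection ID, schema, and table names are assumptions):

from airflow.exceptions import AirflowException
from airflow.providers.postgres.hooks.postgres import PostgresHook

def run_data_quality_checks(**context):
    """Fail the task if the loaded partition is empty or contains duplicate events."""
    hook = PostgresHook(postgres_conn_id="redshift_default")  # assumed connection ID
    ds = context["ds"]

    row_count = hook.get_first(
        "SELECT COUNT(*) FROM analytics.user_events WHERE event_date = %s",
        parameters=(ds,)
    )[0]
    if row_count == 0:
        raise AirflowException(f"No rows loaded for {ds}")

    duplicates = hook.get_first(
        "SELECT COUNT(*) FROM ("
        "  SELECT user_id, event_id FROM analytics.user_events"
        "  WHERE event_date = %s GROUP BY 1, 2 HAVING COUNT(*) > 1"
        ") d",
        parameters=(ds,)
    )[0]
    if duplicates > 0:
        raise AirflowException(f"{duplicates} duplicate events found for {ds}")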

Best Practices

  • Use connection pooling for database connections (define in Airflow UI)
  • Set appropriate pool slots to limit concurrent tasks
  • Use sensors with poke_interval and timeout to wait for external events (see the sensor sketch after this list)
  • Store credentials in Airflow Connections or AWS Secrets Manager
  • Use KubernetesPodOperator for isolated task execution
  • Keep DAGs in Git with CI/CD for automated testing
  • Use DAG versioning to track changes over time
  • Set depends_on_past=True only when a run must wait for the previous run's task to succeed
  • Use TaskGroups for code reusability (SubDAGs are deprecated)
  • Monitor scheduler heartbeat and task queue depth
  • Scale workers horizontally with Celery/Kubernetes executor
  • Set appropriate task concurrency limits
  • Use XCom sparingly - it stores data in the metadata DB; pass references such as file paths, not datasets
  • Archive old DAG runs to keep metadata DB small
  • Test DAGs locally with the airflow tasks test command
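
As referenced in the sensor and pool bullets above, here is a minimal sketch of a sensor that waits for an upstream S3 export before the extract tasks run (the bucket, key, and pool name are illustrative, and the pool must be created in the Airflow UI or CLI first):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_export = S3KeySensor(
    task_id="wait_for_upstream_export",
    bucket_name="partner-exports",          # hypothetical bucket
    bucket_key="events/{{ ds }}/_SUCCESS",  # templated key for the run's date
    aws_conn_id="aws_default",
    poke_interval=300,                      # check every 5 minutes
    timeout=6 * 60 * 60,                    # give up (fail) after 6 hours
    mode="reschedule",                      # free the worker slot between pokes
    pool="s3_sensors",                      # cap concurrent sensors with a named pool
)

mode="reschedule" matters at scale: in the default poke mode a sensor occupies a worker slot for its entire wait, while reschedule releases the slot between checks. You can exercise the sensor in isolation with airflow tasks test <dag_id> wait_for_upstream_export 2024-01-01.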