Apache Airflow at Scale: DAG Patterns, Best Practices, and Monitoring
Tags: Airflow, Data Engineering, ETL, Workflow, Python
Build production-ready data pipelines with Apache Airflow. Learn DAG design patterns, error handling, monitoring strategies, and scaling techniques for reliable data workflows.
Apache Airflow orchestrates complex data workflows, but building production-grade DAGs requires understanding patterns, error handling, and monitoring. This guide shares lessons from running Airflow at scale.
DAG Design Patterns
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# The python_callables below (extract_users_from_db, extract_events_from_s3,
# transform_user_events, load_to_redshift, run_data_quality_checks) are assumed
# to be imported from a shared module.

# ✅ Good: Reusable, testable DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1)
}

with DAG(
    'etl_user_analytics',
    default_args=default_args,
    description='Daily user analytics ETL',
    schedule='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill on deploy
    max_active_runs=1,  # Prevent overlapping runs
    tags=['analytics', 'daily']
) as dag:

    # 1. Extract phase
    extract_users = PythonOperator(
        task_id='extract_users',
        python_callable=extract_users_from_db,
        op_kwargs={'date': '{{ ds }}'}  # Templated date
    )

    extract_events = PythonOperator(
        task_id='extract_events',
        python_callable=extract_events_from_s3,
        op_kwargs={'date': '{{ ds }}'}
    )

    # 2. Transform phase (runs after extracts)
    transform_data = PythonOperator(
        task_id='transform_data',
        python_callable=transform_user_events,
        op_kwargs={
            'users_path': '{{ ti.xcom_pull(task_ids="extract_users") }}',
            'events_path': '{{ ti.xcom_pull(task_ids="extract_events") }}'
        }
    )

    # 3. Load phase
    load_to_warehouse = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_redshift,
        op_kwargs={'data_path': '{{ ti.xcom_pull(task_ids="transform_data") }}'}
    )

    # 4. Data quality check
    validate_data = PythonOperator(
        task_id='validate_data',
        python_callable=run_data_quality_checks
    )

    # Define dependencies
    [extract_users, extract_events] >> transform_data >> load_to_warehouse >> validate_data
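The python_callables are ordinary functions; whatever they return is pushed to XCom, which is how the ti.xcom_pull templates in the downstream tasks resolve. A minimal sketch of what extract_users_from_db might look like; the analytics_db connection id, the staging bucket, and the upload_df_to_s3 helper are hypothetical:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_users_from_db(date: str, **context):
    """Pull one day of users, stage the result, and return the S3 path.

    The returned path is pushed to XCom automatically, so the transform task
    can pull it with ti.xcom_pull(task_ids="extract_users").
    """
    hook = PostgresHook(postgres_conn_id='analytics_db')  # hypothetical connection id
    df = hook.get_pandas_df(
        "SELECT * FROM users WHERE created_at::date = %(day)s",
        parameters={'day': date},
    )
    s3_path = f"s3://my-staging-bucket/users/{date}.parquet"  # hypothetical bucket
    upload_df_to_s3(df, s3_path)  # hypothetical helper that writes the DataFrame
    return s3_path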
Error Handling and Retries
import requests

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def extract_with_error_handling(**context):
    """Extract with proper error handling"""
    try:
        # Attempt extraction (fetch_data_from_api and save_to_s3 are assumed helpers)
        data = fetch_data_from_api()

        # Data quality check
        if not data or len(data) == 0:
            raise AirflowException("No data returned from API")

        # Returned value is pushed to XCom
        return save_to_s3(data)

    except requests.Timeout:
        # Retriable error - will retry based on the DAG's retries setting
        raise
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            # Skip this task - data doesn't exist yet
            raise AirflowSkipException(f"Data not found: {e}")
        elif e.response.status_code >= 500:
            # Server error - retry
            raise
        else:
            # Client error - don't retry
            raise AirflowException(f"API error: {e}")

# Cleanup task that runs even if the pipeline fails
cleanup_task = PythonOperator(
    task_id='cleanup_temp_files',
    python_callable=cleanup_temp_files,
    trigger_rule=TriggerRule.ALL_DONE  # Run regardless of upstream status
)

# Notification task for failures
def send_failure_notification(**context):
    """Send a Slack notification when an upstream task has failed.

    Because this runs as its own task, the context here belongs to the alert
    task itself; see the on_failure_callback variant below for the failing
    task's own context.
    """
    dag_id = context['dag'].dag_id
    execution_date = context['execution_date']
    log_url = context['task_instance'].log_url

    send_slack_message(
        f"❌ A task in DAG {dag_id} failed\n"
        f"Execution: {execution_date}\n"
        f"Logs: {log_url}"
    )

failure_alert = PythonOperator(
    task_id='failure_alert',
    python_callable=send_failure_notification,
    trigger_rule=TriggerRule.ONE_FAILED  # Fires when any direct upstream task fails
)

# Task dependencies (tasks from the DAG above)
[extract_users, extract_events] >> transform_data >> [load_to_warehouse, cleanup_task]
load_to_warehouse >> [validate_data, failure_alert]
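An alternative wiring is to attach the notification as an on_failure_callback, where Airflow hands the function the failing task's own context rather than the alert task's. A minimal sketch, reusing the assumed send_slack_message helper and the default_args dict from the first example:

def notify_on_failure(context):
    """on_failure_callback: called by Airflow with the failed task's context."""
    ti = context['task_instance']
    send_slack_message(
        f"❌ Task {ti.task_id} in DAG {ti.dag_id} failed\n"
        f"Run: {context['run_id']}\n"
        f"Logs: {ti.log_url}"
    )

# Attaching it via default_args makes every task in the DAG report its own failures
default_args = {
    **default_args,
    'on_failure_callback': notify_on_failure,
}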
Dynamic DAGs and TaskGroups
from airflow.decorators import task, task_group
from airflow.models import Variable

# ✅ Dynamic DAGs based on configuration
TABLES = Variable.get("tables_to_sync", deserialize_json=True)
# ["users", "orders", "products", "reviews"]

with DAG('dynamic_table_sync', ...) as dag:

    @task_group(group_id="extract_tables")
    def extract_all_tables():
        for table in TABLES:
            @task(task_id=f"extract_{table}")
            def extract_table(table_name: str):
                return extract_from_postgres(table_name)

            extract_table(table)

    @task_group(group_id="transform_tables")
    def transform_all_tables():
        for table in TABLES:
            @task(task_id=f"transform_{table}")
            def transform_table(table_name: str):
                return transform_data(table_name)

            transform_table(table)

    extract_all_tables() >> transform_all_tables()

# Using the TaskFlow API (cleaner syntax)
from airflow.decorators import dag

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow']
)
def modern_etl():

    @task
    def extract():
        """Extract from source"""
        return fetch_data()

    @task
    def transform(data: list):
        """Transform data"""
        return process_data(data)

    @task
    def load(data: list):
        """Load to warehouse"""
        save_to_warehouse(data)

    # Automatic dependency management
    load(transform(extract()))

dag_instance = modern_etl()
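For lists that are only known at run time, dynamic task mapping (Airflow 2.3+) is an alternative to the parse-time loop above: expand() creates one mapped task instance per element when the run starts. A minimal sketch, reusing the assumed extract_from_postgres helper:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['dynamic'])
def mapped_table_sync():

    @task
    def list_tables() -> list:
        # Could come from a Variable, an API, or an information_schema query at run time
        return ["users", "orders", "products", "reviews"]

    @task
    def extract_table(table_name: str):
        return extract_from_postgres(table_name)  # assumed helper from the loop example

    # One mapped task instance per table, resolved at run time
    extract_table.expand(table_name=list_tables())

mapped_table_sync()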
Monitoring and Alerting
# 1. Data quality checks with the Great Expectations provider
# (install the airflow-provider-great-expectations package)
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

validate_data = GreatExpectationsOperator(
    task_id="validate_user_data",
    expectation_suite_name="user_data_suite",
    data_context_root_dir="/path/to/great_expectations",
    fail_task_on_validation_failure=True
)

# 2. SLA monitoring
def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called by the scheduler with the missed SlaMiss records."""
    send_sla_alert(dag_id=dag.dag_id, slas=slas)

with DAG(
    'time_sensitive_pipeline',
    default_args={
        'sla': timedelta(hours=2),  # Each task should finish within 2 hours of the scheduled run start
    },
    sla_miss_callback=on_sla_miss,  # DAG-level argument, not a task-level one
    ...
) as dag:
    pass

# 3. Custom metrics via Airflow's built-in StatsD client
# (enable the statsd settings under [metrics] in airflow.cfg)
from airflow.stats import Stats

def track_metrics(context):
    """Intended as an on_success_callback: Airflow passes the finished
    task's context, so duration is already populated."""
    duration = context['task_instance'].duration
    Stats.timing('pipeline.task_duration', duration)

    # Record row counts (Airflow prepends the configured statsd_prefix)
    row_count = get_row_count()
    Stats.gauge('pipeline.rows_processed', row_count)

# 4. Logs to external systems
import logging
from pythonjsonlogger import jsonlogger

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)

def log_pipeline_metrics(context):
    """Also intended as an on_success_callback (see the wiring sketch below)."""
    logger.info('Pipeline completed', extra={
        'dag_id': context['dag'].dag_id,
        'execution_date': str(context['execution_date']),
        'rows_processed': get_row_count(),
        'duration_seconds': context['task_instance'].duration
    })
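One way to wire these helpers is as success callbacks, so they run automatically after a task finishes and its duration is populated. A sketch under the assumption of Airflow 2.6+ (which accepts a list of callbacks), using the load_to_redshift callable from the first example:

from airflow.operators.python import PythonOperator

load_to_warehouse = PythonOperator(
    task_id='load_to_warehouse',
    python_callable=load_to_redshift,
    # Each callback receives the finished task's context dict
    on_success_callback=[track_metrics, log_pipeline_metrics],
)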
Best Practices
- Use connection pooling for database connections (define in Airflow UI)
- Set appropriate pool slots to limit concurrent tasks
- Use sensors with poke_interval and timeout to wait for external events
- Store credentials in Airflow Connections or AWS Secrets Manager
- Use KubernetesPodOperator for isolated task execution
- Keep DAGs in Git with CI/CD for automated testing
- Track DAG changes over time in version control - Airflow does not version DAG code itself
- Set depends_on_past=True only when runs must process strictly in order - a failed run will block later ones
- Use TaskGroups for code reusability - SubDAGs are deprecated
- Monitor scheduler heartbeat and task queue depth
- Scale workers horizontally with Celery/Kubernetes executor
- Set appropriate task concurrency limits
- Use XCom sparingly - it stores data in the metadata database, so pass object-store paths rather than payloads
- Archive or purge old DAG runs (airflow db clean) to keep the metadata database small
- Test DAGs locally with the airflow tasks test command, and run a DagBag import check in CI (sketched below)
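A DagBag import check in CI complements airflow tasks test by catching broken imports, cycles, and missing metadata before deploy. A minimal pytest sketch; the dags/ folder path and the owner/tags conventions are assumptions about your repository layout:

import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    # Parse the project's DAG folder without Airflow's bundled example DAGs
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"

def test_dags_have_owner_and_tags(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args.get('owner'), f"{dag_id} is missing an owner"
        assert dag.tags, f"{dag_id} has no tags"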