Learn the Apache Airflow workflow with Python code examples and best practices.
In the world of data engineering, automation and orchestration are critical. As data pipelines grow more complex, relying on cron jobs and manual triggers becomes inefficient and error-prone. That’s where Apache Airflow steps in. Originally developed by Airbnb and now maintained under the Apache Software Foundation, Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. This blog post will walk you through the essentials of Airflow, including installation, writing DAGs in Python, and building practical ETL pipelines. We will also highlight best practices and key tips to help you use Airflow effectively in real-world scenarios.
What is Apache Airflow?
Apache Airflow is a workflow management system that lets you define and manage your workflows as code. These workflows are expressed as DAGs, or Directed Acyclic Graphs. Each DAG represents a pipeline where nodes are tasks and edges define their execution order. Airflow’s use of Python makes it highly dynamic and flexible. You can use loops, conditionals, and reusable components when building pipelines. This is a major upgrade from static configuration files or manually coded bash scripts.
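Because a DAG file is ordinary Python, you can, for example, generate similar tasks in a loop instead of duplicating them by hand. The sketch below is purely illustrative; the DAG id, table names, and print-only callable are placeholders, not part of any real pipeline:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id='dynamic_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    # Hypothetical table names -- swap in whatever your pipeline actually processes
    for table in ['users', 'orders', 'payments']:
        PythonOperator(
            task_id=f'process_{table}',
            python_callable=lambda t=table: print(f'Processing {t}'),
        )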
Airflow’s architecture is built for scalability. It uses a central metadata database to track task states, a scheduler to queue tasks, and a web server for monitoring. Depending on your needs, Airflow can run on a single machine or be deployed in distributed setups using Celery, Kubernetes, or other execution engines.
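Switching the execution engine is mostly a configuration change. As a minimal sketch, pointing Airflow at Celery can be done through environment variables; the Redis and Postgres URLs below are placeholders for services you would have to run yourself:

# Sketch only: assumes a Redis broker and a Postgres metadata DB are already available
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost/airflow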
Why Choose Airflow Over Traditional Schedulers?
Airflow offers several advantages over basic tools like cron:
Python-native Workflows: Write your pipeline logic in Python, leveraging its full programming capabilities.
Visual Monitoring: Airflow comes with a rich web UI to visualize DAGs, monitor task status, and access logs.
Scalable Execution: Support for parallel execution using Celery or Kubernetes allows scaling as your pipelines grow.
Built-in Integrations: It includes operators and hooks for databases, cloud services (AWS, GCP, Azure), Docker, and more.
Scheduling and Retry Logic: Automate your workflows with fine-grained scheduling, retry policies, and alert mechanisms (see the sketch after this list).
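As a quick sketch of that last point, retry behaviour is declared per task or DAG-wide through default_args; the numbers below are arbitrary placeholders:

from datetime import timedelta

default_args = {
    'retries': 3,                          # re-run a failed task up to 3 more times
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between attempts
    'retry_exponential_backoff': True,     # lengthen the wait after each failure
}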
Installing Airflow: Getting Started the Right Way
Installing Airflow requires some preparation. While a plain pip install can work, the recommended method for local development is to install with pip using the official constraints file, which pins Airflow's dependencies to a known-good set and avoids version conflicts. Here's a minimal example to get you started:
export AIRFLOW_HOME=~/airflow

# Install with the official constraints file for a known-good dependency set
# (adjust the Airflow version to the release you want; the constraints URL follows this pattern)
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python --version | cut -d ' ' -f 2 | cut -d '.' -f 1-2)"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Initialize the metadata database and create an admin user
airflow db init
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com \
  --password admin

# Start the webserver and the scheduler (run these in separate terminals)
airflow webserver --port 8080
airflow scheduler
Once the webserver is running, you can access the Airflow UI by visiting http://localhost:8080. Here, you'll see all your DAGs, their execution history, logs, and other useful operational metrics.
Writing a Basic DAG in Python
In Airflow, a DAG is just a Python script placed in the dags/ folder. Let's look at a basic DAG that prints "Hello from Airflow!" each day.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def say_hello():
    print("Hello from Airflow!")

default_args = {
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

with DAG(
    dag_id='hello_world_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    task = PythonOperator(
        task_id='say_hello_task',
        python_callable=say_hello
    )
This example highlights how clean and readable Airflow DAGs can be. You define a function, assign it to a task using PythonOperator, and specify how often it should run. The catchup=False argument ensures that Airflow does not retroactively run the scheduled DAG runs it missed in the past.
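You can also exercise a single task without waiting for the scheduler: airflow tasks test runs one task instance for a given logical date and does not record its state in the metadata database.

airflow tasks test hello_world_dag say_hello_task 2023-01-01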
Building an ETL Workflow Using Airflow
Now let’s expand into something more realistic: an ETL pipeline. This pipeline will extract data from a CSV file, transform it using Pandas, and load it into a SQLite database.
import pandas as pd
import sqlite3
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    # Read the raw CSV and stage it on disk for the next task
    df = pd.read_csv('/tmp/data.csv')
    df.to_pickle('/tmp/extracted.pkl')

def transform():
    # Apply a simple transformation to the staged data
    df = pd.read_pickle('/tmp/extracted.pkl')
    df['processed'] = df['value'] * 100
    df.to_pickle('/tmp/transformed.pkl')

def load():
    # Write the transformed data into a SQLite table
    df = pd.read_pickle('/tmp/transformed.pkl')
    conn = sqlite3.connect('/tmp/etl.db')
    df.to_sql('processed_data', conn, if_exists='replace', index=False)
    conn.close()

with DAG(
    dag_id='etl_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False
) as dag:
    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    extract_task >> transform_task >> load_task
In this DAG, each task handles a different stage of the ETL process. Airflow ensures the tasks execute in the declared order, retries them on failure when a retry policy is configured, and logs each task's output, making debugging easier.
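To run the whole pipeline once, end to end, without the scheduler, you can use airflow dags test with a logical date (this assumes /tmp/data.csv exists and contains a numeric value column, as the extract step expects):

airflow dags test etl_pipeline 2023-01-01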
Airflow Best Practices
Using Airflow effectively goes beyond writing functional DAGs. Here are some best practices to keep in mind:
Make Tasks Idempotent: Tasks should produce the same results even if run multiple times. This helps with retries and backfills.
Avoid Long-Running Tasks: Break down large tasks into smaller units for better visibility and failure handling.
Use the TaskFlow API: Introduced in Airflow 2.x, the TaskFlow API simplifies task dependency management and improves readability (see the sketch after this list).
Monitor Resource Usage: Use the Airflow UI or external tools to monitor CPU, memory, and I/O usage.
Secure Your Environment: Use role-based access control (RBAC), encrypted secrets, and secure communication between components.
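As a rough sketch of the TaskFlow style, a simplified version of the ETL pipeline above could be written with the @dag and @task decorators, where return values move between tasks through XCom instead of temporary files. This is an illustration with placeholder data, not a drop-in replacement:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False)
def taskflow_etl():

    @task
    def extract():
        # Placeholder data instead of a real source
        return [1, 2, 3]

    @task
    def transform(values):
        return [v * 100 for v in values]

    @task
    def load(values):
        print(f"Loaded {len(values)} rows: {values}")

    # Calling the tasks wires up the dependencies automatically
    load(transform(extract()))

taskflow_etl()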
Monitoring, Logging, and Alerting
Airflow shines when it comes to observability. Each task run is logged, and you can view logs directly from the web UI. You can also configure alerts to notify you via email or messaging platforms like Slack whenever a task fails. Here’s how to set up basic email alerts:
default_args = {
    'email': ['alerts@example.com'],
    'email_on_failure': True,   # requires SMTP settings ([smtp] section in airflow.cfg)
}
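For messaging platforms such as Slack, one common pattern is an on_failure_callback: a plain Python function that Airflow calls with the task's context when it fails. The sketch below posts to a hypothetical webhook URL; in production you would more likely use the official Slack provider package.

import requests

# Hypothetical webhook URL -- replace with your own incoming-webhook endpoint
SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXX/YYY/ZZZ'

def notify_failure(context):
    # Airflow passes the task context; pull out the failed task instance
    ti = context['task_instance']
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed."
    requests.post(SLACK_WEBHOOK_URL, json={'text': message})

default_args = {
    'on_failure_callback': notify_failure,
}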
For robust alerting, consider integrating Airflow with services like PagerDuty or Prometheus for advanced monitoring and incident response.
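For Prometheus in particular, a common route is Airflow's StatsD metrics feeding a StatsD-to-Prometheus exporter. A rough sketch, assuming the statsd extra (apache-airflow[statsd]) is installed and an exporter is listening on port 8125:

# Sketch: enable StatsD metrics so an exporter can expose them to Prometheus
export AIRFLOW__METRICS__STATSD_ON=True
export AIRFLOW__METRICS__STATSD_HOST=localhost
export AIRFLOW__METRICS__STATSD_PORT=8125
export AIRFLOW__METRICS__STATSD_PREFIX=airflow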
Final Thoughts
Apache Airflow has become the go-to tool for managing complex, scalable data workflows. It gives you full control through Python scripting, lets you monitor tasks with ease, and scales well in production environments. Whether you’re managing ETL jobs, training machine learning models, or orchestrating distributed systems, Airflow provides the infrastructure and flexibility you need.
Start small: build a simple DAG, explore the UI, and gradually scale your pipelines. Once you're comfortable, you can take advantage of its rich ecosystem of plugins and integrations to build powerful, production-grade workflows.