Что такое Apache Airflow?

Apache Airflow — это платформа с открытым исходным кодом для разработки, планирования и мониторинга рабочих процессов (workflows). Airflow позволяет программно создавать, планировать и мониторить сложные пайплайны обработки данных.

Основные возможности Apache Airflow

  • Визуальный интерфейс — графическое представление DAG и их выполнения
  • Расширяемость — богатый набор операторов и возможность создания собственных
  • Масштабируемость — поддержка различных исполнителей (executors)
  • Мониторинг — отслеживание состояния задач и уведомления
  • Программируемость — определение workflows как код на Python

Когда использовать Apache Airflow

Apache Airflow идеально подходит для:

  • Создания ETL/ELT пайплайнов для обработки данных
  • Автоматизации регулярных задач и процессов
  • Оркестрации машинного обучения workflows
  • Интеграции различных систем и сервисов
  • Создания сложных зависимостей между задачами

Установка Apache Airflow

Установка через pip:

1pip install apache-airflow
2poetry add apache-airflow

Или используй Docker Compose:

 1version: '3.8'
 2services:
 3  airflow-webserver:
 4    image: apache/airflow:2.5.0
 5    command: webserver
 6    ports:
 7      - "8080:8080"
 8    environment:
 9      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
10      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
11    depends_on:
12      - postgres
13  
14  postgres:
15    image: postgres:13
16    environment:
17      - POSTGRES_USER=airflow
18      - POSTGRES_PASSWORD=airflow
19      - POSTGRES_DB=airflow

Основные концепции

  • DAG (Directed Acyclic Graph) — граф задач без циклов
  • Task — единица работы в DAG
  • Operator — определяет тип выполняемой задачи
  • Scheduler — планировщик выполнения задач
  • Executor — механизм выполнения задач

Пример простого DAG

Создай файл в папке dags/:

 1from datetime import datetime, timedelta
 2from airflow import DAG
 3from airflow.operators.bash import BashOperator
 4from airflow.operators.python import PythonOperator
 5
 6def my_python_function():
 7    print("Выполняется Python функция")
 8    return "Успешно завершено"
 9
10# Определение DAG
11dag = DAG(
12    'example_dag',
13    default_args={
14        'owner': 'data-team',
15        'depends_on_past': False,
16        'start_date': datetime(2025, 1, 1),
17        'email_on_failure': False,
18        'email_on_retry': False,
19        'retries': 1,
20        'retry_delay': timedelta(minutes=5),
21    },
22    description='Пример простого DAG',
23    schedule_interval=timedelta(days=1),
24    catchup=False,
25    tags=['example'],
26)
27
28# Задача с Bash оператором
29bash_task = BashOperator(
30    task_id='print_date',
31    bash_command='date',
32    dag=dag,
33)
34
35# Задача с Python оператором
36python_task = PythonOperator(
37    task_id='python_task',
38    python_callable=my_python_function,
39    dag=dag,
40)
41
42# Определение зависимостей
43bash_task >> python_task

Популярные операторы

  • BashOperator — выполнение bash команд
  • PythonOperator — выполнение Python функций
  • SqlOperator — выполнение SQL запросов
  • DockerOperator — запуск Docker контейнеров
  • KubernetesPodOperator — запуск задач в Kubernetes

Настройка connections

Для подключения к внешним системам используй Airflow Connections:

1from airflow.hooks.postgres_hook import PostgresHook
2
3def extract_data():
4    pg_hook = PostgresHook(postgres_conn_id='my_postgres_conn')
5    records = pg_hook.get_records("SELECT * FROM users")
6    return records

Мониторинг и алерты

Настрой уведомления при сбоях:

 1def task_fail_slack_alert(context):
 2    slack_msg = f"""
 3        Задача {context.get('task_instance').task_id} упала
 4        DAG: {context.get('task_instance').dag_id}
 5        Время: {context.get('execution_date')}
 6    """
 7    # Отправка в Slack
 8    
 9default_args = {
10    'on_failure_callback': task_fail_slack_alert,
11}

Best Practices

  • Используй идемпотентные задачи
  • Избегай циклических зависимостей
  • Настраивай правильные интервалы повторных попыток
  • Используй XCom для передачи небольших данных между задачами
  • Мониторь производительность и ресурсы

FAQ

Подходит ли Apache Airflow для продакшена?

Да, Apache Airflow широко используется в production средах крупных компаний для оркестрации сложных пайплайнов обработки данных и автоматизации бизнес-процессов.

Какие требования к инфраструктуре?

Минимальные требования: 4 ГБ RAM, 2 CPU cores. Для production рекомендуется использовать PostgreSQL как метаstore и настроить мониторинг.

Чем Airflow отличается от Jenkins?

Airflow специализируется на пайплайнах обработки данных с богатыми возможностями планирования, в то время как Jenkins больше ориентирован на CI/CD процессы.