Apache Airflow

Материал из Национальной библиотеки им. Н. Э. Баумана
Последнее изменение этой страницы: 21:46, 27 мая 2018.
Apache Airflow
Airflow logo.png
Разработчики: Maxime Beauchemin [1]
Постоянный выпуск: 1.9.0 / 16 декабря 2017г.
Состояние разработки: активное
Написана на: Python
Операционная система: Кроссплатформенная
Локализация: Английский язык
Лицензия: Apache2
Веб-сайт airflow.incubator.apache.org

Apache Airflow - это open-source набор библиотек для разработки, планирования и мониторинга рабочих процессов. Основная особенность в том, что для описания процессов пишется код на Python. В основном в роли рабочих процессов выступают ETL - процессы.

Описание

Основные сущности[Источник 1]

К основным сущностям, используемым в Apache Airflow относятся:

  1. DAG (Directed Acyclic Graph).
  2. Airflow operator.

Еще к модулям Apache Airflow, которые стоит рассмотреть подробно относятся: Планировщик и так называемая Execution Data.

DAG

Directed Acyclic Graph - это задачи, которые будут выполнены в строгой последовательности и по определенному расписанию записанные в один файл на Python. Задачи имеют некоторую смысловую связь, позволяющую считать их одним процессом. Файл, который создается для определения DAG по сути является не просто скриптом , обрабатывающим какие-то данные , а объектом. В процессе выполнения задачи DAG не могут пересекаться, так как они выполняются на разных объектах и в разное время.

DAG example

Каждый DAG имеет собственное расписание - то что определяет время и периодичность его запуска для планировщика задач. Типичный DAG может представлять собой последовательность следующих действий: забирает данные , обрабатывает их , выводит данные (записывает в базу данных например). Этот процесс и является ETL (Extract-Transform-Load). Для выполнения отдельных задач в процессе разработчиком реализуются операторы. По сути DAG - это набор операторов с заданной последовательностью их выполнения.

Пример: Создаем DAG с параметрами. Первый параметр это dag_id - то что является уникальным идентификатором для DAG. Вторым параметром передаем словарь аргументов по умолчанию. Третий параметр это интервал вызовов (shedule_interval). Тут он равен одному дню.

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

Airflow operator[Источник 2]

Оператор - сущность на основе которой создаются экземпляры заданий. В нем представлено описание конкретных действий которые будут происходит в момент выполнения экземпляра задания. В репозитории на github уже есть готовые примеры операторов. Вот некоторые из них:

  • BashOperator — оператор для выполнения bash-команды;
  • PythonOperator — оператор для вызова Python-кода;
  • EmailOperator — оператор для отправки email’а;
  • HTTPOperator — оператор для работы с http-запросами;
  • SqlOperator — оператор для выполнения SQL-кода;
  • Sensor — оператор ожидания события (наступления нужного времени, появления требуемого файла, строки в базе БД, ответа из API — и т. д., и т. п.).

Есть так же и другие, более специфические операторы: DockerOperator, JdbcOperator, OracleOperator и т. п..Так же можно создавать свои операторы на базе класса BaseOperator.

Для каждого DAG операторы инициализируются как задачи. Следующий код демонстрирует создание задач с использованием операторов:

 
    t1 = BashOperator(
    task_id='print_net_data',
    bash_command='ifconfig',
    dag=dag)

    t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

Каждый экземпляр класса оператора принимает аргумент task_id - значение этого аргумента является уникальным идентификатором задачи, bash_command - является собственно командой, которую должен выполнить BashOperator, retries - количество попыток выполнения команды и наконец dag - определяет принадлежность задачи к определенному DAG.

Для того чтобы определить зависимости между задачами необходимо иx связать:

t2.set_upstream(t1)

Код говорит о том что задача t2 будет зависеть от задачи t1. По сути это эквивалентно:

t1.set_downstream (t2)

Планировщик

Для планировщика в Apache Airflow используется Celery - Python-библиотека, которая позволяет организовать асинхронное и распределенное исполнение задач, а так же очередь из них. Все задачи разделены на пулы, которые создаются вручную в интерфейсе Apache Airflow. Пулы используются в основном для ограничения нагрузки на один источник или для типизации задач. Для управления пулами используется так же web-интерфейс Apache Airflow. Для пулов задаются их размеры - количество доступных слотов для задач. При создании DAG приписывается одному из пулов.

Scheduler - собственно сам планировщик. Отвечает за планировку всех задач в Apache Airflow и занимается тем что ставит эти задачи на выполнение. Алгоритм его работы следующий: eсли в DAG выполнены все предыдущие задачи, то происходит сортировка очереди по приоритетам задач и если в пуле есть свободное место под задачу, а так же есть свободный celery worker(процесс в Celery выполняющий работу) задача отправляется в него на выполнение.

Для того чтобы планировщик начал работать с DAG'ом необходимо установить аргумент shedule_interval, как и в одном из примеров выше. Есть уже готовые свойства для задания времени. Например:

@once, @hourly, @daily, @weekly, @monthly, @yearly

Так же это возможно сделать cron-выражением:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Execution date

Ключевая особенность Apache Airflow состоит в том что все запуски задач в DAG'ах получают свою Execution date. То есть хранятся все запуски задач DAG'ах за определенные промежутки времени. Это позволяет еще раз в точности воспроизводить результаты, полученные в той или иной Execution date. Так же получается что различные задачи в DAG'ах могут работать одновременно в разных Execution date. При корректировке кода задачи, запуски задач в предыдущих Execution date будут уже с учетом этих корректировок. При этом, очевидно, теряется воспроизводимость результатов, но появляется возможность протестировать новый алгоритм на старых данных.

Хранилище

В Airflow есть свой бекенд-репозиторий. Поскольку Airflow был построен для взаимодействия с его метаданными с помощью большой библиотеки SqlAlchemy, должна быть возможность использовать любую базу данных, поддерживаемую в качестве серверной части SqlAlchemy. Рекомендуется использовать MySQL или Postgres. В базе хранятся состояния задач, DAG’ов, глобальные переменные и т. д..

Установка

Установка проводится на чистую ОС Ubuntu 18.04

Предварительная настройка

Для установки на чистую ОС потребуется: Python,

sudo apt-get install python

а также pip

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py

не помешает так же установить

sudo apt-get install python-dev

Установка Apache Airflow

Указываем домашнюю директорию для airflow

export AIRFLOW_HOME=~/airflow

При помощи pip скачиваем и устанавливаем airflow

sudo pip install airflow

Для большей наглядности процедуры установки и решения возникающих проблем с недостающими пакетами можно посмотреть видео с установкой Apache Airflow:

Решение возможных проблем установки и запуска Apache Airflow

Для устранения ошибки command 'x86_64-linux-gnu-gcc' failed with exit status 1 может помочь установка следующих пакетов [Источник 3]

sudo apt-get install build-essential libssl-dev libffi-dev python-dev

или другой вариант [Источник 4]

sudo apt-get install build-essential autoconf libtool pkg-config python-opengl python-pyrex python-pyside.qtopengl idle-python2.7 qt4-dev-tools qt4-designer libqtgui4 libqtcore4 libqt4-xml libqt4-test libqt4-script libqt4-network libqt4-dbus python-qt4 python-qt4-gl libgle3 python-dev libssl-dev
sudo easy_install greenlet
sudo easy_install gevent

Если проблема связана с fernet то может помочь установка пакета cryptography

sudo pip install cryptography

Конфигурирование

Подготовка среды к работе с mysql

Для работы с базами данных в Apache Airflow необходимо устанавливать дополнительно пакеты для каждой базы отдельно (ставим mysql для работы с mediawiki)

sudo pip install airflow[mysql]

или для всех сразу.

sudo pip install airflow[all_dbs]

Для того чтобы эта установка прошла корректно необходимо установить еще два пакета.

sudo apt-get install libmysqlclient-dev
sudo pip install mysqlclient

Так же для работы с шифрованием потребуется сгенерировать и вставить в конфигурационный файл специальный ключ fernet_key: для этого открываем python в консоли

python

и в открывшемся терминале построчно вводим

from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key)

в результате в консоль выведет сгенерированный ключ, его необходимо скопировать. Далее выходим из python'а

exit()

и открываем конфигурационный файл airflow

sudo nano ~/airflow/airflow.cfg

находим переменную fernet_key и заменяем ее значение из буфера нашим ключом.

Инициализация airflow

Для инициализации самой airflow достаточно нескольких команд. Для начала инициализируем базу данных для бек-енда самой airflow, где она хранит все записи.

airflow initdb

Теперь запустим airflow webserver

airflow webserver -p 8080

Так же не помешает сразу запустить scheduler

airflow scheduler

Для большей наглядности представлено видео-руководство.

Демонстрационный пример работы с бд mysql

Тест соединения с БД mysql

Для тестирования соединения в Apache Airflow есть специальный сервис Ad Hoc Query. Ему можно скармливать запросы к базе данных и получать результаты.

Ad Hoc Query example

Демонстрация работы DAG

В видео с демонстрацией представлен запуск DAG, просмотр результатов и логов после того как он отработает.

Вывод

Apache Airflow достаточно удобный инструмент для автоматизации часто повторяющихся процессов. Он удобен в использовании и развертывании , так же является open-source проектом с активным комьюнити. Для работы с базами данных существует практический не ограниченный функционал DAG'ов. Разработка и поддержка ведется множеством разработчиков со всего мира. Так же проект получает поддержку со стороны Apache Incubator. К плюсам проекта можно отнести:

  • Быстрота развертывания;
  • Open-source (с точки зрения открытости кода для анализа и модификации);
  • Гибкая и многофункциональная система;
  • Поддерживает множество баз данных.

Из минусов стоит отметить:

  • Много подводных камней при установке (необходимо устранять неявные зависимости);
  • Open-source (с точки зрения наличия большого количества багов, которых некому исправлять);
  • Сложность освоения;
  • Мало документации и статей на русском языке(в целом и на интернациональном английском тоже довольно мало статей);
  • Слабая документация на английском языке.

Примечания

  1. [1], October 2014 by Maxime Beauchemin at Airbnb.

Источники

  1. Описание сущностей Airflow // Habr блог mail.ru. URL: https://habr.com/company/mailru/blog/339392/ (дата обращения: 10.05.2018).
  2. Tutorial // Apache Airflow Incubator. URL: https://airflow.incubator.apache.org/tutorial.html (дата обращения: 15.05.2018).
  3. Answer for command 'x86_64-linux-gnu-gcc' failed with exit status 1 // Stackoverflow. URL: https://stackoverflow.com/questions/41492878/command-x86-64-linux-gnu-gcc-failed-with-exit-status-1 (дата обращения: 19.05.2018).
  4. Answer for command 'x86_64-linux-gnu-gcc' failed with exit status 1 // Stackoverflow. URL: https://stackoverflow.com/questions/26053982/setup-script-exited-with-error-command-x86-64-linux-gnu-gcc-failed-with-exit (дата обращения: 19.05.2018).

Ссылки