Apache Airflow es uno de los últimos proyectos open source que han despertado un gran interés de la comunidad. Hasta el punto de haber sido integrado dentro del stack de Google Cloud como la herramienta de facto para orquestar sus servicios.

¿Qué hace a este proyecto tan especial y por qué ha tenido tan buena acogida? Analizamos su evolución y desgranamos sus principales características. Además, en este artículo analizamos la versión 2.0.0 de Apache Airflow que incluye multitud de novedades y mejoras.

Historia

Apache Airflow fue creado en octubre de 2014 por Maxime Beauchemin, dentro del equipo de ingeniería de datos de Airbnb, la famosa plataforma de alquiler vacacional. Desde sus comienzos fue concebido como software abierto y se publicó oficialmente en junio de 2015 estando disponible en GitHub.

Airflow fue acogido dentro del programa de incubación de la Apache Software Foundation en marzo de 2016, siguiendo así los pasos de otros grandes proyectos de software open source dentro del área de datos como Hadoop o Spark. Hoy en día es usado en producción por más de 200 compañías en todo el mundo, como Paypal, Spotify o Twitter.

El último gran empujón a Apache Airflow viene de la mano de Google. En mayo de 2018 anunció Google Cloud Composer, un servicio gestionado de Apache Airflow totalmente integrado en la plataforma de Google Cloud y que se convierte en una de las piedras angulares para orquestar servicios gestionados en Google Cloud.

DAGs y Operators

Pero realmente, ¿qué es Apache Airflow? Se trata de una plataforma a través de la cual podemos crear workflows de forma programática y, además, planificarlos y monitorizarlos de forma centralizada.

Un workflow dentro de Airflow podríamos definirlo como una secuencia de tareas, disparadas por un evento o planificación y que suelen usarse para manejar pipelines de datos.

Un ejemplo de un workflow muy sencillo sería el compuesto por las siguientes tareas:

  1. Descargar ciertos datos de un origen de datos (una base de datos por ejemplo).
  2. Enviar estos a un sistema de procesamiento (por ejemplo un cluster de Spark).
  3. Monitorizar el avance de dicho procesamiento.
  4. Generar un reporte con los resultados del procesamiento.
  5. Enviar el reporte por email.

Airflow modela estos workflows o conjuntos de tareas como grafos acíclicos dirigidos, en inglés DAG (Directed Acyclic Graphs). Estos grafos tienen la peculiaridad de que cumplen dos condiciones:

Este tipo de grafo es usado también por otros proyectos de software muy relevantes hoy en día como Spark o Tensorflow para crear sus modelos de ejecución.

Cada una de estas tareas que componen un DAG de Airflow es un Operator en Airflow. Por lo tanto, para definir un DAG tendremos que definir todos los Operators necesarios y establecer las relaciones y dependencias entre ellos, esto lo haremos de forma sencilla con código Python.

Existen multitud de Operators predefinidos aunque podemos extender los nuestros si es necesario. Algunos de los Operators predefinidos más interesantes son los siguientes:

En la documentación oficial podemos encontrar una documentaOperatorsción exhaustiva de todos los Operators “oficiales”.

Un ejemplo real

Un ejemplo de un DAG mínimo sería el siguiente:

En este workflow hemos definido tres tareas:

A continuación describimos el código necesario para programar este workflow.

En primer lugar, realizamos las importaciones correspondientes, entre ellas la del BashOperator que es el tipo de Operator, que usaremos en este ejemplo:

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

En segundo lugar, definimos los argumentos por defecto que usaremos para instanciar el DAG, en este punto configuraremos aspectos importantes como la política de reintentos.


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

Instanciamos el DAG pasándole los argumentos por defecto y la planificación de ejecución. En este caso, una vez al día.

Para planificar la ejecución podemos usar también una notación de tipo cron que suele ser la más conveniente:

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

A continuación, definimos los tres Operators. En este caso como instancias de BashOperator, los asociaremos a nuestro DAG y en el parámetro bash_command definiremos el comando bash que deben ejecutar:

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

Por último, definiremos la dependencia o relación que existe entre nuestros operadores. En este caso, haciendo uso de los operadores bit shift (>>), que están sobrecargados desde la versión 1.8 para simplificar la definición de relaciones entre Operators:

t1 >> [t2, t3]

Características interesantes

Hasta aquí hemos descrito el funcionamiento general de Airflow y cómo programar un DAG. Pero, ¿por qué es especial Airflow y qué lo diferencia de otras herramientas?

A continuación, describimos algunas de las características más notables de Airflow:

Google Cloud Composer

Como en ocasiones anteriores, Google se basa en proyectos open source con prestigio y una amplia comunidad y los integra en su plataforma en la nube como servicios gestionados.

Con esta estrategia Google no parte de cero a la hora de construir servicios en la nube. Por otro lado, refuerza y apoya estas comunidades, contribuyendo al desarrollo de los proyectos. Podemos ver otros casos parecidos con Apache Beam y Dataflow o con Kubernetes y GKE.

Cloud Composer no deja de ser una versión de Apache Airflow, pero tiene ciertas ventajas al tratarse un servicio gestionado (por supuesto también tiene un coste adicional). Algunas de los principales beneficios de Cloud Composer son los siguientes:

Otro aspecto interesante a destacar es que con Cloud Composer nuestros ficheros que describen los DAGs son almacenados en Cloud Storage.

Para desplegar un nuevo DAG simplemente tenemos que subirlo a Cloud Storage, ya sea por el interfaz web de la consola de Google Cloud o por las herramientas del SDK de Google Cloud (gcloud).

Conclusiones

Podemos concluir que Apache Airflow es un proyecto open source consolidado, con una gran y activa comunidad detrás y con el apoyo de empresas importantes como Airbnb y Google.

Muchas compañías están usando Airflow hoy en día en producción para orquestar sus workflows de datos e implementar sus políticas de gobierno y calidad del dato.

Airflow nos permite gobernar de forma centralizada nuestros pipelines de datos y nos ofrece un sistema moderno de logging y monitorización homogéneo.

Gracias a Cloud Composer podemos disponer de un entorno de Airflow productivo de forma muy rápida y sencilla con las comodidades de un servicio gestionado e integrado con toda la plataforma de Google Cloud.

Reduciremos nuestros esfuerzos en el despliegue y el mantenimiento de la infraestructura, pero tendremos los costes asociados al servicio en la nube.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete