Airflow Task Scheduler

2019-03-19
Python Automation



airflow

Apache Airflow is a platform to programmatically author, schedule and monitor workflows. The airflow scheduler executes your tasks on an array of workers following the specified dependencies.

1. Why Airflow?

People usually need to execute some tasks periodically. One common solution is to use cron wich is a good solution for simple tasks. But the more tasks you need to schedule the more problems I will have, specially if there are dependencies between one another.

Airflow allows to define workflows of tasks and you can define them as code making ig more maintainable, versionable, testable and collaborative. Check out the Airflow documentation for more information.

2. Installation

First of all you will need a Linux machine. I'd suggest you use an AWS EC2 instance. You can see here how to create one.

Then you can download airflow:

sudo AIRFLOW_GPL_UNIDECODE=yes pip3 install apache-airflow

When it is installed you can initialize the database (it will be SQLite by default):

airflow initdb

2.1 Test Airflow

We will test airflow with an example from airflow documentation. You will need to create the file tutorial.py:

cd airflow
mkdir dags
cd airflow
nano tutorial.py

And then paste the example and save the file:

~/airflow/dags/tutorial.py
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

You can test that things are going as expected if the following command does not raise any exception:

python3 ~/airflow/dags/tutorial.py

2.1.1. Validate metadata

You can chech that the tutorial dag has been properly created with:

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

2.1.2 Run the test

You can launch tasks with the following commands:

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

# testing templated
airflow test tutorial templated 2015-06-01

3. Use Airflow UI

Of the great things about airflow is the UI.

airflow_ui

3.1. Open EC2 ports (optional)

If you are using and AWS EC2 you will probably have only the 22 port open to connect through SSH.

  1. Go to AWS console and then to the EC2 page. Use the sidebar to go to NETWORK & SECURITY/Security Groups.
  2. Find the security group of your EC2 instance and edit the Inbound rules.
  3. Add Custom TCP Rule with port 8080.

3.2. Start Airflow

You can start airflow with:

airflow webserver -p 8080 # or simply use 'airflow webserver'

You can now view Airflow at XX.XXX.XXX.XXX:8080 (Use your EC2 IP).

3.3. Secure Airflow UI

First we will edit the airflow configuration.

nano ~/airflow/airflow.cfg

Inside the section [webserver] find the line authenticate=X and replace it with:

~/airflow/airflow.cfg
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

Install flask_bcrypt and start python:

pip3 install flask-bcrypt

# start python
python3

And add a new user with:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'new_user_name'
user.email = 'new_user_email@example.com'
user.password = 'set_the_password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

4. Runing airflow with supervisor

If you want a process always runing and to be restarted on failure you should use supervisor.

4.1. Clean examples and other tests

Edit the airflow.cfg to disable the examples by editing load_examples = True to:

~/airflow/airflow.cfg
load_examples = False

If you still have examples runing you can reset the database:

airflow resetdb -y

This will delete all data be careful.

If you can't delete the previous example try deleting ~/airflow/dags/tutorial.py first.

After cleaning all examples you should stop all airflow processes with:

pkill -f airflow

This will delete the current DAG information

4.2. Runing supervisor

First install it with:

sudo apt install supervisor -y

Then create a folder for the airflow logs:

sudo mkdir /var/log/airflow/

And now declare the airflow services with by creating the file /etc/supervisor/conf.d/airflow.conf

/etc/supervisor/conf.d/airflow.conf
[program:airflow_webserver]
command=airflow webserver -p 8080
stopsignal=QUIT
stopasgroup=true
stdout_logfile=/var/log/airflow/airflow_webserver.log
stderr_logfile=/var/log/airflow/airflow_webserver.error.log
autorestart=true
user=ubuntu

[program:airflow_scheduler]
command=airflow scheduler
stopsignal=QUIT
stopasgroup=true
stdout_logfile=/var/log/airflow/airflow_scheduler.log
stderr_logfile=/var/log/airflow/airflow_scheduler.error.log
autorestart=true
user=ubuntu

We are runing airflow with ubuntu user since by default it gets installed to ~/airflow. You can change the default path and use another user.

Then start supervisor:

sudo supervisorctl reread
sudo service supervisor restart

# Check the result
sudo supervisorctl status

And that's it. You can now start using airflow.

5. Next steps

You can add DAGs in the folder ~/airflow/dags and they should be automatically loaded.

It is advised to run airflow with at least a t2.medium AWS instance. You can run it with a smaller one (I use a t2.micro since it is in the free tier) but you can easily get your instance at 100% CPU usage while runing tasks.

5.1. Using git

You can even change that path to a folder that is tracked with git to keep control of the DAGs by editing the airflow.cfg:

~/airflow/airflow.cfg
dags_folder = "home/ubuntu/airflow_tasks/src"

To keep the code sync with the origin you could create an airflow task but it will fill the logs with unuseful information. This time ironically is better to use crontab to fetch the code. You can do it with:

crontab -e

Add this line at the end:

* */5 * * * cd /home/ubuntu/airflow_tasks && git fetch && git checkout master && git pull origin master

I use /home/ubuntu/airflow_tasks for the repo path and the folder src for DAGs path. But you can use whatever you want.