Luigi orchestrator
- 2020-01-06
- 🛠️ Tools/Utils
- Tools Orchestration
1. What is Luigi
Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.
It is easy to set up and integrates well with python code. This, for example, make it possible to call tasks directly from python and/or to set task dependencies dinamically.
You can install it directly with pip:
pip install luigi
2. Usage
2.1. How it works
Luigi work with tasks that are defined with classes that extend the luigi.Task
class.
The basic usage is to overwrite the run
function (with what to do) and the output
(with the file that will output).
All Luigi tasks need to end with writting a file. This is how luigi tracks which tasks are completed.
For example a basic task would look like:
import luigi
from datetime import date, datetime
class ReportTask(luigi.Task):
def run(self):
with open(f"output.txt", "w") as stream:
stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def output(self):
return luigi.LocalTarget("output.txt")
This task will write a file called output.txt
with the execution datetime.
2.2. Basic example
For a real example tasks should also have dependencies.
First of all, for this example let’s create two python files (register.py
and report.py
) which run simple python task.
They both will accept a name as parameter so that it can be run for different days.
The first one will simply write a file with the execution time:
register.py
from datetime import date, datetime
def main(filename):
uri = f"output/{filename}.txt"
with open(uri, "w") as stream:
stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print(f"File '{uri}' wrote")
And the other will write the same as an html file:
report.py
from datetime import date, datetime
from markdown import markdown
def main(filename):
uri = f"output/{filename}.html"
html = markdown(
f"""# Report
{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
"""
)
with open(uri, "w") as stream:
stream.write(html)
print(f"File '{uri}' wrote")
With those files it is possible to create the associated luigi tasks:
from datetime import date, datetime
import luigi
class RegisterTask(luigi.Task):
mdate = luigi.DateParameter(default=date.today())
def run(self):
from register import main
main(self.mdate.strftime("%Y_%m_%d"))
def output(self):
return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.txt"))
class ReportTask(luigi.Task):
mdate = luigi.DateParameter(default=date.today())
def run(self):
from report import main
main(self.mdate.strftime("%Y_%m_%d"))
def output(self):
return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.html"))
Both tasks have a parameter called mdate
so that each day the task runs it will write a different file.
Finally let’s add a dummy task that requires both RegisterTask
and ReportTask
.
class DoAllTask(luigi.WrapperTask):
mdate = luigi.DateParameter(default=date.today())
def requires(self):
return RegisterTask(self.mdate), ReportTask(self.mdate)
Since DoAllTask
does not need to do anything it extends luigi.WrapperTask
.
To run the example you first need to start the luigi server with:
luigid
And then you put the three tasks to a file called master.py
and add:
if __name__ == "__main__":
luigi.build([DoAllTask()])
You can run:
python master.py
And you should see how Luigi completes the 3 tasks.
2.3. Luigi server
If you go to localhost:8082
you can see luigi server.
It gives a summary of the tasks and their status.
If you select a task (for example DoAllTask
) you can see the dependency tree.
Remember to unmark the Hide Done
button to see all task.
It is also possible view the dependency graph created with D3
.
3. Using task templates
The best way to orchestrate tasks in a fault tolerant way is by having a script (or package) for each task.
This is the same as the last example where there ware report.py
and register.py
scripts.
Then since not all tasks will output a file let’s create a template for task that writes a yaml
file with metadata about the execution.
This will include the start_time
, end_time
and duration
among others.
If the task fails it will also show info about the failure.
When a task fails will output file with have a slightly different name so that if it is run another time it will try to repeat the task.
The full template would be:
luigi_default.py
import os
import time
from datetime import date, datetime
import luigi
import oyaml as yaml
PATH_LUIGI_YAML = "runs/"
class StandardTask(luigi.Task):
"""
Extends luigi task, instead of calling run, one must call run_std
Params:
mdate: date of execution
t_data: is a dictionary with instance data
worker_timeout: maximum time allowed for a task to run in seconds
"""
mdate = luigi.DateParameter(default=date.today())
worker_timeout = 1 * 3600 # Default timeout is 1h per task
t_data = {}
# This is meant to be overwritten
module = "change_this_to_module_name"
def output_filename(self, success=True):
""" Get output filename """
# output will be a yaml file inside a folder with date
uri = f"{PATH_LUIGI_YAML}{self.mdate:%Y%m%d}/"
# make sure folder exists
os.makedirs(uri, exist_ok=True)
# add task name
uri += self.__class__.__name__
# If task fails write a file with different name
# This allows re-runs to retry the failed task while keeping info about fails
if not success:
uri += datetime.now().strftime("_fail_%Y%m%d_%H%M%S")
return f"{uri}.yaml"
def output(self, success=True):
return luigi.LocalTarget(self.output_filename())
def save_result(self, success=True, **kwa):
""" Stores result as a yaml file """
# Store basic execution info
self.t_data["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.t_data["duration"] = time.time() - self.start_time
self.t_data["success"] = success
# Allow extra params like 'exception'
self.t_data.update(**kwa)
# Export them as an ordered yaml
with open(self.output_filename(success), "w") as stream:
yaml.dump(self.t_data, stream)
def on_failure(self, exception):
# If there is an error store it anyway
self.save_result(success=False, exception=repr(exception))
self.disabled = True
# If needed, do extra stuff (like log.error)
# End up raising the error to Luigi
super().on_failure(exception)
def run_std(self):
"""
This is what the task will actually do.
If it is not overwritten it will 'import module' and then run:
module.main(mdate)
"""
# By default run the 'main' function of the asked module
module = __import__(self.module)
module.main(self.mdate.strftime("%Y_%m_%d"))
def run(self):
# Store start time and task name
self.t_data["name"] = self.__class__.__name__
self.start_time = time.time()
self.t_data["start_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Run the task and store the resutls
self.run_std()
self.save_result()
Then is easy to create a StandardTask. The two task from the example below would be:
master.py
import luigi
from luigi_default import StandardTask, date
class RegisterTask(StandardTask):
module = "register"
class ReportTask(StandardTask):
module = "report"
class DoAllTask(luigi.WrapperTask):
mdate = luigi.DateParameter(default=date.today())
def requires(self):
return RegisterTask(self.mdate), ReportTask(self.mdate)
if __name__ == "__main__":
luigi.build([DoAllTask()])
So it is only needed to say the name of the script/package as parameter called module
of the StandardTask
instance.
This script/package needs a function called main
since is what would be run by default.
It is even possible to adapt the StandardTask
to do something different by overwritting the run_std
function.
For example:
class ReportTask(StandardTask):
def run_std():
with open(f"output.txt", "w") as stream:
stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
4. Automatically run luigi
If you want a process always runing and to be restarted on failure you should use supervisor.
You should you use it for luigid
.
For runing the tasks itselfs Luigi does not have a scheduler but you can use cron instead.
4.1. Install supervisor
First install it with:
sudo apt install supervisor -y
Then create some auxiliar folders that you will need:
#!/bin/bash
# For luigi logs
sudo mkdir /var/log/luigi/
# For cron logs (and give writte permission)
sudo mkdir /var/log/cron/
sudo chmod 777 /var/log/cron/
# This is to allow luigi to store the worker state
sudo mkdir /var/lib/luigi-server/
And now declare the luigid
service by creating the file /etc/supervisor/conf.d/luigid.conf
/etc/supervisor/conf.d/luigid.conf
[program:luigid]
command=/home/ubuntu/.local/bin/luigid
stopsignal=QUIT
stdout_logfile=/var/log/luigi/luigid.log
stderr_logfile=/var/log/luigi/luigid.error.log
autorestart=true
user=ubuntu
It is important to use the full path for the command that supervisor will run.
Be careful to set user
to one that has visbility of the luigid
file.
Remember you can call whereis luigid
to get the full path
Then start supervisor:
sudo supervisorctl reread
sudo service supervisor restart
# Check the result
sudo supervisorctl status
If there are no errors you can start using Luigi.
4.2. Create a virtual environment
First install virtualenv with:
pip3 install virtualenv
Then create the virtual environment:
# Make a folder for virtual environments
mkdir /home/ubuntu/venv/
# Create a virtualenv called vtasks
virtualenv -p /home/ubuntu/venv/ vtasks
Then to activate the environment you can call:
source /home/ubuntu/venv/vtasks/bin/activate
And deactivate
to deactivate the environment.
4.3. Script file to run luigi
Let’s create the run_luigi.sh
file with:
~/run_luigi.sh
#!/bin/bash
# Add ssh keys
eval `keychain --agents ssh --eval github_ssh`
# Go to desired path
cd /home/ubuntu/villoro_tasks/
# Activate virtual environment
source /home/ubuntu/venv/vtasks/bin/activate
# Git fetch and checkout
git fetch
git checkout master
git pull origin master
# Install requirements
pip install -r requirements.txt
# Run luigi
python src/master.py
deactivate
Do not forget the #!/bin/bash
part. If you do, the source
command won’t work.
This script does some things:
- Loading the ssh keys so that it can work with GIT.
- Change the path to the repository
- Activate the virtual environment
- GIT fetch and checkout new changes in master
- Install requirements
- Run Luigi
In order to run this you should give it executable permissions with:
chmod +x /home/ubuntu/run_luigi.sh
4.4. Runing luigi tasks with cron
Te best way to run Luigi automatically is using cron.
After doing all the previous steps you can set cron to run the script with:
crontab -e
Add this adding this line at the end:
0 4 * * * /home/ubuntu/run_luigi.sh >> /var/log/cron/luigi.log 2>&1
In this example it will run each day at 04:00 am GMT. If you want to run at different moment you can use Crontab.guru to check cron expressions.
It will also write the output as log file called /var/log/cron/luigi.log
.