Simple ETL Using Luigi
What is Luigi?
Luigi is a Python package (tested with Python 2.7, 3.6, and 3.7) that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.
Why Luigi?
- Luigi helps you build complex pipelines of batch jobs.
- It handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.
- Luigi is written in Python, so pipelines are defined as ordinary Python classes.
- It is one of the most commonly used tools for managing ETL workflows.
Installation
Luigi is a Python package. To install it, just type:
pip install luigi
Building Blocks
There are two fundamental building blocks of Luigi:
- The Target class corresponds to a file on local disk, a file on HDFS, or some kind of checkpoint, such as an entry in a database. The only method a Target has to implement is exists(), which returns True if and only if the Target exists.
- The Task class is conceptually more interesting, because this is where computation happens. A few methods can be implemented to alter its behavior, most notably run(), output(), and requires().
Task’s methods
- requires() is evaluated first, if the task defines it. It returns the task(s) that must be completed before this one can run, i.e. its requirements.
- Tasks consume Targets that were created by other tasks; inside a task, these are available through input().
- Tasks usually also produce Targets of their own, which they declare in output().
- run() contains the actions you want to execute.
Practice Case
Let’s write an ETL story.
Imagine you want to create a report from data that comes from websites. You need to extract that data first, before making the report. Here is the workflow diagram:
In this example, the steps to generate the report are:
- Take the websites as input and extract their data by scraping them
- Save the scraped data to a CSV file
- Use that CSV file to build the report and save it as an Excel file
Code
A code example for the story above could look like this:
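```python
import luigi
import pandas as pd


class ScrapeData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('scrape_table.csv')

    def run(self):
        # write a scraping script to extract data from websites and save
        # the result to this task's output, e.g. with
        # data.to_csv(self.output().path, index=False)
        raise NotImplementedError('scraping logic goes here')


class GenerateReport(luigi.Task):
    def requires(self):
        return ScrapeData()

    def output(self):
        return luigi.LocalTarget('report.xlsx')

    def run(self):
        # read the extracted data from the CSV produced by ScrapeData
        data = pd.read_csv(self.input().path)
        # do some data manipulation and save it as an Excel file
        data.to_excel(self.output().path, index=False)
```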
Sources