Simple ETL Using Luigi

Rio Anggara Sufilin
2 min readFeb 20, 2021
Photo by Florian Wächter on Unsplash

What is Luigi?

Luigi is a Python (2.7, 3.6, 3.7 tested) package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.

Why Luigi?

  • Luigi helps you to build complex pipelines of batch jobs.
  • It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.
  • Luigi was built using Python.
  • It is one of the most common ETL tools for workflow management

Installation

Luigi is a Python module. To install it, you just need to to type:

pip install luigi

Building Blocks

There are two fundamental building blocks of Luigi:

  • Target class corresponds to a file on a disk, a file on HDFS or some kind of a checkpoint, like an entry in a database. Actually, the only method that Targets have to implement is the exists method which returns True if and only if the Target exists.
  • Task class is a bit more conceptually interesting because this is where computation is done. There are a few methods that can be implemented to alter its behavior, most notably run() , output() and requires()

Task’s methods

  • requires() is the first method executed, if exists. In order to execute the process, previous Luigi task(s) need to be executed first (as the requirement).
  • Tasks consume Targets that were created by some other task called input(). They usually also output targets called output() .
  • run() contains the actions you want to execute.

Practice Case

Let’s write an ETL story.

Imagine you want to create a report. The data you used comes from websites. You want to extract those data first before making the report. Here is the workflow diagram:

Luigi workflow diagram example

In this example, the steps to generate report are:

  • Take the websites as the input to extract its data by scraping it
  • The result from scraping is extracted data in a csv file
  • Use that csv file to make report and save it as an excel file

Code

A code example for story above would be:

class ScrapeData(luigi.Task):def output(self):
return luigi.LocalTarget('scrape_table.csv')
def run(self):
# write a scraping script to extract data from websites
class GenerateReport(luigi.Task):def requires(self):
return ScrapeData()
def output(self):
return luigi.LocalTarget('report.xlsx')
def run(self):
# read extracted data in csv
data = pd.read_csv('file://scrape_table.csv')
# do some data manipulation and save it as excel file
pd.to_xslx(data, 'report.xslx')

Sources

--

--