This is a simple walk-through of an example usage of Luigi. Spotify themselves provide excellent documentation online, where you can find all the bits and bytes needed to create your own pipeline script. There are also a few blog posts about what is possible with Luigi, but – I believe – they don’t describe very well how to actually implement it. So, in my opinion, there is either too much information to just try it out or too little information to actually get started hands-on. I’ll also say a word about security.
Therefore, I’m publishing a fully working example of a minimalist pipeline from which you can start, copying and pasting everything you need.
These are the questions I try to answer:
- What is Luigi and when do I want to use it?
- How do I set up the Luigi scheduler?
- How do I specify a Luigi pipeline?
- How do I schedule a Luigi pipeline?
- Can I use Luigi with a secure Hadoop cluster?
- What do I like about Luigi?
What is Luigi?
Luigi is a framework written in Python that makes it easy to define and execute complex pipelines in a consistent way. You can use Luigi …
- … when your data is processed in (micro) batches, rather than streamed.
- … when you want to run jobs that depend on (many) other jobs.
- … when you want to have nice visualizations of your pipelines to keep a good overview.
- … when you want to integrate data into the Hadoop ecosystem.
- … when you want to do any of the above and love Python.
Setting Up the Luigi Scheduler
Every pipeline can be tested using the --local-scheduler flag on the command line. For production, however, you should use a central scheduler running on a single node.
The first thing you want to do is create a user and group for the scheduler to run as.
sudo groupadd luigi
sudo useradd -g luigi luigi
The second step is to create a Luigi config directory.
sudo mkdir /etc/luigi
sudo chown luigi:luigi /etc/luigi
You also need to install Luigi (and Python and pip) if you did not do that already.
pip install luigi
It’s now time to deploy the configuration file. Put the following file into
/etc/luigi/luigi.cfg. In this example the Apache Pig home directory of a Hortonworks Hadoop cluster is specified. There are many more configuration options listed in the official documentation.
[core]
default-scheduler-host=www.example.com
default-scheduler-port=8088

[pig]
home=/usr/hdp/current/pig-client
Don’t forget to create directories for the scheduler daemon’s process id file, its logs, and its state file.
sudo mkdir /var/run/luigi
sudo mkdir /var/log/luigi
sudo mkdir /var/lib/luigi
sudo chown luigi:luigi /var/run/luigi
sudo chown luigi:luigi /var/log/luigi
sudo chown luigi:luigi /var/lib/luigi
You are now prepared to start up the scheduler daemon.
sudo su - luigi
luigid --background --port 8088 --address www.example.com --pidfile /var/run/luigi/luigi.pid --logdir /var/log/luigi --state-path /var/lib/luigi/luigi.state
A Simple Pipeline
We are now ready to go. Let’s specify an example pipeline that can actually be run without a Hadoop ecosystem present: it reads data from a custom file, counts the number of words, and writes the result to a file called count.txt. In this example, two of the most basic task types are used:
luigi.ExternalTask, which requires you to implement the output method, and luigi.Task, which requires you to implement the run method. I added pydocs to all methods and class definitions, so the code below should speak for itself. You can also view it on GitHub.
import luigi


class FileInput(luigi.ExternalTask):
    '''
    Defines the input file for our job: the output method of this class
    defines the input file of any class that references FileInput in its
    "requires" method.
    '''

    # Parameter definition: input file path
    input_path = luigi.Parameter()

    def output(self):
        '''
        As stated: the output method defines a path. If the FileInput class
        is referenced in the "requires" method of another task class, the
        file can be accessed via the "input" method in that class.
        '''
        return luigi.LocalTarget(self.input_path)


class CountIt(luigi.Task):
    '''
    Counts the words in the input file and saves the result to another file.
    '''

    input_path = luigi.Parameter()

    def requires(self):
        '''
        Requires the output of the previously defined class, which can then
        be used as input in this class.
        '''
        return FileInput(self.input_path)

    def output(self):
        '''
        count.txt is the output file of the job. In a more realistic job
        you would specify a parameter for this instead of hardcoding it.
        '''
        return luigi.LocalTarget('count.txt')

    def run(self):
        '''
        Opens the input file stream, counts the words, opens the output
        file stream and writes the number.
        '''
        word_count = 0
        with self.input().open('r') as ifp:
            for line in ifp:
                word_count += len(line.split(' '))
        with self.output().open('w') as ofp:
            ofp.write(str(word_count))


if __name__ == "__main__":
    luigi.run(main_task_cls=CountIt)
Schedule the Pipeline
To test and schedule your pipeline, save the code above as pipe.py and create a file test.txt with arbitrary content.
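For instance (the content is arbitrary; the pipeline simply counts whitespace-separated words):

```shell
# any content works; the pipeline just counts the words in it
echo "one two three four five" > test.txt
```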
We can now execute the pipeline manually by typing
python pipe.py --input-path test.txt
Use the following if you didn’t set up and configure the central scheduler as described above:
python pipe.py --input-path test.txt --local-scheduler
If you did everything right, you will see that no tasks failed and that a file count.txt was created containing the word count of your input file.
Try running this job again. You will notice that Luigi tells you the dependency is already present: it detects that count.txt has already been written and will not run the job again.
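This idempotency comes from Luigi’s default completeness check: a task counts as done as soon as all of its output targets exist. A simplified sketch of the idea (not Luigi’s actual implementation, just the gist of it):

```python
import os

def complete(output_paths):
    # Luigi's default Task.complete() boils down to:
    # "does every output target already exist?"
    return all(os.path.exists(p) for p in output_paths)

# once count.txt has been written, the task is skipped on the next run
print(complete(['count.txt']))
```

This is also why deleting count.txt makes the job run again: the output target is gone, so the task is no longer considered complete.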
Now you can easily run this pipeline on a daily basis by using, e.g., crontab to trigger the job every minute. If your input and output files carry the current date as a suffix in their filenames, the job will be triggered every minute but will successfully run exactly once per day.
In a crontab you could do the following:
* * * * * python pipe.py --input-path test.txt
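One way to sketch such a date-suffixed target path (the helper name here is my own, not part of Luigi; in a real pipeline you would typically use a luigi.DateParameter instead):

```python
from datetime import date

def dated_path(prefix, day=None):
    # builds e.g. 'count_2016-01-15.txt': one output target per day,
    # so cron can fire every minute while the task succeeds only once a day
    day = day or date.today()
    return '{}_{}.txt'.format(prefix, day.isoformat())

print(dated_path('count', date(2016, 1, 15)))  # count_2016-01-15.txt
```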
The cool thing about Luigi is that you basically don’t need to worry much about security: Luigi uses the security features of the components it interacts with. If you are, e.g., working on a secure Hadoop cluster (that means a cluster where Kerberos authentication is enforced), the only thing you need to take care of is obtaining a fresh Kerberos ticket before you trigger the job – given that the ticket is valid longer than the job needs to finish. That is, when you schedule your pipeline with cron, make sure you do a kinit from a keytab. You can check out my answer to a related question on the Hortonworks Community Connection for more details on that (https://community.hortonworks.com/questions/5488/what-are-the-required-steps-we-need-to-follow-in-s.html#answer-5490).
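In a crontab this could look like the following sketch (the keytab path and principal are placeholders; substitute the ones from your own cluster setup):

```
* * * * * kinit -kt /etc/security/keytabs/luigi.keytab luigi@EXAMPLE.COM && python pipe.py --input-path test.txt
```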
What do I like about Luigi?
It combines my favourite programming language and my favourite distributed ecosystem. I didn’t go too much into that here, but Luigi is especially great because of its rich ways to interact with Hadoop ecosystem services. Instead of a LocalTarget you would rather use an HdfsTarget or an Amazon S3Target. You can define and run Pig jobs, and there is even an Apache Hive client built in.