Luigi and AWS
Luigi and S3
One of Luigi's two primitives is the Target. A Target is used to check for the existence of data when determining whether a Task can be run. It's also possible to open a Target and read its data from within a Luigi Task.
When working with data in S3, there are two ways to build targets. The first is to use Luigi's built-in support for shelling out to hadoop fs, which supports the s3n:// scheme (the S3-native filesystem). For example:
Configuration: Using an HdfsTarget requires setting fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey in core-site.xml.
Note: Shelling out to hadoop fs can be slow (you're starting up a JVM, so it's usually at least 1 second), which can add up when checking a lot of files.
The second option is to use Luigi's built-in S3Target (or one of its subclasses). In most situations, the S3EmrTarget is the most appropriate: this Target checks for the existence of a _SUCCESS flag rather than the existence of a "directory" (S3 doesn't really support directories; the closest approximation is a prefix query).
Note that this choice is important: when a MapReduce job outputs to HDFS, it typically renames the output atomically to its final destination (this is only a metadata operation in the NameNode). But that isn't possible with S3, which doesn't support atomic rename. Thus, the best solution is often to set mapreduce.fileoutputcommitter.marksuccessfuljobs to true and check for the existence of a _SUCCESS flag (which is written after all the files have been moved into their final destination).
In any case, here's a code example:
Configuration:
* You must have boto installed.
* You must have a [s3] section in your client.cfg with values for aws_access_key_id and aws_secret_access_key (IAM credentials should also be supported, but I haven't tried).
Hybrid Targets
Luigi also supports a LocalFileSystem and a local File target. It can be useful to use these for local testing while using S3 in production. In this case, it's easy to write a delegating Target. For example:
Luigi and Redshift
Luigi has support for loading data stored in S3 into Amazon Redshift, a data warehousing system. The S3CopyToTable, S3JsonCopyToTable, and RedshiftManifestTask Tasks each implement a variant of loading data.
In order to keep track of which data has been loaded into Redshift, Luigi uses a marker table, which records when each table was updated. It uses the RedshiftTarget to insert and check for entries in the marker table.
With the Task and Target, we have nearly all the pieces in place to build a Task to load data into Redshift. The last missing piece is some input data. A gotcha that tends to trip up folks new to Luigi is that a Task can only require other Tasks. Thus, we need an ExternalTask for the input data (unless there is already a Luigi Task for generating it). There are some pre-built ExternalTasks for data stored in S3; we'll be using S3PathTask.
Here's an example task to load the eng-gb 1grams into Redshift: