Luigi and S3

One of Luigi’s two primitives is the Target. A Target is used to check for the existence of data when determining whether a Task can be run. It's also possible to open a Target and read its data from within a Luigi Task.
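To make that contract concrete, here's a minimal file-backed sketch (a hypothetical SimpleTarget, not Luigi's actual implementation) showing the two operations a Target supports:

```python
import os


class SimpleTarget:
    """Sketch of the Target contract: existence checks and opening for I/O."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        # A Task is considered "done" when its output Target exists.
        return os.path.exists(self.path)

    def open(self, mode="r"):
        # Tasks read their inputs and write their outputs through open().
        return open(self.path, mode)
```

Luigi's real Targets follow the same shape, with `exists()` and `open()` backed by whatever storage system the Target wraps.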

When working with data in S3, there are two ways to build Targets. The first is to use Luigi's built-in support for shelling out to hadoop fs, which supports the s3n:// (S3 native filesystem) scheme. For example:

from luigi.hdfs import HdfsTarget

target = HdfsTarget("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data")
print(target.exists())

Configuration: Using an HdfsTarget requires setting fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey in core-site.xml.
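A sketch of the relevant core-site.xml entries (the key values are placeholders):

```xml
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_SECRET_ACCESS_KEY</value>
</property>
```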

Note: Shelling out to hadoop fs can be slow (you're starting up a JVM, so it's usually at least 1 second), which can add up when checking a lot of files.

The second option is to use Luigi's built-in S3Target (or one of its subclasses). In most situations, the S3EmrTarget is the most appropriate: this Target checks for the existence of a _SUCCESS flag rather than the existence of a "directory" (S3 doesn't really support directories; the closest approximation is a prefix query).
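To illustrate the prefix-query point, here's a pure-Python model of an S3 key listing (not the boto API). A "directory" exists only in the sense that some key happens to start with the prefix, which is exactly why checking for a concrete _SUCCESS key is more reliable:

```python
# S3 is a flat key-value store; these keys merely share a common prefix.
keys = [
    "1gram/part-00000",
    "1gram/part-00001",
    "1gram/_SUCCESS",
]


def prefix_exists(prefix):
    # The closest thing to "does this directory exist?" in S3:
    # list keys and check whether any match the prefix.
    return any(k.startswith(prefix) for k in keys)


def flag_exists(prefix):
    # What an S3EmrTarget-style check effectively does: look for a
    # specific _SUCCESS key under the prefix.
    return prefix + "_SUCCESS" in keys
```

With partial output (say, only some part files written), `prefix_exists` would already return True, while `flag_exists` stays False until the job finishes.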

Note that this choice is important: when a MapReduce job outputs to HDFS, it typically renames the output atomically to its final destination (this is only a metadata operation on the NameNode). This is not possible with S3, which doesn't support atomic rename. Thus, the best solution is often to set mapreduce.fileoutputcommitter.marksuccessfuljobs to true and check for the existence of a _SUCCESS flag (which is written after all the files have been moved into their final destination).

In any case, here's a code example:

from luigi.s3 import S3Target, S3EmrTarget

target = S3Target("s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data")
print(target.exists())

target = S3EmrTarget("s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/")
print(target.exists())  # checks for s3://.../1gram/_SUCCESS, which won't be there.

Configuration:

* You must have boto installed.
* You must have an [s3] section in your client.cfg with values for aws_access_key_id and aws_secret_access_key (IAM credentials should also be supported, but I haven't tried).
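For reference, a client.cfg sketch (the values are placeholders):

```ini
[s3]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```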

Hybrid Targets

Luigi also supports a LocalFileSystem and a local File Target. It can be useful to use these for local testing but use S3 in production. In this case, it's easy to write a delegating factory function. For example:

from luigi import File
from luigi.s3 import S3EmrTarget

def DelegatingTarget(path, *args, **kwargs):
    # Dispatch on the path scheme: S3 paths get an S3EmrTarget,
    # everything else gets a local File Target.
    if path.startswith("s3://"):
        return S3EmrTarget(path, *args, **kwargs)
    return File(path, *args, **kwargs)

s3_target = DelegatingTarget("s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/")
local_target = DelegatingTarget("/tmp/foo")

Luigi and Redshift

Luigi has support for loading data stored in S3 into Amazon Redshift, which is a data warehousing system. The S3CopyToTable, S3JsonCopyToTable, and RedshiftManifestTask Tasks each implement a variant of loading data.

In order to keep track of which data has been loaded into Redshift, Luigi uses a marker table, which records when each table was updated. It uses the RedshiftTarget to insert entries into, and check for entries in, the marker table.
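The marker table itself is just a small bookkeeping table. A sketch of its shape (column names are an assumption, based on Luigi's Postgres support, which the Redshift code reuses):

```sql
-- Sketch of the marker table; actual DDL may differ by Luigi version.
CREATE TABLE table_updates (
    update_id TEXT PRIMARY KEY,   -- identifies a specific Task run
    target_table TEXT,            -- the table that was loaded
    inserted TIMESTAMP DEFAULT NOW()
);
```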

With the Task and Target, we have nearly all the pieces in place to build a Task to load data into Redshift. The last missing piece is some input data. A gotcha that tends to trip up folks new to Luigi is that a Task can only require other Tasks. Thus, we need an ExternalTask for the input data (unless there is already a Luigi Task that generates the data). There are some pre-built ExternalTasks for data stored in S3; we'll be using S3PathTask.

Here's an example task to load the eng-gb 1grams into Redshift:

import luigi
from luigi.s3 import S3PathTask
from luigi.contrib.redshift import RedshiftTarget, S3CopyToTable


class OneGramsToRedshift(S3CopyToTable):
    s3_load_path = luigi.Parameter(default="s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data")
    aws_access_key_id = luigi.Parameter()
    aws_secret_access_key = luigi.Parameter()

    def requires(self):
        return S3PathTask(path=self.s3_load_path)

    def create_table(self, connection):
        connection.cursor().execute(
            """CREATE TABLE {table} (

            )"""
        .format(table=self.table))

TODO