Integrating Elastic Map-Reduce with Resque

Background

Like many companies, Shareaholic uses Hadoop/map-reduce to crunch large amounts of data offline at scheduled intervals. We currently use Amazon’s Elastic Map-Reduce (EMR) for this, which provides a scalable, on-demand Hadoop cluster built on top of EC2 and S3. It is a very convenient (though somewhat expensive) solution for engineers who are comfortable with AWS and don’t want to build, maintain, and scale their own in-house cluster. S3 provides the implementation of HDFS where input and output data are stored, and one can simply kick off a job through the web interface, command-line tool (version with Ruby 1.9 support), or web API, specifying the number and type of machines to use.

We are also big fans of Resque, a job queueing framework for Ruby backed by Redis. We particularly like the monitoring capabilities provided by ResqueWeb, which allows you to see which jobs are running, which are queued, and most importantly which ones have failed and why. We use Cavalcade, written by a Shareaholic engineer, on top of Resque to provide the ability to schedule jobs via cron, usually on a nightly, weekly, or monthly basis.

When we first began using EMR, we wanted to be able to monitor the map-reduce jobs in the same place and fashion as we did our Resque jobs. So we developed a mini-framework for wrapping the launching and monitoring of EMR jobs inside of Resque jobs. It’s been working pretty well for us so far and we thought we’d share it with the larger community. It is not the most elegant possible solution: it shells out to Amazon’s command-line tool to interact with EMR rather than calling the Web API or using the tool as a library (it is written in Ruby). We started with the command-line tool because we were familiar with it and it’s easy to understand and use, and it’s been working so well we haven’t bothered to go back and change it.

Implementation

We created a base Cavalcade/Resque class to provide all of the functionality for interacting with Amazon EMR via the command-line tool; all map-reduce-based Resque jobs inherit from this class and either call or override certain key methods.

# Note: The code in this article is not complete 
# and is just for demonstration purposes.

class MapReduceJob < Cavalcade::Job

  def run_job(opts = {})
    inputs = build_input(opts)
    config_file = generate_config(opts, inputs)
    upload_script_files

    cmd_params = [
      "./elastic-mapreduce --create",
      "--json #{config_file.path}",
      "--num-instances #{opts['num_instances']}",
      "--instance-type #{opts['instance_type']}",
      "--access-id #{ENV['AWS_ACCESS_KEY_ID']}",
      "--private-key #{ENV['AWS_SECRET_ACCESS_KEY']}",
      "--log-uri s3n://your.bucket/logs/",
      "--region us-east-1"
    ]

    launch_mapreduce(cmd_params.join(" "))
    wait_for_job
    post_job_action opts
  end
end

The above snippet is a good starting point as it clearly lays out the steps that the job will take after a subclass calls run_job. First, the data inputs will be generated for the job. These inputs are generally an array of S3 URLs pointing to files; in our case (and we imagine many others), these URLs will be generated at run time based on the current date and options passed in. Next, the subclass will generate and return a configuration file in JSON that is passed to the command-line tool and tells EMR the location of the inputs, output, and map/reduce scripts for each phase of the job. It then uploads any needed script files specified by the subclass to S3 as this is where EMR will look for them. It then calls the command-line tool, passing in the configuration file, credentials, and how many EC2 nodes to use for the computation. Once the job is launched, it waits for it to finish by querying the command-line tool at regular intervals. If the job finishes successfully, a subclassed post_job_action method is called to handle the output. We will go into each of these steps in more detail, as well as implement them, in the following sections.

Inputs

Our jobs at Shareaholic operate primarily on log files stored in S3. These log files are of the format TYPE_OF_LOG_FILE-YYYY-MM-DD-TIMESTAMP.log, and contain data in 15-minute intervals. Most of the jobs are run nightly and operate on the previous day’s log files. While there are obviously other approaches, we currently use cron jobs to kick off our scheduled jobs and do not want to have to name the exact log files as input each day, as that would require editing the crontab each time. Instead, we want to pass an alias of some sort to our jobs, such as “last X days’ worth of log files” or “the output from the last successful run of job Y”. We leave it to the MapReduceJob class and its subclasses to interpret those aliases and turn them into an array of S3 file inputs.

Since most of our jobs use the same strategy of operating on the last X days worth of log files, we have a default implementation of build_input in the base map-reduce class; however, subclasses are free to override it and some of ours do. For this article though we will just cover our default implementation. In short, build_input still takes an array of S3 filenames as inputs, except that any references to dates are replaced by aliases/splats that are expanded by the build_input method. For example, we very often pass input for a job that works on the last X days worth of log files like so:

# From our Chef file:
cron "compute_publisher_event_counts" do
  hour "0" # Every day at midnight
  minute "0"
  command %Q( bundle exec cavalcade enqueue -e environment.rb -j SubClassesMRJob -p '"inputs" => ["s3n://your.bucket/logs/log-\$[last_days:3]*"], "num_instances" => 5, "instance_type" => "m1.large"' )
  user "root"
end

The important line here is:

"inputs" => ["s3n://your.bucket/logs/log-\$[last_days:3]*"]

The URL of a log file stored in S3 is passed as input, but instead of a particular date and time the alias $[last_days:3] is inserted into the filename (the dollar sign is escaped because it otherwise has special meaning to the shell). The build_input method will expect the inputs array to be present in the params passed to the job (through “opts”) and will loop through each URL, searching for and parsing tokens contained in $[ … ] blocks and expanding them into the appropriate dates. In the example above, if the current day when the job runs is Oct 31, 2012, then build_input might transform the above string into:

[
  "s3n://your.bucket/logs/log-2012-10-28*",
  "s3n://your.bucket/logs/log-2012-10-29*",
  "s3n://your.bucket/logs/log-2012-10-30*"
]

Or, to reduce the risk of the request growing too large when there are many input files, using Unix-style brace expansion:

[
  "s3n://your.bucket/logs/log-{2012-10-28,2012-10-29,2012-10-30}*"
]

EMR supports the wild-card and Unix expansion characters, so take advantage of them; in our case, we use them to include all logs on the given date(s) regardless of their timestamps. I’m not going to paste our default implementation of build_input here, as it is relatively trivial to implement. You may also want to consider implementing other aliases, like days_ago:X and last_days_from:Y:Z, as you may often need to go back and rerun failed jobs from a certain date.
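
For readers who want a starting point, here is a minimal sketch of how the $[last_days:N] alias could be expanded. It is not our actual build_input; the method name expand_date_aliases and its today parameter are assumptions made for illustration, and it only handles the one alias shown in this article.

```ruby
require 'date'

# Hypothetical helper (not the real build_input): replaces each
# $[last_days:N] token with the N days before `today`, using
# brace expansion when more than one day is requested.
def expand_date_aliases(inputs, today = Date.today)
  inputs.map do |url|
    url.gsub(/\$\[last_days:(\d+)\]/) do
      count = Regexp.last_match(1).to_i
      # Oldest day first, ending with yesterday.
      days = (1..count).map { |n| (today - n).strftime("%Y-%m-%d") }.reverse
      days.size == 1 ? days.first : "{#{days.join(',')}}"
    end
  end
end

puts expand_date_aliases(
  ['s3n://your.bucket/logs/log-$[last_days:3]*'],
  Date.new(2012, 10, 31)
)
```

Passing the date in as a parameter, rather than always calling Date.today inside the method, makes it easy to rerun a failed job as if it were an earlier day.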

Configuration File

The base MapReduceJob class does not implement the generate_config method, as each configuration will be unique to the individual job and will thus be implemented by the subclass for that job. The JSON file format for EMR configuration is not well documented, so I will give an example template here.

def generate_config(opts, inputs)

  # Set output location
  @output_folder = get_job_name() + "_" + Time.now.to_i.to_s

  config = [
    {
      "Name" => "Step 1: Name of step 1",
      "ActionOnFailure" => "TERMINATE_JOB_FLOW",
      "HadoopJarStep" => {
        "Jar" => "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
        "Args" => inputs.map{|input| ["-input", input]}.flatten.concat([
          "-output", "s3n://your.bucket/output/intermediate/#{get_job_name()}/#{@output_folder}/step1",
          "-mapper", "s3n://your.bucket/tasks/map_script.rb",
          "-reducer", "s3n://your.bucket/tasks/reduce_script.rb",
          "-cacheFile", "s3n://your.bucket/tasks/util_lib.rb#util_lib.rb"
        ])
      }
    },
    { 
      "Name" => "Step 2: Name of step 2", 
      "ActionOnFailure" => "TERMINATE_JOB_FLOW", 
      "HadoopJarStep" => { 
        "Jar" => "/home/hadoop/contrib/streaming/hadoop-streaming.jar", 
        "Args" => [
           "-input",     "s3n://your.bucket/output/intermediate/#{get_job_name()}/#{@output_folder}/step1", 
           "-output",    "s3n://your.bucket/output/#{get_job_name()}/#{@output_folder}/", 
           "-mapper",    "s3n://your.bucket/tasks/map_script2.rb",
           "-reducer",   "s3n://your.bucket/tasks/reduce_script2.rb"
        ]
      } 
    }
  ]

  # Note: The code for writing the file is actually
  # in the base class, we just put it here for
  # clarity. Any common functionality should
  # generally go in the base class.
  json_file = File.open(get_job_name + Time.now.to_i.to_s + ".json", "w")
  json_file.write(config.to_json)
  json_file.fsync
  json_file.close

  return json_file
end

Let’s start at the top. First, you will need to set an output directory in S3 to store your job results. This directory must not already exist, so it is best to use a timestamp or some other unique identifier in the directory name. It is worth storing it in a member variable, as you may want access to it in other methods, particularly the post_job_action method that processes the output. You’ll notice a call to get_job_name. Each subclass should implement this to identify itself.

def get_job_name
  return "my-log-job"
end

Next comes the creation of the configuration JSON object, which is probably best constructed in Ruby and then written to a file using the to_json method. The configuration object is an array of steps; the example above has two, but you will often only have one or possibly more than two. Each step consists of a map phase and reduce phase with their own map and reduce scripts. The inputs for the first phase, which we talked about in the previous section, are fed into the map script specified in the first step, then sorted by key, then piped into the reduce script. In the above example, we store the results of this first step in an intermediate output directory. The next step points to this directory as its input, and its contents are fed into the second map and reduce scripts and written to a final output directory. Adding or removing steps simply involves extending this chaining or removing it.

Each step takes several parameters, such as the name of the step (largely unimportant), what to do on failure (see EMR documentation for other options), and which Jar file to use for that step. We currently write our map-reduce scripts in Ruby and thus use the Hadoop Streaming Jar, a standard jar file that comes installed automatically on each EMR EC2 instance at the location specified above. You can specify your own jar file if you prefer writing your jobs in Java. The streaming jar file has some arguments of its own; namely, inputs, outputs, and the map-reduce script files. The inputs we already computed with build_input and passed into this method as an array of S3 URLs. All we need to do is expand them into a series of “-input URL” parameters and insert them into the configuration as in the above example (you can have multiple -input arguments). The output directory is set depending on whether you have multiple steps you are chaining together or not.
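
To make the “-input URL” expansion concrete, here is a small standalone sketch of what the Args array looks like once the inputs are folded in. The helper name streaming_input_args and the sample URLs are illustrative assumptions, not part of our base class.

```ruby
# Hypothetical helper: pairs every input URL with its own "-input" flag.
def streaming_input_args(inputs)
  inputs.flat_map { |input| ["-input", input] }
end

# Illustrative inputs, shaped like what build_input returns.
inputs = [
  "s3n://your.bucket/logs/log-2012-10-29*",
  "s3n://your.bucket/logs/log-2012-10-30*"
]

args = streaming_input_args(inputs) +
       ["-output", "s3n://your.bucket/output/example/"]
p args
```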

You also need to specify the location of your mapper and reducer scripts in S3. In the next section I will talk about uploading your scripts, which are likely stored in whatever repository contains your Resque code, to a location in S3 where they can be accessed by EMR. For now, just put the location you intend to upload them and the name of the scripts into the configuration object. If you want to pass in arguments to these scripts, you can specify them like so:

"-mapper",    "s3n://your.bucket/tasks/map_script2.rb arg1 arg2",

Additionally, you may want to access other files from your map-reduce scripts, such as common utility libraries or data files. You can do this by specifying one or more cacheFile arguments that point to the location of the file in S3. The trick is that you also must include a fragment (#) after the file name specifying how that file will be referenced/included in your scripts. In the step 1 example above, to include the util library I just need to have the following in my mapper:

require 'util_lib'

EMR automatically copies this file along with the mapper/reducer scripts to each node. This file must also be uploaded to S3 as described in the next section.

Once the configuration object is complete, it is written to a JSON file on disk and returned to the base class, which will pass it to the command-line tool.

Upload Script Files

S3 provides the implementation of HDFS for EMR, so you will need to upload your map/reduce script files, as well as any library or data files, to S3 in order for EMR to use them. These files are probably stored in the same repository as your Resque code, and are likely to change as you update them over time, so it makes sense to have the base class upload them each time a job is kicked off to make sure the most recent version is available in S3. Each subclass is responsible for providing the base class with a list of files to upload.

# Implemented by each subclass
def files_to_upload
  return {
    "./mapreduce_tasks/map_script.rb" => "tasks/map_script.rb",
    "./mapreduce_tasks/reduce_script.rb" => "tasks/reduce_script.rb",
    "./mapreduce_tasks/map_script2.rb" => "tasks/map_script2.rb",
    "./mapreduce_tasks/reduce_script2.rb" => "tasks/reduce_script2.rb",
    "./lib/util_lib.rb" => "tasks/util_lib.rb"
  }
end

# Implemented by the base class
def upload_script_files
  s3 = RightAws::S3Interface.new(
    ENV['AWS_ACCESS_KEY_ID'],
    ENV['AWS_SECRET_ACCESS_KEY']
  )

  files_to_upload().each do |src, dest|
    s3.put("your.bucket", dest, File.open(src))
  end
end

We currently use right-aws to interact with S3. As you can see, the subclass provides a mapping of the source files and their destination locations, and the base class retrieves this list and uploads them. Pretty straightforward, though you will likely want to add logging and checks to make sure the upload was successful.
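
As one example of the kind of check mentioned above, the sketch below verifies that every local source file exists before any upload is attempted. Both method names are hypothetical and nothing here touches right-aws; it only inspects a files_to_upload-style hash.

```ruby
# Hypothetical pre-flight check: returns the local source paths
# from a files_to_upload-style hash that do not exist on disk.
def missing_upload_sources(files)
  files.keys.reject { |src| File.file?(src) }
end

# Raises before any upload starts if a source file is missing,
# so the Resque job fails with a readable message.
def check_upload_sources!(files)
  missing = missing_upload_sources(files)
  unless missing.empty?
    raise "Missing script files: #{missing.join(', ')}"
  end
  true
end
```

Calling check_upload_sources!(files_to_upload) at the top of upload_script_files would surface a typo in a path as a failed Resque job rather than as a confusing EMR failure later on.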

Running the Job

Once the configuration object has been written to a file and all of the script files have been uploaded, it is time to kick off the job by passing the configuration to the command-line tool and specifying some additional parameters (consult our initial base class definition). In order to do this you will need to have the command-line tool installed on whatever machine is launching the Resque jobs. The tool performs various functions which you can read about in the documentation; in this case we are interested in the --create command, which launches a job in EMR. The options are pretty self-explanatory, but briefly: the --json argument points to the JSON configuration file we discussed in the previous sections. The --num-instances and --instance-type arguments specify how many and what type of EC2 machines you want to throw at the job (for example, “m1.large”). I usually let these be passed into the job rather than hardcoding them. The --access-id and --private-key arguments are your AWS credentials, and --log-uri specifies where in S3 the logs are written. There are some additional options we will discuss later.

How you choose to make the system call to the command-line tool is up to you. Below is our implementation, which may well not be ideal:

def launch_mapreduce(command)
  out_str = execute(command).chomp

  begin
    @job_flow = out_str.match(/^Created job flow (.*)$/)[1]
  rescue
    raise "There was an error launching the mapreduce task: Unexpected output: #{out_str}"
  end
end

def execute(command)
  # Needs: require 'open3'
  # capture2e blocks until the command has exited, so there is
  # no need to poll for completion afterwards.
  out_str, status = Open3.capture2e(command)

  # success? is nil if the process was killed by a signal,
  # so treat anything other than true as a failure.
  unless status.success?
    raise "There was an error executing the command #{command}: #{status}"
  end

  out_str
end

These methods combined make the system call and then wait for the command-line tool to write a successful message to STDOUT. EMR jobs are each given an identifier, which we capture in the member variable @job_flow for later use in monitoring.
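
If you would rather not rely on a bare rescue to detect unexpected output, the ID extraction can be pulled into a small function that returns nil when the message is missing. This is a sketch only: parse_job_flow_id is a hypothetical name and the sample ID below is made up.

```ruby
# Hypothetical helper: returns the job flow ID from the tool's
# output, or nil if the expected "Created job flow" line is absent.
def parse_job_flow_id(output)
  match = output.match(/^Created job flow (\S+)\s*$/)
  match && match[1]
end

# The ID here is a made-up placeholder, not a real job flow.
p parse_job_flow_id("Created job flow j-EXAMPLE123\n")
p parse_job_flow_id("Error: something went wrong")
```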

Monitoring the Job

This is where the convenience of integrating EMR and Resque starts to pay dividends. Normally you would have to monitor EMR jobs either from the command-line tool or the AWS web interface, which for our purposes was not ideal. We want to be able to do that from the Resque dashboard, which means we need to keep the Resque job alive until the EMR job finishes, or fail the Resque job if the EMR job fails. This involves querying the command-line tool at regular intervals to check on the status of the job. Below is our implementation.

def wait_for_job
  begin
    sleep(10)
  end while !job_complete?()
end

def job_complete?
  retries = 2
  sleep_time = 60

  begin
    out_str = execute("./elastic-mapreduce --describe #{@job_flow} --access-id #{ENV['AWS_ACCESS_KEY_ID']} --private-key #{ENV['AWS_SECRET_ACCESS_KEY']}")
    description = JSON.parse(out_str)
    status = description["JobFlows"][0]["ExecutionStatusDetail"]["State"]
    last_state_change = description["JobFlows"][0]["ExecutionStatusDetail"]["LastStateChangeReason"]
  rescue
    if retries > 0
      retries -= 1
      sleep_time *= 2
      sleep(sleep_time) unless @debug
      retry
    else
      raise "There was an error while querying for job status information on job #{@job_flow}: Invalid output: #{out_str}"
    end
  end

  if status == "COMPLETED"
    return true
  elsif status == "FAILED"
    raise "Job #{@job_flow} has failed: #{last_state_change}"
  elsif status == "TERMINATED"
    raise "Job terminated by user"
  else
    return false
  end
end

The first method simply checks every 10 seconds whether the job has completed, while the second method provides the actual implementation. It asks the command-line tool to --describe the job flow ID captured earlier, again providing our AWS credentials. This command returns a JSON blob, which you can find documentation for online or simply by taking a look yourself. We are interested in only two fields: the current status of the job and the last state change, which we will need if there was an error. You’ll notice this is all wrapped in a retry block with exponential back-off (the wait doubles on each retry: 120 seconds, then 240), as we have encountered instances in the past where the command-line tool failed to reach the server and assumed the job had terminated when it had in fact not.
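
The retry logic can also be factored out of job_complete? into a reusable helper. The sketch below is one hypothetical way to do that (with_backoff and its sleeper parameter are names invented here, and keyword arguments need Ruby 2.0 or later); injecting the sleeper lets you exercise the schedule without actually waiting.

```ruby
# Hypothetical helper: runs the block, and on a StandardError waits
# and retries, doubling the wait before each retry.
def with_backoff(retries: 2, base_sleep: 60, sleeper: ->(s) { sleep(s) })
  wait = base_sleep
  begin
    yield
  rescue StandardError
    raise if retries <= 0   # out of retries: re-raise the last error
    retries -= 1
    wait *= 2
    sleeper.call(wait)
    retry
  end
end

# Demonstration with a fake sleeper that records the waits.
waits = []
attempts = 0
result = with_backoff(sleeper: ->(s) { waits << s }) do
  attempts += 1
  raise "transient failure" if attempts < 3
  "COMPLETED"
end
p result, attempts, waits
```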

There are three possible states we are interested in. If the job is complete, then we can break out of the monitoring loop and move on to the post job action. If it failed, we need to raise an exception and specify the last state change we know about. This will cause the Resque job itself to fail, with at least a hint of what went wrong. Deeper analysis will require going through the log files in S3. Lastly, if the job was terminated by the user, we’d like to make that distinction.

Post Job Action

If the EMR job completed successfully, there will now be result files in S3 in the location we specified in the configuration. The result data is usually divided up into several part-XXXXXX files, depending on how many nodes you used for the job. Usually you’ll want to do some post-processing with that data that is not really suitable for map-reduce, such as storing it in a database. This happens to be a perfect task for a Resque job, and rather than write a separate one, you can just implement a post_job_action method in your MapReduceJob subclass that is called when the EMR job finishes; after all, you already have the location of the output directory in S3 (stored in @output_folder) and any other params needed when you launched the job.

While each subclass should implement its own post_job_action, we do have a default one in the base class that simply combines the part-* files in S3 into one result file (not recommended if your results are large). Additionally, we have some common helper methods for reading through the result files. We’ll use this default implementation as an example for this section.

def post_job_action opts={}

  output_file = File.open("/tmp/mr_results", "w")

  read_output(opts) do |line|
    line = line.chomp
    next if line.empty?

    output_file.puts line
  end

  output_file.close

  upload_file_to_s3(output_file.path, "output/#{get_job_name()}/#{@output_folder}/results.out", "your.bucket")
  File.delete(output_file.path)
end

def read_output opts

  results = get_files_from_s3("output/#{get_job_name()}/#{@output_folder}/", "your.bucket")

  results.each do |result_file|

    next if result_file.include?("_SUCCESS")
    filename = download_file_from_s3("your.bucket", result_file)
    file = File.open(filename, "r")

    file.each_line do |line|
      begin
        line = line.chomp
        next if line.empty?
        yield line
      rescue => e
        log_error
      end
    end
    file.close
    File.delete(filename)
  end
end

The first method is our default implementation of post_job_action. It uses the helper method (explained below) to read through the output and write it to a local file, which is then uploaded back to S3. This is sometimes convenient when your result data is small and you don’t want to search through a bunch of mostly empty files in S3 and combine them manually.

The second method is a helper method that actually abstracts the reading of the result data, taking care of grabbing the list of files from S3, downloading each one, and yielding each line to the consuming job as if it were one contiguous block of output. We’ll leave out the implementation of the various S3 methods as they are fairly straightforward with whichever gem you use.
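
A subclass that overrides post_job_action will typically want to parse each yielded line before storing it. Here is a minimal sketch, assuming the reducer emitted integer counts in the key\tvalue form; counts_from_lines is a hypothetical helper, not part of our base class.

```ruby
# Hypothetical helper: turns "key\tvalue" result lines into a Hash
# of integer counts, skipping lines that lack either field.
def counts_from_lines(lines)
  lines.each_with_object({}) do |line, counts|
    key, val = line.chomp.split("\t", 2)
    next if key.nil? || key.empty? || val.nil?
    counts[key] = val.to_i
  end
end

# Illustrative data, shaped like reducer output.
p counts_from_lines(["publisher_a\t12\n", "malformed line\n", "publisher_b\t7\n"])
```

Inside a real subclass, the lines would come from read_output(opts) { |line| ... } instead of an array, and the resulting hash could then be written to a database in one batch.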

After that, we’re all done! If the post job action completes successfully, the Resque job will exit and report success.

Conclusion

With the base class implemented, you can now begin creating subclasses for your different jobs and overriding/implementing the methods described above to fit their specific needs. Once you have them in whatever scheduling framework you use (cron, etc.), you’ll be able to see them launch and run (and possibly fail) all in the same location, the Resque dashboard.

Tips and Tricks

I’m going to use this section to describe some of the random things we learned while implementing this integration that others might find useful but that don’t really fit in the other sections.

Writing Map/Reduce Scripts in Ruby

Map and reduce scripts written in Ruby tend to follow the templates below.

#!/usr/bin/env ruby
# mapper.rb

# require any std libs or cached files here

# If you have any static arguments passed in,
# you need to remove them from ARGV here so that
# ARGF does not try to open them as input files.
arg1 = ARGV.shift
arg2 = ARGV.shift

ARGF.each_line do |line|
  # Input data tends to be messy, wrap
  # each line in a rescue block unless
  # you want your job to grind to a halt
  # because of a UTF-8 error or something.
  begin
    line = line.chomp
    next if line.empty?

    # If you just output the line, you're
    # essentially writing an identity mapper,
    # which is useful in many situations,
    # usually in step 2+ of MR jobs.
    # Output must be of the form:
    #   key\tvalue
    # Value can be multiple values encoded
    # in json or some other mechanism.
    puts line
  rescue => e
    # Perhaps log to $stderr
  end
end

#!/usr/bin/env ruby
# reducer.rb

prev_key = nil
state = Hash.new(0)

ARGF.each_line do |line|

  line = line.chomp
  next if line.empty?

  key, val = line.split("\t")
  next if key.nil? || val.nil?

  # Check if we are finished with the current
  # key, and if so output results.
  if prev_key && key != prev_key
    # Output state here
    puts "#{prev_key}\t#{state[:count]}"
    state = Hash.new(0)
  end

  prev_key = key
  # Update state here
  state[:count] += val.to_i
end

# Need to output one last time, or we
# will lose the last key. Skip this if
# there was no input at all.
puts "#{prev_key}\t#{state[:count]}" if prev_key

The mapper is pretty self-explanatory. It is where you want to clean your input data and get it into the tab-separated key/value form consumable by the reducer. The reducer is a little more complicated. If you recall, Hadoop will sort your map output by key and then distribute the keys and their values among the nodes. The reducer works by reading the input line by line, and accumulating some state for a given key (in the example above, we are just adding up all the values, but the state can obviously be much more complex). When we encounter a line with a different key, we know that we’ve gone through all of the data for the previous key, and can thus write our state to the output stream and continue on with the new key. This output is what will either end up in the result files or get piped to the next set of map/reduce steps for further sorting/processing.

There is a convenient way to test your map-reduce scripts without resorting to launching an EMR job or even using a local installation of Hadoop. Simply use the following at the command line:

cat input | ruby mapper.rb | sort | ruby reducer.rb > output

This will simulate what happens during an EMR job, but on a much smaller scale.
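
If you prefer to check the reducer logic from a unit test rather than a shell pipeline, the same key-change logic can be written as a plain method over an array of lines. This is a hypothetical refactoring of the reducer template (reduce_counts is a name invented here), and like the real reducer it assumes its input is already sorted by key.

```ruby
# Hypothetical refactoring of the reducer template: sums the values
# for each key in key-sorted "key\tvalue" lines.
def reduce_counts(lines)
  results = []
  prev_key = nil
  count = 0

  lines.each do |line|
    key, val = line.chomp.split("\t")
    next if key.nil? || val.nil?

    # A new key means the previous key's data is complete.
    if prev_key && key != prev_key
      results << "#{prev_key}\t#{count}"
      count = 0
    end

    prev_key = key
    count += val.to_i
  end

  # Emit the final key, unless there was no input at all.
  results << "#{prev_key}\t#{count}" if prev_key
  results
end

p reduce_counts(["a\t1", "a\t2", "b\t5"])
```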

Bootstrapping and Other Options

The command-line tool has some additional commands that are useful both inside Resque and manually. One of those is bootstrapping, which runs a script on each EC2 node when it is initialized and allows you to install third-party gems, among other things. You can read more about this on the AWS EMR website. Briefly, you simply pass the --bootstrap-action option to the command-line tool pointing to a shell script stored in S3. Amazon has a few standard ones, or you can create your own.

There are a few useful manual commands as well for use when testing and debugging. --list --active will tell you which jobs are currently running and what step they are on, as well as their job ID, which you can then use to terminate the job or look in its log files. The --terminate command, followed by the job ID, will end the specified job.

Interpreting the Log Files

Amazon has documentation on debugging EMR jobs that I recommend reading through if you get into trouble, but there are a few things worth emphasizing here. First, in most cases Resque will only tell you if your EMR job has failed and its ID, but little else. In order to get to the root of the problem you will have to look at the log files stored in S3, the location of which you specified in the run_job method.

There is an abundance of log files for even just one job and it can be overwhelming at first, but you’ll find that only a few are important and that, despite the sheer amount of data, there is actually very little useful information when it comes to debugging. Contrary to what you might expect, there tends to be very little info in any of the stderr log files either.

The first place to look is in the steps/ directory, which contains a folder for each step in the EMR job. They each contain a handful of log files that are worth looking through if they contain anything. More than likely there will only be information in the syslog files, telling you the progress of the job and possibly a generic error. Sometimes it will tell you to look at a particular ‘task attempt’, which is usually the wrong one but nonetheless means you need to dig into the task attempt logs.

In the task-attempts/ folder there is again a directory for each job step, and you’ll probably want the last one as that is the one that failed. Inside there will be potentially a ton of folders containing logs for each map and reduce instance that ran. Most of these contain no useful information. The best way I’ve found to look for clues is to start at the bottom and work your way up, looking for folders that contain more than just “log-index”, and especially any that have an stderr file with more than a few bytes in it. If you’re lucky, you’ll find one of these and there will be a stacktrace corresponding to some obvious error in your script file that you need to fix.

If you’re not lucky, you’ll need to look in task-attempts/step X/Y/syslog, which contains mostly junk but may have a stack trace in there somewhere telling you what went wrong. It might be a syntax error in your script, or it may be an infrastructure-related error. One common one is the ‘out of memory’ error. If you’ve written your reduce scripts in a way that accumulates too much state in memory (as in, your lines are ‘wide’) you can run into this. Usually you can break the reduce script into multiple steps that use less memory. You can also try specifying a new version of Hadoop and the AMI (see documentation) that has high-memory settings for Hadoop on by default.

The most annoying and cryptic error you may encounter is “Broken Pipe” or “Bad File Descriptor”. From what I’ve found off of Google, this means something went wrong in your script that caused it to terminate, which breaks the pipe from STDIN. You can attempt to wrap everything in your scripts in hard rescue blocks (which means catching Exception => e, not just StandardError), but honestly we have not quite slain this dragon yet ourselves.
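
For reference, this is roughly what a “hard” rescue block looks like. It is a sketch under the assumption that one bad record should be logged and skipped; each_line_safely is a hypothetical name, and we are not claiming it cures the broken pipe error. Note that rescuing Exception also swallows things like Interrupt and SystemExit, so keep the rescued region as small as possible.

```ruby
require 'stringio'

# Hypothetical helper: yields each chomped line, catching Exception
# (not just StandardError) so a bad record is logged and skipped.
def each_line_safely(io, err = $stderr)
  io.each_line do |line|
    begin
      yield line.chomp
    rescue Exception => e
      err.puts "skipped line: #{e.class}: #{e.message}"
    end
  end
end

# Demonstration: NotImplementedError is a ScriptError, which a
# plain `rescue => e` would not catch.
errors = StringIO.new
seen = []
each_line_safely(StringIO.new("ok\nbad\nok2\n"), errors) do |line|
  raise NotImplementedError, "cannot handle #{line}" if line == "bad"
  seen << line
end
p seen
```

In a mapper you would pass ARGF as the io argument and write the key\tvalue output from inside the block.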

As always, good luck!
