Joe Crobak's Website


Getting Started with Apache Hadoop 0.23.0

Posted on Dec 4, 2011

Hadoop 0.23.0 was released November 11, 2011. Being the future of the Hadoop platform, it's worth checking out even though it is an alpha release.

Note: Many of the instructions in this article came from trial and error, and there are lots of alternative (and possibly better) ways to configure the systems. Please feel free to suggest improvements in the comments. Also, all commands were only tested on Mac OS X.


To get started, download the hadoop-0.23.0.tar.gz file from one of the mirrors here:

Once downloaded, decompress the file. The bundled documentation is available in share/doc/hadoop/index.html

Notes for Users of Previous Versions of Hadoop

The directory layout of the hadoop distribution changed in hadoop 0.23.0 and 0.20.204 vs. previous versions. In particular, there are now sbin, libexec, and etc directories in the root of the distribution tarball.

scripts and executables

In hadoop 0.23.0, a number of commonly used scripts from the bin directory have been removed or drastically changed.  Specifically, the following scripts were removed (vs

  • hadoop-daemon(s).sh
  • and
  • and
  • and
  • and
  • task-controller

The start/stop mapred-related scripts have been replaced by "map-reduce 2.0" scripts called yarn-*.  The and scripts no longer start or stop HDFS, but they are used to start and stop the yarn daemons.  Finally, bin/hadoop has been deprecated. Instead, users should use bin/hdfs and bin/mapred.

Hadoop distributions now also include scripts in a sbin directory. The scripts include,, and (and the stop versions of those scripts).

configuration directories and files

The conf directory that comes with Hadoop is no longer the default configuration directory.  Rather, Hadoop looks in etc/hadoop for configuration files.  The libexec directory contains scripts and for configuring where Hadoop pulls configuration information, and it's possible to override the location of the configuration directory the following ways:

  • calls in $HADOOP_COMMON_HOME/libexec and $HADOOP_HOME/libexec
  • accepts a --config option for specifying a config directory, or the directory can be specified using $HADOOP_CONF_DIR.
    • This script also accepts a --hosts parameter to specify the hosts / slaves
    • This script uses variables typically set in, such as: $JAVA_HOME, $HADOOP_HEAPSIZE, $HADOOP_CLASSPATH, $HADOOP_LOG_DIR, $HADOOP_LOGFILE and more.  See the file for a full list of variables.

Configure HDFS

To start hdfs, we will use sbin/ which pulls configuration from etc/hadoop by default. We'll be putting configuration files in that directory, starting with core-site.xml.  In core-site.xml, we must specify a

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

Next, we want to override the locations that the NameNode and DataNode store data so that it's in a non-transient location. The two relevant parameters are and  We also set replication to 1, since we're using a single datanode.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name></name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/namenode</value>
  </property>
  <property>
    <name></name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/datanode</value>
  </property>
</configuration>


  • as of HDFS-456 and HDFS-873, the namenode and datanode dirs should be specified with a full URI.
  • by default, hadoop starts up with 1000 megabytes of RAM allocated to each daemon. You can change this by adding a to etc/hadoop. There's a template that can be added with: $ cp ./share/hadoop/common/templates/conf/ etc/hadoop
    • The template sets up a bogus value for HADOOP_LOG_DIR
    • HADOOP_PID_DIR defaults to /tmp, so you might want to change that variable, too.
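If you would rather generate these files than hand-edit them, the following is a minimal Python sketch (not part of the Hadoop distribution) that renders a <configuration> document from a dictionary and turns an absolute local path into a file: URI. Only dfs.replication and the directory paths come from the text above. The helper names are made up for illustration, and the two example.* property names are deliberate placeholders, because the real directory property names are missing from the listing above.

```python
import xml.etree.ElementTree as ET
from urllib.parse import quote


def local_dir_uri(path):
    """Return a file: URI for an absolute local path (hypothetical helper)."""
    if not path.startswith("/"):
        raise ValueError("expected an absolute path: %r" % path)
    return "file://" + quote(path)


def render_site_xml(properties):
    """Render a dict of name -> value as a Hadoop-style <configuration> document."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    if hasattr(ET, "indent"):  # pretty-printing is only available on Python 3.9+
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


base = "/Users/joecrow/Code/hadoop-0.23.0/data/hdfs"
site_xml = render_site_xml({
    "dfs.replication": 1,
    # Placeholder names: substitute the real NameNode/DataNode directory properties.
    "example.namenode.dir": local_dir_uri(base + "/namenode"),
    "example.datanode.dir": local_dir_uri(base + "/datanode"),
})
print(site_xml)
```

The output still needs the XML declaration and stylesheet line shown above if you want it to match the bundled files.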

Start HDFS

Start the NameNode:

sbin/ start namenode

Start a DataNode:

sbin/ start datanode

(Optionally) start the SecondaryNameNode (this is not required for local development, but definitely for production).

sbin/ start secondarynamenode

To confirm that the processes are running, issue jps and look for lines for NameNode, DataNode and SecondaryNameNode:

$ jps
55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode
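If you want to script that check, here is a small Python sketch that parses jps output and reports which of the expected daemons are missing. The function name and its defaults are my own invention; the sample text is the output shown above.

```python
def missing_daemons(jps_output,
                    expected=("NameNode", "DataNode", "SecondaryNameNode")):
    """Return the expected daemon names that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Each jps line looks like "<pid> <main class name>".
        if len(parts) >= 2 and parts[0].isdigit():
            running.add(parts[1])
    return [name for name in expected if name not in running]


sample = """55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode"""

print(missing_daemons(sample))
```

To run it against a live machine, pass in the text returned by subprocess.run(["jps"], capture_output=True, text=True).stdout.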


  • the hadoop daemons log to the "logs" dir.  Stdout goes to a file ending in ".out" and a logfile ends in ".log". If a daemon doesn't start up, check the file that includes the daemon name (e.g. logs/hadoop-joecrow-datanode-jcmba.local.out).
  • the commands might say "Unable to load realm info from SCDynamicStore" (at least on Mac OS X). This appears to be harmless output, see HADOOP-7489 for details.

Stopping HDFS

Eventually you'll want to stop HDFS. Here are the commands to execute, in the given order:

sbin/ stop secondarynamenode
sbin/ stop datanode
sbin/ stop namenode

Use jps to confirm that the daemons are no longer running.

Running an example MR Job

This section just gives the commands for configuring and starting the Resource Manager, Node Manager, and Job History Server, but it doesn't explain the details of those. Please refer to the References and Links section for more details.

The Yarn daemons use the conf directory in the distribution for configuration by default. Since we used etc/hadoop as the configuration directory for HDFS, it would be nice to use that as the config directory for mapreduce, too.  As a result, we update the following files:

In conf/, add the following lines under the definition of YARN_CONF_DIR:


In conf/yarn-site.xml, update the contents to:

<?xml version="1.0"?>

Set the contents of etc/hadoop/mapred-site.xml to:

<?xml version="1.0"?>
<?xml-stylesheet href="configuration.xsl"?>

Now, start up the yarn daemons:

 $ bin/ start resourcemanager
 $ bin/ start nodemanager
 $ bin/ start historyserver

A bunch of example jobs are available via the hadoop-examples jar. For example, to run the program that calculates pi:

$ bin/hadoop jar hadoop-mapreduce-examples-0.23.0.jar pi \
-libjars modules/hadoop-mapreduce-client-jobclient-0.23.0.jar 16 10000

The command will print a lot of output, but towards the end you'll see:

Job Finished in 67.705 seconds
Estimated value of Pi is 3.14127500000000000000


  • By default, the resource manager uses a number of IPC ports, including 8025, 8030, 8040, and 8141.  The web UI is exposed on port 8088.
  • By default, the JobHistoryServer uses port 19888 for a web UI and port 10020 for IPC.
  • By default, the node manager uses port 9999 for a web UI and port 4344 for IPC. Port 8080 is used for something? Also some random port... 65176?
  • The resource manager has a "proxy" url that it uses to link-through to the JobHistoryServer UI. e.g.:
    $ curl -I
    HTTP/1.1 302 Found
    Content-Type: text/plain; charset=utf-8
    Content-Length: 0
    Server: Jetty(6.1.26)


While Hadoop 0.23 is an alpha release, getting it up and running in pseudo-distributed mode isn't too difficult.  The new architecture will take some getting used to for users of previous releases of Hadoop, but it's an exciting step forward.

Observations and Notes

There are a few bugs or gotchas that I discovered or verified to keep an eye on as you're going through these steps.  These include:

  • HADOOP-7837 log4j isn't setup correctly when using sbin/
  • HDFS-2574 Deprecated parameters appear in the hdfs-site.xml templates.
  • HDFS-2595 misleading message when not set and running sbin/
  • HDFS-2553 BlockPoolScanner spinning in a loop (causes DataNode to peg one cpu to 100%).
  • HDFS-2608 NameNode webui references missing hadoop.css

References and Links


Recap: Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition

Posted on Nov 13, 2011

The Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition was Wednesday, November 9. It was collocated with the Hive Meetup at Palantir's awesome office space in the meatpacking district in Manhattan.

The following are my notes from the two presentations about Flume NG and Artimon. Hopefully I've recapped details accurately, but please let me know if corrections are appropriate!

Flume NG

Speakers Arvind Prabhakar and Eric Sammer of Cloudera. Arvind is first.

Motivation: Flume was built piecemeal as use-cases arose, and FlumeNG is taking a step back and rearchitecting from scratch with these use-cases in mind.  To understand the flume-ng architecture, Arvind took us through a tour of terminology.

If Flume is a blackbox, then there are bunches of potential inputs (e.g. log4j, thrift, avro) that go into the flume black box. Likewise there are lots of potential destinations for data (e.g. hdfs, hbase) exiting the flume black box.

Clients are the entities that produce events, e.g. log4j appender. A terminal sink is the final sink where data exits flume. Like flume 0.9.x, there are sources and sinks, but the start and end points have special names--  clients and terminal sinks. A flow describes how data moves from a client to a terminal sink.

Agents are the physical processes that house bunches of flume entities to move the flow along (pass along data from a source to a sink).

There can be other situations, such as fanout flows, in which an event gets replicated to two different sinks. From the terminology perspective, each start-end pipeline is a flow, even if two flows have hops in common and overlap.

A channel is a persistent or transient store for buffering events on agent nodes. An agent reads from a source and puts data on a channel. The sink polls the channel, picks up the events that arrived, packages them together, and sends them along. In doing so, the sink can do batching, encryption, compression, etc.  A source can feed into multiple channels, but a sink only reads from a single channel.

Rather than a ChannelDriver (flume 0.9.x), the sinks and sources each have their own threads, and they collaborate using a producer-consumer paradigm with the channel as a queue/buffer.
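To make the producer-consumer arrangement concrete, here is a toy Python sketch. It is not Flume code: a queue.Queue stands in for a transient channel, one thread plays the source, and another plays a sink that packages events into batches. All names and the batch size are assumptions of the sketch.

```python
import queue
import threading


def run_flow(events, batch_size=3):
    """Push events through a one-source, one-channel, one-sink toy flow."""
    channel = queue.Queue()  # stands in for a transient (in-memory) channel
    delivered = []           # batches the sink has "sent along"
    done = object()          # sentinel telling the sink the source is finished

    def source():
        for event in events:
            channel.put(event)
        channel.put(done)

    def sink():
        batch = []
        while True:
            item = channel.get()
            if item is done:
                break
            batch.append(item)
            if len(batch) == batch_size:
                delivered.append(batch)
                batch = []
        if batch:  # flush the final partial batch
            delivered.append(batch)

    threads = [threading.Thread(target=source), threading.Thread(target=sink)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return delivered


print(run_flow(list(range(7))))
```

The source and sink never call each other; the channel is the only thing they share, which is the point of the design described above.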

Because the definition of a node doesn't fit well in fanout situations, this term is no longer used.

Transactional Guarantees

  • if an agent has a single source and single sink, the source initiates a transaction with the channel. If the source can't accept the message, it invalidates the transaction.
  • likewise, sink has a transaction with its channel, so there is a channel -> sink -> src -> channel transaction, meaning you can have guaranteed delivery.
  • implementations of channels include:
    • memory, file, jdbc (with acid guarantees).


client (log4j) -> agent1 -> agent2 -> HDFS.

If agent2/HDFS link goes down, then the events will start accumulating on agent2. This is because channels are passive (the sink actively removes them). When the channel reaches its capacity, the message will get relayed as an exception and then events will start buffering on agent1. When agent1 reaches its capacity, then your client will start to fail. When link recovers, you'll start to see an immediate draining of events.
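The backpressure behavior described above can be simulated in a few lines of Python. This is only a sketch of the idea, not of flume-ng's implementation: the class names, the DeliveryFailed exception, and the capacity of 2 are all made up for the illustration.

```python
from collections import deque


class DeliveryFailed(Exception):
    """Raised when a hop cannot accept an event (full channel or dead link)."""


class Channel:
    """Passive bounded buffer: it never pushes, a sink has to pull from it."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        if len(self.events) >= self.capacity:
            raise DeliveryFailed("channel is full")
        self.events.append(event)


def forward(channel, deliver):
    """Sink loop: drain the channel until it is empty or delivery fails."""
    while channel.events:
        try:
            deliver(channel.events[0])
        except DeliveryFailed:
            return  # keep the event buffered and retry on the next pump
        channel.events.popleft()


class Flow:
    """client -> agent1 -> agent2 -> HDFS, with a switchable agent2/HDFS link."""

    def __init__(self, capacity):
        self.agent1 = Channel(capacity)
        self.agent2 = Channel(capacity)
        self.hdfs = []
        self.client_failures = 0
        self.link_up = True

    def _to_hdfs(self, event):
        if not self.link_up:
            raise DeliveryFailed("agent2 -> HDFS link is down")
        self.hdfs.append(event)

    def pump(self):
        forward(self.agent2, self._to_hdfs)
        forward(self.agent1, self.agent2.put)
        forward(self.agent2, self._to_hdfs)

    def send(self, event):
        try:
            self.agent1.put(event)
        except DeliveryFailed:
            self.client_failures += 1  # both agents are full, the client fails
        self.pump()


flow = Flow(capacity=2)
flow.link_up = False
for event in range(6):
    flow.send(event)
print(len(flow.agent1.events), len(flow.agent2.events), flow.client_failures)

flow.link_up = True  # the link recovers and the buffers drain immediately
flow.pump()
print(flow.hdfs)
```

With the link down, agent2 fills first, then agent1, and only then does the client see failures. Once the link is back, a single pump drains everything that was buffered.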

At this point, Eric takes over to chat about the flume-ng (and flume in general) dev efforts:

The bug for flume-ng is FLUME-728, the branch is called FLUME-728. Per Eric:

  • alpha quality.
  • sources: avro source, unix (tail) source, netcat source (for testing), a few other things.
  • configuration is now plugin/user extendable (zookeeper implemented, but could use other stores).
  • hdfs sink supports append and sync, as well as bucketing.
  • avro client sink that lets you do tiered collection.
  • looking for committers that don't work for Cloudera.

Notes from breakout session run by Eric:

  • Avro is the new default RPC system (no more reliance on thrift binary).
  • Avro is a bit slower, so batching / buffering is builtin to improve performance
  • Not 100% api compatible, so custom flume decorators/sinks will have to have (minimal) changes.
  • lots more that I can't recall :).

Artimon - Arkéa Real Time Information Monitoring.

Mathias Herberts of Arkéa- Slides on Slideshare.

"we found out that none of the existing monitoring systems met our needs, so we created artimon"

ARTIMON -- last mast of a ship with at least 3 sails, and it's supposed to stabilize the ship.

Collecting data

Collect variable instances on machines (using flume) and store the metrics in HDFS and in-memory buckets.  Variable instances are similar to rowkeys in OpenTSDB, if you're familiar. Something like:

name, label=value

Support for types like:

  • Integer
  • Double
  • Boolean
  • String

The framework adds a few labels: datacenter, rack, module, context, environment (production, testing, etc), host. Retrieved via reverse dns, dc-rack-module.

Metrics are created by a library (mainly java), called ArtimonBookeeper. It has a very simple interface. To use ArtimonBookeeper:

    • build a singleton.
    • use a method like: addToIntegerVar(name, labels, offset).
      • creates (or adds to) a variable metric (counter) held in memory.
    • the bookkeeper is part of a context, and it registers itself in zookeeper.
      • part of the registration tells ZK the context, IP and port.
      • the ip and port are a thrift end point that you can retrieve variables/counters stored by the application.
    • metrics have been exported by java library using a thrift endpoint. The design allows you to expose metrics from any language supported by thrift.
    • if you can't provide a thrift endpoint to expose metrics, you can create a file in /var/run/artimon, and it'll read the data every minute.
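As a rough illustration of that interface, here is a Python sketch of a singleton bookkeeper holding labeled integer counters in memory. The real ArtimonBookeeper is a Java library whose API I only saw on slides, so everything here other than the idea of addToIntegerVar(name, labels, offset) is an assumption, and the ZooKeeper registration and thrift endpoint are left out entirely.

```python
from collections import defaultdict


class Bookkeeper:
    """Toy stand-in for a metrics bookkeeper: labeled integer counters in memory."""

    _instance = None

    @classmethod
    def get(cls):
        """Return the process-wide singleton, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self._counters = defaultdict(int)

    def add_to_integer_var(self, name, labels, offset):
        """Create the counter if needed, add offset, and return the new value."""
        key = (name, tuple(sorted(labels.items())))
        self._counters[key] += offset
        return self._counters[key]

    def snapshot(self):
        """Render every variable instance as 'name, label=value' -> value."""
        return {
            ", ".join([name] + ["%s=%s" % pair for pair in labels]): value
            for (name, labels), value in self._counters.items()
        }


bookkeeper = Bookkeeper.get()
bookkeeper.add_to_integer_var("requests", {"dc": "jb"}, 1)
bookkeeper.add_to_integer_var("requests", {"dc": "jb"}, 2)
print(bookkeeper.snapshot())
```

In the real system the snapshot is what a poller would fetch over the thrift endpoint once a minute.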

Machines run apps that export the metrics. Can have several apps on each machine.


Each machine has flume running. Start several nodes and configure them without a master (they patched flume because they had problems with the master). Special source "artimon source" looks in Zookeeper for stuff to read. Periodically polls (every minute) the thrift endpoints exposed by the apps. Retrieve all metrics that are on the same app. Push to collectors. Collectors push to sink:

  • put most granular data in HDFS.
  • VHMS - VarHistoryMemStore (several servers running this). Store in buckets with various resolutions, depth, etc.
    • 60 buckets of 60 seconds.
    • 48 buckets of 5 minutes
    • 96 buckets of 15 minutes.
    • 72 buckets of 60 minutes. (3 days worth of data with 1-hour granularity).
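The arithmetic behind those resolutions is easy to check, and a fixed number of buckets per resolution suggests a ring buffer. The Python sketch below computes how much history each resolution covers and shows one way such a ring could work. How VHMS actually stores and aggregates values was not covered in the talk, so the BucketRing class (including its "last value wins" behavior) is purely my assumption.

```python
# (bucket count, bucket width in seconds), taken from the list above
RESOLUTIONS = [(60, 60), (48, 300), (96, 900), (72, 3600)]


def coverage_hours(count, width_s):
    """Hours of history covered by `count` buckets of `width_s` seconds."""
    return count * width_s / 3600.0


class BucketRing:
    """Fixed-size ring of time buckets; new buckets overwrite the oldest."""

    def __init__(self, count, width_s):
        self.count = count
        self.width_s = width_s
        self.slots = [None] * count  # each slot holds (bucket_start, value)

    def _locate(self, ts):
        start = ts - ts % self.width_s
        return start, (start // self.width_s) % self.count

    def record(self, ts, value):
        start, index = self._locate(ts)
        self.slots[index] = (start, value)  # assumption: last value wins

    def get(self, ts):
        start, index = self._locate(ts)
        slot = self.slots[index]
        if slot is not None and slot[0] == start:
            return slot[1]
        return None  # never written, or already overwritten by newer data


for count, width_s in RESOLUTIONS:
    print(count, "x", width_s, "s =", coverage_hours(count, width_s), "hours")
```

So the four resolutions cover 1 hour, 4 hours, 1 day, and 3 days respectively.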

Worked well for a while, but hit a "flume-wall":

  • DFOs for failures, which works very well.
  • problems arise when you have a slowdown, but not a complete failure. Not going to benefit from the DFO. You're going to run into OOME because you fill up buffers. Had to put in place a heuristic to drop stuff.
    • not great because when there is congestion is when there is a problem.
  • Solution was to create a Kafka sink. Kafka is the buffer. Failure scenario is much better. Haven't had any slowdowns.
  • Wrote a Kafka source on the other side to collect metrics.

Stats and numbers

  • Collecting around 1.5 million metrics per minute.
    • 25K metrics/second on average. In practice, though, all of the metrics are collected within about 10 seconds of every minute.
  • 1.5B metrics per day.
  • Keeping all raw data since end of june, collected around 80TB (divide by 3). 200-300GB/day, compressed.

The great thing about flume is that we have several dc, but when there's a colo partition, the data recovers. When the partition ends, the metrics flood in.

What they do with metrics?

VHMS has a thrift endpoint. You can query it like "Give me all metrics named memcached which have dc=jb for the last 600,000ms"

  • retrieves a matrix of values. memcached.varX x value
  • you can display and have a pretty quick view.
  • you can apply stuff on the matrices.
    • "Map paradigm" - historical data on one variable data. Library is in Groovy. DSL to play with metrics.
    • map_delta() -- take two values, create a new one. Creates a new dataset.
    • provides a bunch of builtin functions, but you can create your own using groovy closures.
  • labels are important. You can define equivalence classes. If you have the same names and values (or subset).
  • can do bulk operations.
    • group two operands by equivalence classes. e.g. compute available disk space. Works for one volume or thousands of volumes.
  • reduce paradigm to an equivalence class. e.g. could compute the average temperature of each rack in your datacenter with 1-line.
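The equivalence-class idea is easier to see in code. The real library is a Groovy DSL that I have not used, so the following is just a Python sketch of the concept with invented names and invented sample data: group values by a subset of their labels, then reduce each group.

```python
from collections import defaultdict
from statistics import mean


def reduce_by(series, class_labels, fn):
    """Group (labels, value) pairs by the given label names, then reduce each group."""
    groups = defaultdict(list)
    for labels, value in series:
        key = tuple(labels.get(name) for name in class_labels)
        groups[key].append(value)
    return {key: fn(values) for key, values in groups.items()}


# Hypothetical temperature readings, one per host.
temperatures = [
    ({"dc": "jb", "rack": "r1", "host": "a"}, 20.0),
    ({"dc": "jb", "rack": "r1", "host": "b"}, 24.0),
    ({"dc": "jb", "rack": "r2", "host": "c"}, 30.0),
]

# The "1-line" part: average temperature per rack.
per_rack = reduce_by(temperatures, ("dc", "rack"), mean)
print(per_rack)
```

The same call works unchanged whether there are three hosts or thousands, which matches the "one volume or thousands of volumes" point above.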

notes from breakout session:

  • have a JMX adapter that's used for pulling metrics from hadoop, hbase, etc.
  • using Kafka very much like the channels in flume-ng (flume releases use a push model, which is responsible for many of the problems).
  • Write pig scripts for custom / post-mortem analysis.
  • Groovy library has an email service for alerting.

Thanks again to everyone who helped make this event happen. It was a ton of fun.


Recap: April Puppet NYC Meetup

Posted on Apr 21, 2011

Last week, I attended my first Puppet NYC meetup, which was hosted at Gilt Groupe. As a fairly recent user of puppet, it was great to meet some folks from the community in NYC that are using it on a daily basis. Here are my notes from the two great presentations in hopes that they're useful for other people.


Presenter: Eric E. Moore (Brandorr Group LLC)

Foreman is an application that runs atop puppet to let you interact with and see reports of your puppet nodes.  In addition, Foreman can act as a provisioning system for imaging machines with kickstart, signing puppet certificates, and much more.  Foreman scales -- foreman has been known to power a 4,000 node operation.

The main page of the Foreman UI is a list of all of your hosts, and it links off to a number of reports.  There is a dashboard/overview page which displays a pie chart of your Active/Error/Out of Sync instances as well as the run-distribution (and timings) of previous runs.

Foreman imports information about your hosts from the puppet storeconfigs, and there's detailed output from the last puppet run (on a per-node basis).  Foreman also gives you a UI to access the lists of facts per server, and you can do things like search for all nodes matching a particular fact (e.g. see a list of all machines with processor XYZ).  This data is available via API, as well (more later).

Foreman lets you control which environment a node is in (if you're using environments on the puppet server), but it also lets you set variables that are sent to the puppet client on run (somewhat like facter).  You can set these variables on four-levels: global, domain-level, hostgroups, and host-level.  Internally, foreman lets you group machines by domain or into host-groups for setting variables like these (the talk at the puppet meetup was that this is an alternative to extlookup).

Foreman is designed for total-provisioning.  It supports provisioning via/configuring (among others):

  • Kickstart, Jumpstart, and Preseed
  • PuppetCA
  • TFTP
  • DNS
  • DHCP
  • Virtual machines (VMWare)
  • Eric and Brian mentioned that they are planning on contributing ec2 provisioning to the project.

Foreman has full role-based access controls, meaning you can give your users access to particular views, reports, operations or subsets of nodes.  In addition, it provides an audit log of what has changed (including graphs of the number of changes, failures etc).  It provides a mechanism to initiate puppet runs from the dashboard, and also has a "destroy" button to clean out the storeconfigs for a particular node.

An interesting feature of Foreman is the REST API, which follows full REST / HTTP semantics for CRUD operations.  Eric mentioned using the API for provisioning nodes as well as for running searches over the nodes in the system.  It was mentioned that authentication for the REST API was less than ideal -- suggestion was to use some sort of proxy in front of Foreman.

Foreman vs. Puppet dashboard: The conversation seemed to suggest that Foreman's features were a super-set of those of Puppet Dashboard, with a few exceptions.  For example, Puppet Dashboard has support for viewing diffs of your configurations.

Change Management with Puppet

Presenter: Garrett Honeycutt (Puppet Labs) Slides

Ideally, you want all the environments to be exactly the same: Dev == QA == Staging == PROD so that you can catch issues early.  With that said, there's typically some sort of approval criteria to move changes from one environment to the other (code review, QA procedures, etc).  Given all of these different environments, each environment often has different teams and sometimes there are conflicting goals.  For example, dev wants to do quick features whereas ops wants production to be stable.

It's important to document the different environments you have and what the policies are.  For example, who owns what, what the order of precedence is, what the SLAs are per environment, etc.  Garrett has seen a flow like the following work well when doing puppet development: Puppet Test Area -> Dev -> QA -> Prod .  In addition, it's important to document the gating factors between environments -- who can approve migrations between environments and how are they approved.

Suggested SVN/git layout looks like this:


Breaking it down:

  • Branches are short-lived and topical ("feature" branches). For example, branches/123 is a branch to work on ticket 123. All work should be done on a branch and then merged to trunk after review.
  • Tags are immutable, and Garrett has found that BIND style timestamps work the best. 2011041300 would be the first tag for April 13th, 2011 (the last two digits are an incrementing counter).
  • Trunk contains all of the best known working code, but it is not very-well tested.
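Generating the next BIND-style tag is simple to automate. Here is a minimal Python sketch under the scheme described above (YYYYMMDD followed by a two-digit counter starting at 00). The function name is mine, and how you list existing tags (svn ls, git tag, etc.) is left to you.

```python
import datetime


def next_tag(existing_tags, day):
    """Return the next BIND-style tag (YYYYMMDDNN) for the given date."""
    prefix = day.strftime("%Y%m%d")
    counters = [
        int(tag[8:])
        for tag in existing_tags
        if len(tag) == 10 and tag.isdigit() and tag.startswith(prefix)
    ]
    counter = max(counters) + 1 if counters else 0
    if counter > 99:
        raise ValueError("more than 100 tags on %s" % prefix)
    return "%s%02d" % (prefix, counter)


april_13 = datetime.date(2011, 4, 13)
print(next_tag([], april_13))
print(next_tag(["2011041200", "2011041300", "2011041301"], april_13))
```

A nice property of this format is that tags sort chronologically as plain strings.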

Development flow:

Consider these environments: Puppet Test Area -> Dev -> QA -> Prod

  1. When a ticket/change request comes in, create a branch off of trunk. Eventually, the change will get merged back to trunk.
  2. There should be an environment that is always running off of trunk so that you can verify that it works well-enough to create a tag. Tags should always be made off of trunk.
  3. Once a tag has been created, that tag should be deployed to each environment in turn.
  4. Testing is done per environment, and if any verifications fail (for example, there's an issue in QA and it's not a candidate for production), then create a new ticket to fix the issue and go back to #1.

Important: Avoid taking shortcuts, they will become more and more expensive.

Tag generation can be automated, and the selection of tags for environments can be automated as well (svn switch module).

Branch development:

It needs to be easy to run test instances from branches. Spinning up instances should be easy, and you can either use puppet apply or have a puppet master that knows about each branch and has a separate environment for it. The person reviewing the code can then spin up an instance to verify the module.

Release manager:

Garrett has very successfully used the role of "release manager" in the past to facilitate branch-management. The RM is responsible for merging all branches back to trunk once they are stable. This person should also be responsible for monitoring commits to a branch so that they can offer constructive criticism early in the process (possibly pair-programming), particularly if there are people new to using puppet. The RM position can rotate between people (e.g. he's had it switch weekly in the past).

Multiple teams exchanging code:

  • Use multiple module paths.
  • Communication is super-important. Advocated using svn commit emails so that teams can see what other teams are doing.
  • Private github accounts are supposedly very useful, too.

Test driven development:

  • Use puppet apply to do manifest testing.
  • Recommends having a ${module}/test directory to put test files. The test file should show how to use the module you're developing, so that it can be tested with puppet apply. The test should be written before the module. NOTE, this is not a unit test. You're not verifying what puppet has done, only that it is exiting cleanly.
  • End-to-end testing should be done via monitoring. Machines add themselves to nagios and you can verify that services get started correctly that way. Suggestion is that all environments have monitoring for this end-end testing.

We briefly discussed a use-case of a company that runs hundreds of machines. This organization runs puppet via cron rather than as the puppet agent daemon in order to disable updates during busy hours. During those hours, puppet runs in noop mode so that reports are generated for Puppet Dashboard. When they are ready to roll-out changes during the change-window, they run puppet with noop to test what would happen and give a green light before actually applying the changes.


The April puppet meetup was great, and I definitely plan on attending future meetups to learn what people are doing.  I'd like to thank everyone that made the meetup possible and a great success!


Silently broken Gmail

Posted on Apr 2, 2011

At work, we have google apps, which comes with several gigs of gmail storage. For email, though, we use outlook server with a low quota. Rather than deleting email, I "archive" to gmail via IMAP.

One day, though, gmail IMAP silently stopped syncing. I could login, but no mail was being transferred. I tried everything -- synchronizing accounts, rebuilding the mailbox, but nothing worked. I was hoping to enable some more verbose logging, when I came across an old article from about Logging.

After learning the magic incantation, I started up to log all operations on port 993 (the SSL port that gmail IMAP uses).  A message in the logs immediately stood out:

5.1 BAD [ALERT] Message too large.

The prior message logged the timestamp of the offending email, which was over 25MB in size (the max size for Gmail).  Unfortunately, had created a large number of copies of this email (over 1,000) and placed them in the "Recovered Items" folder.

Even after deleting all of these "recovered item" copies, Mail kept making new ones.  I tracked down a copy of this file in the .OfflineBackups directory. After removing the file that gmail rejects (see -- I chose the file with the large file size in that directory), I was finally able to resync with gmail!

Unfortunately, was not happy that I removed a file from .OfflineBackups, and it refused to process the rest of the files in that directory.  I wrote the following python script to convert the unprocessed files in .OfflineBackups into a single .mbox file suitable for import into
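The script itself did not survive on this page, so what follows is a reconstruction of the idea rather than the original code. It is a minimal Python sketch that assumes each regular file in the backup directory holds exactly one RFC 822 message; I have not re-verified that against real .OfflineBackups contents, so treat the file format, the function name, and the paths as assumptions, and work on a copy of the directory.

```python
import email
import mailbox
import os


def backups_to_mbox(backup_dir, mbox_path):
    """Append every parseable message file in backup_dir to one mbox file."""
    box = mailbox.mbox(mbox_path)  # created if it does not exist yet
    added = 0
    try:
        for name in sorted(os.listdir(backup_dir)):
            path = os.path.join(backup_dir, name)
            if not os.path.isfile(path):
                continue
            with open(path, "rb") as handle:
                message = email.message_from_binary_file(handle)
            if not message.keys():
                continue  # no headers at all, so probably not a mail message
            box.add(message)
            added += 1
    finally:
        box.close()  # flushes pending messages to disk
    return added
```

Call it with the path to a copy of the backup directory and an output path outside that directory, for example backups_to_mbox("/path/to/copy-of-OfflineBackups", "/path/to/recovered.mbox"), then import the resulting file.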

Thankfully, in the end, everything is back to normal.


two puppet tricks: combining arrays and local tests

Posted on Feb 1, 2011

Joining Arrays

I found myself wanting to join a bunch of arrays in my puppet manifests. I had 3 lists of ip addresses, but wanted to join all 3 lists together into a single list to provide all ips to a template. I found some good tricks for flattening nested arrays in an erb, but I found those solutions to be too "magic" and hard to read. I ended up settling on using an inline_template and split, like so:

$all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')

Testing Puppet Locally

To debug things like this, I like to use puppet locally to test a configuration by writing a test.pp file and passing that into puppet, e.g.:

File: arrays-test.pp
define tell_me() { notify{$name:}}

$worker_ips = [ "",
$entry_point_ips = ["",
$master_ips = [ "",

$all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')

Running puppet:

$ puppet arrays-test.pp
notice: /Stage[main]//Tell_me[]/Notify[]/message: defined 'message' as ''
notice: /Stage[main]//Tell_me[]/Notify[]/message: defined 'message' as ''
notice: /Stage[main]//Tell_me[]/Notify[]/message: defined 'message' as ''
notice: /Stage[main]//Tell_me[]/Notify[]/message: defined 'message' as ''
notice: /Stage[main]//Tell_me[]/Notify[]/message: defined 'message' as ''

Note that for these kinds of tests, I like to use notify rather than building files.
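For anyone who finds the ERB one-liner hard to parse, here is the same join-then-split round trip written out in Python. This is only an illustration of what the trick does, not something puppet runs. The addresses are placeholders from the documentation range, and the empty-input guard is a choice made for this sketch.

```python
def join_then_split(*lists, sep=","):
    """Concatenate the lists, join into one string, then split it back apart."""
    joined = sep.join(item for items in lists for item in items)
    # Guard for the all-empty case; without it Python would return [""].
    return joined.split(sep) if joined else []


# Placeholder addresses (192.0.2.0/24 is reserved for documentation).
worker_ips = ["192.0.2.10", "192.0.2.11"]
entry_point_ips = ["192.0.2.20"]
master_ips = ["192.0.2.30"]

all_ips = join_then_split(worker_ips, entry_point_ips, master_ips)
print(all_ips)
```

The round trip is only safe because IP addresses never contain the separator. A value with a comma in it would come back as two elements.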




Joe Crobak is a software engineer at the United States Digital Service and runs Hadoop Weekly.
