Posted on Dec 4, 2011
Hadoop 0.23.0 was released November 11, 2011. Being the future of the Hadoop platform, it's worth checking out even though it is an alpha release.
Note: Many of the instructions in this article came from trial and error, and there are lots of alternative (and possibly better ways) to configure the systems. Please feel free to suggest improvements in the comments. Also, all commands were only tested on Mac OS X.
To get started, download the hadoop-0.23.0.tar.gz file from one of the mirrors here: http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-0.23.0.
Once downloaded, decompress the file. The bundled documentation is available in share/doc/hadoop/index.html
The directory layout of the hadoop distribution changed in hadoop 0.23.0 and 0.20.204 vs. previous versions. In particular, there are now sbin, libexec, and etc directories in the root of the distribution tarball.
In hadoop 0.23.0, a number of commonly used scripts from the bin directory have been removed or drastically changed. Specifically, the following scripts were removed (vs 0.20.205.0):
The start/stop mapred-related scripts have been replaced by "map-reduce 2.0" scripts called yarn-*. The start-all.sh and stop-all.sh scripts no longer start or stop HDFS, but they are used to start and stop the yarn daemons. Finally, bin/hadoop has been deprecated. Instead, users should use bin/hdfs and bin/mapred.
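For example, operations that previously went through bin/hadoop now look roughly like this (a sketch; the dfs and job subcommands are assumed to behave like their bin/hadoop equivalents from older releases):

# filesystem operations move from "bin/hadoop fs" to bin/hdfs
bin/hdfs dfs -ls /

# job-level operations move from "bin/hadoop job" to bin/mapred
bin/mapred job -list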
Hadoop distributions now also include scripts in an sbin directory. The scripts include start-all.sh, start-dfs.sh, and start-balancer.sh (and the stop versions of those scripts).
Configuration directories and files
The conf directory that comes with Hadoop is no longer the default configuration directory. Rather, Hadoop looks in etc/hadoop for configuration files. The libexec directory contains scripts hadoop-config.sh and hdfs-config.sh for configuring where Hadoop pulls configuration information, and it's possible to override the location of the configuration directory in a few ways.
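For example (a sketch based on the standard hadoop-config.sh behavior; the paths are placeholders), you can either export HADOOP_CONF_DIR or pass --config to the individual scripts:

# point every hadoop/hdfs/yarn script at an alternate configuration directory
export HADOOP_CONF_DIR=/path/to/my/conf

# or override the location for a single invocation
bin/hdfs --config /path/to/my/conf namenode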
To start hdfs, we will use sbin/start-dfs.sh which pulls configuration from etc/hadoop by default. We'll be putting configuration files in that directory, starting with core-site.xml. In core-site.xml, we must specify a fs.default.name:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration>
Next, we want to override the locations that the NameNode and DataNode store data so that it's in a non-transient location. The two relevant parameters are dfs.namenode.name.dir and dfs.datanode.data.dir. We also set replication to 1, since we're using a single datanode.
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/datanode</value> </property> </configuration>
Notes:
Start the NameNode:
sbin/hadoop-daemon.sh start namenode
Start a DataNode:
sbin/hadoop-daemon.sh start datanode
(Optionally) start the SecondaryNameNode (this is not required for local development, but it definitely is for production):
sbin/hadoop-daemon.sh start secondarynamenode
To confirm that the processes are running, issue jps and look for lines for NameNode, DataNode and SecondaryNameNode:
$ jps
55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode
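With the daemons up, a quick smoke test confirms that HDFS accepts reads and writes (a sketch; the paths are just examples). The NameNode web UI should also be reachable at http://localhost:50070.

$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -put etc/hadoop/core-site.xml /user
$ bin/hdfs dfs -ls /user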
Notes:
Eventually you'll want to stop HDFS. Here are the commands to execute, in the given order:
sbin/hadoop-daemon.sh stop secondarynamenode
sbin/hadoop-daemon.sh stop datanode
sbin/hadoop-daemon.sh stop namenode
Use jps to confirm that the daemons are no longer running.
This section just gives the commands for configuring and starting the Resource Manager, Node Manager, and Job History Server, but it doesn't explain the details of those. Please refer to the References and Links section for more details.
The Yarn daemons use the conf directory in the distribution for configuration by default. Since we used etc/hadoop as the configuration directory for HDFS, it would be nice to use that as the config directory for mapreduce, too. As a result, we update the following files:
In conf/yarn-env.sh, add the following lines under the definition of YARN_CONF_DIR:
export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$YARN_HOME/etc/hadoop}"
export HADOOP_COMMON_HOME="${HADOOP_COMMON_HOME:-$YARN_HOME}"
export HADOOP_HDFS_HOME="${HADOOP_HDFS_HOME:-$YARN_HOME}"
In conf/yarn-site.xml, update the contents to:
<?xml version="1.0"?> <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce.shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> </configuration>
Set the contents of etc/hadoop/mapred-site.xml to:
<?xml version="1.0"?> <?xml-stylesheet href="configuration.xsl"?> <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
Now, start up the yarn daemons:
$ bin/yarn-daemon.sh start resourcemanager
$ bin/yarn-daemon.sh start nodemanager
$ bin/yarn-daemon.sh start historyserver
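As with HDFS, jps can be used to verify that the daemons came up; expect to see something like the following (the pids will differ):

$ jps
61341 ResourceManager
61489 NodeManager
61610 JobHistoryServer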
A bunch of example jobs are available via the hadoop-examples jar. For example, to run the program that calculates pi:
$ bin/hadoop jar hadoop-mapreduce-examples-0.23.0.jar pi \
    -Dmapreduce.clientfactory.class.name=org.apache.hadoop.mapred.YarnClientFactory \
    -libjars modules/hadoop-mapreduce-client-jobclient-0.23.0.jar 16 10000
The command produces a lot of output, but towards the end you'll see:
Job Finished in 67.705 seconds
Estimated value of Pi is 3.14127500000000000000
Notes
$ curl -I http://0.0.0.0:8088/proxy/application13226221033710001/jobhistory/job/job132262210337111
HTTP/1.1 302 Found
Content-Type: text/plain; charset=utf-8
Location: http://192.168.1.12:19888/jobhistory/job/job132262210337111/jobhistory/job/job132262210337111
Content-Length: 0
Server: Jetty(6.1.26)
While Hadoop 0.23 is an alpha release, getting it up and running in pseudo-distributed mode isn't too difficult. The new architecture will take some getting used to for users of previous releases of Hadoop, but it's an exciting step forward.
Posted on Nov 13, 2011
The Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition was Wednesday, November 9. It was collocated with the Hive Meetup at Palantir's awesome office space in the meatpacking district in Manhattan.
The following are my notes from the two presentations about Flume NG and Artimon. Hopefully I've recapped details accurately, but please let me know if corrections are appropriate!
Speakers: Arvind Prabhakar and Eric Sammer of Cloudera. Arvind presented first.
Motivation: Flume was built piecemeal as use-cases arose, and FlumeNG is taking a step back and rearchitecting from scratch with these use-cases in mind. To understand the flume-ng architecture, Arvind took us through a tour of terminology.
If Flume is a black box, then there are bunches of potential inputs (e.g. log4j, thrift, avro) that go into the flume black box. Likewise there are lots of potential destinations for data (e.g. hdfs, hbase) exiting the flume black box.
Clients are the entities that produce events, e.g. log4j appender. A terminal sink is the final sink where data exits flume. Like flume 0.9.x, there are sources and sinks, but the start and end points have special names-- clients and terminal sinks. A flow describes how data moves from a client to a terminal sink.
Agents are the physical processes that house bunches of flume entities to move the flow along (pass along data from a source to a sink).
There can be other situations, such as fanout flows, in which an event gets replicated to two different sinks. From the terminology perspective, each start-end pipeline is a flow, even if two flows have hops in common and overlap.
A channel is a persistent or transient store for buffering events on agent nodes. An agent reads from a source and puts data on a channel. The sink polls the channel, picks up the events that arrived, packages them together, and sends them along. In doing so, the sink can do batching, encryption, compression, etc. A source can feed into multiple channels, but a sink only reads from a single channel.
Rather than a ChannelDriver (flume 0.9.x), the sinks and sources each have their own threads, and they collaborate using a producer-consumer paradigm with the channel as a queue/buffer.
Because the definition of a node doesn't fit well in fanout situations, this term is no longer used.
Example:
client (log4j) -> agent1 -> agent2 -> HDFS.
If the agent2-to-HDFS link goes down, then events will start accumulating on agent2. This is because channels are passive (the sink actively removes events from them). When the channel reaches its capacity, the failure will get relayed as an exception and events will start buffering on agent1. When agent1 reaches its capacity, your client will start to fail. When the link recovers, you'll see an immediate draining of events.
At this point, Eric takes over to chat about the flume-ng (and flume in general) dev efforts:
The bug for flume-ng is FLUME-728, and the branch is also called FLUME-728. Per Eric:
Notes from breakout session run by Eric:
Mathias Herberts of Arkéa. Slides on Slideshare.
"we found out that none of the existing monitoring system meet our needs, so we created artimon"
ARTIMON -- last mast of a ship with at least 3 sails, and it's supposed to stabilize the ship.
Collect variable instances on machines (using flume) and store the metrics in HDFS and in-memory buckets. Variable instances are similar to rowkeys in OpenTSDB, if you're familiar. Something like:
name, label=value
Support for types like:
The framework adds a few labels: datacenter, rack, module, context, environment (production, testing, etc), host. The datacenter, rack, and module are retrieved via reverse DNS (hostnames encode dc-rack-module).
Metrics are created by a library (mainly java), called ArtimonBookeeper. It has a very simple interface. To use ArtimonBookeeper:
Machines run apps that export the metrics. Can have several apps on each machine.
Each machine has flume running. They start several nodes and configure them without a master (patched flume because they had problems with the master). A special "artimon source" looks in Zookeeper for stuff to read and periodically polls (every minute) the thrift endpoints exposed by the apps, retrieving all the metrics from each app. It pushes to collectors, and collectors push to the sink:
Worked well for a while, but hit a "flume-wall":
The great thing about flume is that they have several data centers, and the data recovers from colo partitions: when a partition ends, the metrics flood in.
VHMS has a thrift endpoint. You can query it like "Give me all metrics named memcached which have dc=jb for the last 600,000ms"
Notes from breakout session:
Thanks again to everyone that helped to make this event happen. It was a ton of fun.
Posted on Apr 21, 2011
Last week, I attended my first Puppet NYC meetup, which was hosted at Gilt Groupe. As a fairly recent user of puppet, it was great to meet some folks from the community in NYC that are using it on a daily basis. Here are my notes from the two great presentations in hopes that they're useful for other people.
Presenter: Eric E. Moore (Brandorr Group LLC)
Foreman is an application that runs atop puppet to let you interact with and see reports of your puppet nodes. In addition, Foreman can act as a provisioning system for imaging machines with kickstart, signing puppet certificates, and much more. Foreman scales -- it has been known to power a 4,000-node operation.
The main page of the Foreman UI is a list of all of your hosts, and it links off to a number of reports. There is a dashboard/overview page which displays a pie chart of your Active/Error/Out of Sync instances as well as the run-distribution (and timings) of previous runs.
Foreman imports information about your hosts from the puppet storeconfigs, and there's detailed output from the last puppet run (on a per-node basis). Foreman also gives you a UI to access to the lists of facts per server, and you can do things like search for all nodes matching a particular fact (i.e. see a list of all machines with processor XYZ). This data is available via API, as well (more later).
Foreman lets you control which environment a node is in (if you're using environments on the puppet server), but it also lets you set variables that are sent to the puppet client on run (somewhat like facter). You can set these variables at four levels: global, domain, hostgroup, and host. Internally, foreman lets you group machines by domain or into host-groups for setting variables like these (the discussion at the puppet meetup suggested that this is an alternative to extlookup).
Foreman is designed for total-provisioning. It supports provisioning via/configuring (among others):
Foreman has full role-based access controls, meaning you can give your users access to particular views, reports, operations, or subsets of nodes. In addition, it provides an audit log of what has changed (including graphs of the number of changes, failures, etc). It provides a mechanism to initiate puppet runs from the dashboard, and also has a "destroy" button to clean out the storeconfigs for a particular node.
An interesting feature of Foreman is the REST API, which follows full REST / HTTP semantics for CRUD operations. Eric mentioned using the API for provisioning nodes as well as for running searches over the nodes in the system. It was mentioned that authentication for the REST API was less than ideal -- suggestion was to use some sort of proxy in front of Foreman.
Foreman vs. Puppet dashboard: The conversation seemed to suggest that Foreman's features were a super-set of those of Puppet Dashboard, with a few exceptions. For example, Puppet Dashboard has support for viewing diffs of your configurations.
Presenter: Garrett Honeycutt (Puppet Labs) Slides
Ideally, you want all the environments to be exactly the same: Dev == QA == Staging == PROD so that you can catch issues early. With that said, there's typically some sort of approval criteria to move changes from one environment to the other (code review, QA procedures, etc). Given all of these different environments, each environment often has different teams and sometimes there are conflicting goals. For example, dev wants to do quick features whereas ops wants production to be stable.
It's important to document the different environments you have and what the policies are. For example, who owns what, what the order of precedence is, what the SLAs are per environment, etc. Garrett has seen a flow like the following work well when doing puppet development: Puppet Test Area -> Dev -> QA -> Prod. In addition, it's important to document the gating factors between environments -- who can approve migrations between environments and how they are approved.
Suggested SVN/git layout looks like this:
branches/123
branches/124
...
tags/2011041300
tags/2011041301
tags/2011041400
...
trunk
Breaking it down:
Development flow:
Consider these environments: Puppet Test Area -> Dev -> QA -> Prod
Important: Avoid taking shortcuts; they will become more and more expensive.
Tag generation can be automated, and the selection of tags for environments can be automated as well (svn switch module).
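As a rough sketch of what that automation can look like (hypothetical repository paths; plain svn commands rather than the module mentioned above):

# cut a new dated tag from trunk
svn copy ^/puppet/trunk ^/puppet/tags/$(date +%Y%m%d)00 -m "automated puppet tag"

# point the prod environment's checkout at an approved tag
cd /etc/puppet/environments/prod && svn switch ^/puppet/tags/2011041300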
Branch development:
It needs to be easy to spin up test instances from branches: you can either use puppet apply or have a puppet master that knows about each branch and has a separate environment for it. The person reviewing code can then spin up an instance to verify the module.
Release manager:
Garrett has very successfully used the role of "release manager" in the past to facilitate branch management. The RM is responsible for merging all branches back to trunk once they are stable. This person should also be responsible for monitoring commits to a branch so that they can offer constructive criticism early in the process (possibly pair-programming), particularly if there are people new to using puppet. The RM position can rotate (e.g. he's had it switch weekly in the past).
Multiple teams exchanging code:
Test driven development:
We briefly discussed a use-case of a company that runs hundreds of machines. This organization runs puppet via cron rather than as the puppet agent daemon in order to disable updates during busy hours. During those hours, puppet runs in noop mode so that reports are generated for Puppet Dashboard. When they are ready to roll out changes during the change window, they run puppet with noop to test what would happen and give a green light before actually applying the changes.
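A rough sketch of that pattern (assuming stock puppet agent flags; the schedule, user, and paths are invented):

# /etc/crontab entry: noop runs during business hours so Puppet Dashboard still receives reports
0 9-17 * * * root puppet agent --onetime --no-daemonize --noop

During the change window, a dry run followed by a real run once it is green-lit might look like:

$ puppet agent --onetime --no-daemonize --noop
$ puppet agent --onetime --no-daemonize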
Summary
The April puppet meetup was great, and I definitely plan on attending future meetups to learn what people are doing. I'd like to thank everyone that made the meetup possible and a great success!
Posted on Apr 2, 2011
At work, we have google apps, which comes with several gigs of gmail storage. For email, though, we use an Outlook server with a low quota. Rather than deleting email, I "archive" to gmail via IMAP.
One day, though, gmail IMAP silently stopped syncing. I could log in, but no mail was being transferred. I tried everything -- synchronizing accounts, rebuilding the mailbox -- but nothing worked. I was hoping to enable some more verbose logging when I came across an old article from eriklabs.com about Mail.app Logging.
After learning the magic incantation, I started up Mail.app to log all operations on port 993 (the SSL port that gmail IMAP uses). A message in the logs immediately stood out:
5.1 BAD [ALERT] Message too large. http://mail.google.com/support/bin/answer.py?answer=8770
The prior message logged the timestamp of the offending email, which was over 25MB in size (the max size for Gmail). Unfortunately, Mail.app had created a large number of copies of this email (over 1,000) and placed them in the "Recovered Items" folder.
Even after deleting all of these "recovered item" copies, Mail kept making new ones. I tracked down a copy of this file in the .OfflineBackups directory. After removing the file that gmail rejected (see http://automatica.com.au/2010/01/mail-app-and-its-offline-cache/ -- I chose the file with the large file size in that directory), I was finally able to resync with gmail!
Unfortunately, mail.app was not happy that I removed a file from .OfflineBackups, and it refused to process the rest of the files in that directory. I wrote the following python script to convert the unprocessed files in .OfflineBackups into a single .mbox file suitable for import into Mail.app.
Thankfully, in the end, everything is back to normal.
Posted on Feb 1, 2011
Joining Arrays
I found myself wanting to join a bunch of arrays in my puppet manifests. I had 3 lists of ip addresses, but wanted to join all 3 lists together into a single list to provide all ips to a template. I found some good tricks for flattening nested arrays in an erb -- http://weblog.etherized.com/posts/175 -- but I found those solutions to be too "magic" and hard to read. I ended up settling on using an inline_template and split, like so:
$all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')
Testing Puppet Locally
To debug things like this, I like to use puppet locally to test a configuration by writing a test.pp file and passing that into puppet, e.g.:
File: arrays-test.pp
define tell_me() {
  notify { $name: }
}
$worker_ips = [ "192.168.0.1",
"192.168.0.2",
]
$entry_point_ips = ["10.0.0.1",
"10.0.0.2",
]
$master_ips = [ "192.168.15.1",
]
$all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')
tell_me{$all_ips:}
Running puppet:
$ puppet arrays-test.pp
notice: 192.168.0.2
notice: /Stage[main]//Tell_me[192.168.0.2]/Notify[192.168.0.2]/message: defined 'message' as '192.168.0.2'
notice: 192.168.15.1
notice: /Stage[main]//Tell_me[192.168.15.1]/Notify[192.168.15.1]/message: defined 'message' as '192.168.15.1'
notice: 192.168.0.1
notice: /Stage[main]//Tell_me[192.168.0.1]/Notify[192.168.0.1]/message: defined 'message' as '192.168.0.1'
notice: 10.0.0.1
notice: /Stage[main]//Tell_me[10.0.0.1]/Notify[10.0.0.1]/message: defined 'message' as '10.0.0.1'
notice: 10.0.0.2
notice: /Stage[main]//Tell_me[10.0.0.2]/Notify[10.0.0.2]/message: defined 'message' as '10.0.0.2'
Note that for these kinds of tests, I like to use notify rather than building files.
Joe Crobak is a software engineer at the United States Digital Service and runs Hadoop Weekly.