The Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition, was held on Wednesday, November 9. It was co-located with the Hive Meetup at Palantir's awesome office space in the Meatpacking District in Manhattan.

The following are my notes from the two presentations about Flume NG and Artimon. Hopefully I've recapped details accurately, but please let me know if corrections are appropriate!

Flume NG

Speakers Arvind Prabhakar and Eric Sammer of Cloudera. Arvind is first.

Motivation: Flume was built piecemeal as use cases arose, and Flume NG takes a step back and rearchitects from scratch with these use cases in mind. To understand the Flume NG architecture, Arvind took us on a tour of the terminology.

If Flume is a black box, then there are many potential inputs (e.g. log4j, Thrift, Avro) that go into it. Likewise, there are many potential destinations (e.g. HDFS, HBase) for data exiting it.

Clients are the entities that produce events, e.g. a log4j appender. A terminal sink is the final sink where data exits Flume. As in Flume 0.9.x, there are sources and sinks, but the start and end points have special names: clients and terminal sinks. A flow describes how data moves from a client to a terminal sink.

Agents are the physical processes that house bunches of flume entities to move the flow along (pass along data from a source to a sink).

There can be other situations, such as fanout flows, in which an event gets replicated to two different sinks. From the terminology perspective, each start-end pipeline is a flow, even if two flows have hops in common and overlap.

A channel is a persistent or transient store for buffering events on agent nodes. An agent reads from a source and puts data on a channel. The sink polls the channel, picks up the events that arrived, packages them together, and sends them along. In doing so, the sink can do batching, encryption, compression, etc.  A source can feed into multiple channels, but a sink only reads from a single channel.

Rather than a ChannelDriver (flume 0.9.x), the sinks and sources each have their own threads, and they collaborate using a producer-consumer paradigm with the channel as a queue/buffer.
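To make the producer-consumer idea concrete, here is a minimal Python sketch of my own (not Flume NG code): a source thread puts events on a bounded queue that stands in for the channel, and a sink thread takes them off and packages them into batches. The names run_agent, capacity and batch_size are made up for illustration, and the sink here blocks on the queue rather than polling it.

```python
import queue
import threading

def run_agent(events, capacity=100, batch_size=3):
    """Move events from a 'source' thread to a 'sink' thread via a channel."""
    channel = queue.Queue(maxsize=capacity)  # the channel: a bounded buffer
    delivered = []                           # batches the sink has sent onward
    done = object()                          # sentinel marking the end of input

    def source():
        for event in events:
            channel.put(event)               # blocks while the channel is full
        channel.put(done)

    def sink():
        batch = []
        while True:
            event = channel.get()            # blocks until an event arrives
            if event is done:
                break
            batch.append(event)
            if len(batch) == batch_size:     # package events together
                delivered.append(batch)
                batch = []
        if batch:                            # flush the final partial batch
            delivered.append(batch)

    threads = [threading.Thread(target=source), threading.Thread(target=sink)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return delivered

batches = run_agent(["e1", "e2", "e3", "e4", "e5", "e6", "e7"])
```

Because the source and sink only meet at the queue, either side can be slow without the other needing to know about it, at least until the channel fills up.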

Because the definition of a node doesn't fit well in fanout situations, this term is no longer used.

Transactional Guarantees

  • Suppose an agent has a single source and a single sink. The source initiates a transaction with the channel. If the channel can't accept the message, the transaction is invalidated.
  • Likewise, the sink has a transaction with its channel, so between hops there is a channel -> sink -> src -> channel transaction, meaning you can have guaranteed delivery.
  • implementations of channels include:
    • memory, file, JDBC (with ACID guarantees).
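The transaction idea can be sketched in a few lines of Python. This is my own illustration under simple assumptions, not the Flume NG API: MemoryChannel, Transaction, ChannelFull and deliver are hypothetical names, and "can't accept the message" is modeled as the channel being over capacity at commit time.

```python
class ChannelFull(Exception):
    """Raised when the channel cannot accept a batch (hypothetical)."""

class MemoryChannel:
    """Toy in-memory channel with a fixed capacity."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.events = []

    def transaction(self):
        return Transaction(self)

class Transaction:
    """Stages events and makes them visible only on commit."""
    def __init__(self, channel):
        self.channel = channel
        self.staged = []

    def put(self, event):
        self.staged.append(event)

    def commit(self):
        if len(self.channel.events) + len(self.staged) > self.channel.capacity:
            raise ChannelFull("channel cannot accept the batch")
        self.channel.events.extend(self.staged)
        self.staged = []

    def rollback(self):
        self.staged = []  # nothing staged ever reaches the channel

def deliver(channel, batch):
    """Source side: all of the batch lands on the channel, or none of it."""
    tx = channel.transaction()
    try:
        for event in batch:
            tx.put(event)
        tx.commit()
        return True
    except ChannelFull:
        tx.rollback()
        return False  # the caller still has the batch and can retry

ch = MemoryChannel(capacity=3)
first = deliver(ch, ["a", "b"])    # fits
second = deliver(ch, ["c", "d"])   # would overflow, so it is rolled back
```

The point of the sketch is that a failed hand-off leaves the channel untouched, so the upstream side keeps responsibility for the events until a commit succeeds.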

Example:

client (log4j) -> agent1 -> agent2 -> HDFS.

If the agent2/HDFS link goes down, events will start accumulating on agent2. This is because channels are passive (the sink actively removes events from them). When agent2's channel reaches its capacity, the failure gets relayed upstream as an exception, and events start buffering on agent1. When agent1 reaches its capacity, your client will start to fail. When the link recovers, you'll see an immediate draining of events.
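Here is a toy Python simulation of that failure scenario, written by me as a sketch rather than taken from the talk. The Pipeline class, its capacities and the single-threaded pumping are all assumptions made to keep the example small; real agents run concurrently.

```python
class Pipeline:
    """client -> agent1 -> agent2 -> HDFS, with bounded channels."""
    def __init__(self, cap1, cap2):
        self.cap1, self.cap2 = cap1, cap2
        self.agent1, self.agent2, self.hdfs = [], [], []
        self.link_up = True   # state of the agent2/HDFS link
        self.rejected = 0     # events the client failed to hand off

    def send(self, event):
        """Client hands an event to agent1, or fails if agent1 is full."""
        if len(self.agent1) < self.cap1:
            self.agent1.append(event)
        else:
            self.rejected += 1
        self._pump()

    def set_link(self, up):
        self.link_up = up
        self._pump()

    def _pump(self):
        """Let each sink move whatever it can until nothing more moves."""
        progress = True
        while progress:
            progress = False
            if self.link_up and self.agent2:
                self.hdfs.extend(self.agent2)   # agent2's sink drains to HDFS
                self.agent2.clear()
                progress = True
            while self.agent1 and len(self.agent2) < self.cap2:
                self.agent2.append(self.agent1.pop(0))  # agent1 -> agent2
                progress = True

p = Pipeline(cap1=3, cap2=4)
p.set_link(False)             # the agent2/HDFS link goes down
for i in range(10):
    p.send(i)
state_during_outage = (list(p.agent1), list(p.agent2), p.rejected)
p.set_link(True)              # the link recovers and everything drains
```

With these made-up capacities, the first four events pile up on agent2, the next three on agent1, and the last three are refused at the client, which matches the order of events described above.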

At this point, Eric takes over to chat about the flume-ng (and flume in general) dev efforts:

The bug for flume-ng is FLUME-728, and the branch is also called FLUME-728. Per Eric:

  • alpha quality.
  • sources: avro source, unix (tail) source, netcat source (for testing), and a few other things.
  • configuration is now plugin/user extendable (zookeeper is implemented, but it could use other stores).
  • hdfs sink supports append and sync, as well as bucketing.
  • avro client sink that lets you do tiered collection.
  • looking for committers that don't work for Cloudera.

Notes from breakout session run by Eric:

  • Avro is the new default RPC system (no more reliance on thrift binary).
  • Avro is a bit slower, so batching / buffering is built in to improve performance.
  • Not 100% API compatible, so custom flume decorators/sinks will need (minimal) changes.
  • lots more that I can't recall :).

Artimon - Arkéa Real Time Information Monitoring.

Mathias Herberts of Arkéa. Slides on Slideshare.

"we found out that none of the existing monitoring systems met our needs, so we created artimon"

ARTIMON: the last mast of a ship with at least 3 sails; it's supposed to stabilize the ship.

Collecting data

Collect variable instances on machines (using flume) and store the metrics in HDFS and in-memory buckets. Variable instances are similar to row keys in OpenTSDB, if you're familiar. Something like:

name, label=value

Support for types like:

  • Integer
  • Double
  • Boolean
  • String

The framework adds a few labels: datacenter, rack, module, context, environment (production, testing, etc.), and host. These are retrieved via reverse DNS (dc-rack-module).

Metrics are created by a library (mainly Java) called ArtimonBookeeper. It has a very simple interface. To use ArtimonBookeeper:

    • build a singleton.
    • use a method like: addToIntegerVar(name, labels, offset).
      • creates (or adds to) a variable metric (counter) held in memory.
    • the bookkeeper is part of a context, and it registers itself in zookeeper.
      • part of the registration tells ZK the context, IP and port.
      • the IP and port are a thrift endpoint from which you can retrieve the variables/counters stored by the application.
    • metrics are exported by the Java library using a thrift endpoint. The design allows you to expose metrics from any language supported by thrift.
    • if you can't provide a thrift endpoint to expose metrics, you can create a file in /var/run/artimon, and artimon will read the data every minute.
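To give a feel for the bookkeeper interface, here is a rough Python sketch of the in-memory part only. The real library is Java and I haven't seen its source, so everything here is a guess at the shape: the Bookkeeper class, get_instance, add_to_integer_var and snapshot are hypothetical names, and the Zookeeper registration and thrift endpoint are left out entirely.

```python
import threading

class Bookkeeper:
    """Toy singleton that keeps integer counters keyed by name and labels."""
    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self._lock = threading.Lock()
        self._vars = {}

    @classmethod
    def get_instance(cls):
        """Build the singleton on first use and reuse it afterwards."""
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def add_to_integer_var(self, name, labels, offset):
        """Create the counter if needed, then add offset to it."""
        # Sorting the labels makes the key independent of dict ordering.
        key = (name, tuple(sorted(labels.items())))
        with self._lock:
            self._vars[key] = self._vars.get(key, 0) + offset
            return self._vars[key]

    def snapshot(self):
        """Copy of all counters, e.g. for an endpoint to serve to a poller."""
        with self._lock:
            return dict(self._vars)

bk = Bookkeeper.get_instance()
bk.add_to_integer_var("requests", {"dc": "jb", "module": "web"}, 1)
bk.add_to_integer_var("requests", {"module": "web", "dc": "jb"}, 2)
```

Both calls above land on the same counter because a variable instance is identified by its name plus its full set of labels, not by the order the labels were written in.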

Machines run apps that export the metrics. There can be several apps on each machine.

Flume

Each machine has flume running. They start several nodes and configure them without a master (they patched flume because they had problems with the master). A special source, the "artimon source", looks in Zookeeper for endpoints to read and periodically (every minute) polls the thrift endpoints exposed by the apps, retrieving all the metrics exposed by each app. The metrics are pushed to collectors, and the collectors push to sinks:

  • put most granular data in HDFS.
  • VHMS - VarHistoryMemStore (several servers running this). Store in buckets with various resolutions, depth, etc.
    • 60 buckets of 60 seconds.
    • 48 buckets of 5 minutes.
    • 96 buckets of 15 minutes.
    • 72 buckets of 60 minutes. (3 days worth of data with 1-hour granularity).
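The bucket layout is easy to sanity-check with a little arithmetic. The Python below is my own sketch: TIERS just restates the four resolutions listed above, while retention_hours and bucket_start are hypothetical helpers (the second shows one common way to map a timestamp to its bucket, which may not be how VHMS does it).

```python
# (bucket count, bucket width in seconds) for each resolution in the notes
TIERS = [(60, 60), (48, 5 * 60), (96, 15 * 60), (72, 60 * 60)]

def retention_hours(count, width_seconds):
    """How much history a tier covers, in hours."""
    return count * width_seconds / 3600

def bucket_start(timestamp, width_seconds):
    """Start of the bucket that a Unix timestamp (in seconds) falls into."""
    return timestamp - (timestamp % width_seconds)

retentions = [retention_hours(count, width) for count, width in TIERS]
for (count, width), hours in zip(TIERS, retentions):
    print(f"{count} buckets of {width}s cover {hours:g} hours")
```

So the four tiers cover 1 hour, 4 hours, 24 hours and 72 hours respectively, and the last one is the "3 days" mentioned above.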

Worked well for a while, but hit a "flume-wall":

  • DFO (disk failover) handles failures, which works very well.
  • problems arise when you have a slowdown but not a complete failure. You're not going to benefit from DFO, and you're going to run into OOMEs (OutOfMemoryError) because you fill up buffers. They had to put in place a heuristic to drop stuff.
    • not great, because congestion tends to happen exactly when there is a problem.
  • Solution was to create a Kafka sink, with Kafka as the buffer. The failure scenario is much better, and they haven't had any slowdowns.
  • Wrote a Kafka source on the other side to collect metrics.

Stats and numbers

  • Collecting around 1.5 million metrics per minute.
    • 25K metrics/second on average. In practice, though, all metrics are collected within about 10 seconds of every minute.
  • 1.5B metrics per day.
  • Keeping all raw data since the end of June, they have collected around 80TB (divide by 3). 200-300GB/day, compressed.
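A quick back-of-the-envelope check of these numbers, as a Python sketch. The assumptions are mine: I read "divide by 3" as a replication factor of 3, take "end of June" to mean June 30, 2011, count up to the day of the meetup, and use decimal units (1TB = 1000GB).

```python
from datetime import date

metrics_per_minute = 1_500_000
metrics_per_second = metrics_per_minute / 60      # average rate
metrics_per_day = metrics_per_minute * 60 * 24    # if the rate were constant

raw_tb = 80
replication = 3                                   # assumption: "divide by 3"
days = (date(2011, 11, 9) - date(2011, 6, 30)).days  # assumption: since June 30
gb_per_day = raw_tb * 1000 / replication / days

print(f"{metrics_per_second:,.0f} metrics/second")
print(f"{metrics_per_day:,} metrics/day at a constant rate")
print(f"{gb_per_day:.0f} GB/day over {days} days")
```

The per-second figure and the 200-300GB/day figure both line up with the notes. A constant 1.5 million per minute would multiply out to about 2.16 billion per day, so the 1.5B figure in my notes may be a rounding or a slip on my part.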

The great thing about flume: they have several datacenters, and when there's a colo partition, the data recovers. When the partition ends, the metrics flood in.

What do they do with the metrics?

VHMS has a thrift endpoint. You can query it like "Give me all metrics named memcached which have dc=jb for the last 600,000ms"

  • retrieves a matrix of values. memcached.varX x value
  • you can display and have a pretty quick view.
  • you can apply stuff on the matrices.
    • "Map paradigm": operates on the historical data of one variable. The library is in Groovy, with a DSL to play with metrics.
    • map_delta(): takes two values and creates a new one, producing a new dataset.
    • provides a bunch of builtin functions, but you can create your own using groovy closures.
  • labels are important: you can define equivalence classes of variables that share the same label names and values (or a subset of them).
  • can do bulk operations.
    • group two operands by equivalence classes, e.g. to compute available disk space. Works for one volume or thousands of volumes.
  • "Reduce paradigm": applies to an equivalence class, e.g. you could compute the average temperature of each rack in your datacenter in one line.
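The map and reduce ideas can be sketched in Python. The real library is a Groovy DSL that I don't have in front of me, so this is only my own approximation: the data layout, group_by and mean_latest are hypothetical, and this map_delta (differences between consecutive values) is just my reading of the one-line description above.

```python
from collections import defaultdict

# Hypothetical query result: one entry per variable instance.
series = [
    {"name": "temperature", "labels": {"rack": "r1", "host": "h1"}, "values": [20.0, 22.0]},
    {"name": "temperature", "labels": {"rack": "r1", "host": "h2"}, "values": [23.0, 24.0]},
    {"name": "temperature", "labels": {"rack": "r2", "host": "h3"}, "values": [29.0, 30.0]},
]

def map_delta(values):
    """Map: turn one variable's history into consecutive differences."""
    return [later - earlier for earlier, later in zip(values, values[1:])]

def group_by(series, label_names):
    """Equivalence classes: series that agree on the given labels."""
    classes = defaultdict(list)
    for s in series:
        key = tuple(s["labels"][name] for name in label_names)
        classes[key].append(s)
    return dict(classes)

def mean_latest(group):
    """Reduce: average of the most recent value of each series in a class."""
    latest = [s["values"][-1] for s in group]
    return sum(latest) / len(latest)

# The "one line": average temperature of each rack.
avg_per_rack = {key: mean_latest(group) for key, group in group_by(series, ["rack"]).items()}
deltas = map_delta([10, 15, 18])
```

The same grouping works whether a class holds one series or thousands, which is what makes the bulk operations mentioned above practical.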

Notes from breakout session:

  • have a JMX adapter that's used for pulling metrics from hadoop, hbase, etc.
  • using Kafka very much like the channels in flume-ng (flume releases use a push model, which is responsible for many of the problems).
  • Write pig scripts for custom / post-mortem analysis.
  • Groovy library has an email service for alerting.

Thanks again to everyone who helped make this event happen. It was a ton of fun.