Stop Thinking, Just Do!

Sungsoo Kim's Blog

Using Avro in MapReduce Jobs with Hadoop Streaming


23 March 2014


Apache Avro is a very popular data serialization format in the Hadoop technology stack. In this article I show code examples of MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format. We will use a small, Twitter-like data set as input for our example MapReduce jobs.


avro-hadoop-starter

Example MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format.

Prerequisites

First you must clone my avro-hadoop-starter repository on GitHub.

$ git clone git@github.com:miguno/avro-hadoop-starter.git
$ cd avro-hadoop-starter

Requirements

The examples require the following software versions:

  • Gradle 1.9 (only for the Java examples)
  • Java JDK 7 (only for the Java examples)
    • It is easy to switch to JDK 6: mostly you will need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6. But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code addresses, I decided to stick with JDK 7 as the default.
  • Hadoop 2.x with MRv1 (not MRv2/YARN)
  • Pig 0.11
  • Hive 0.10
  • Twitter Bijection 0.6
  • Avro 1.7.6

More precisely, the examples were tested with the Hadoop stack components that ship with Cloudera CDH 4.x.

Example data

We are using a small, Twitter-like data set as input for our example MapReduce jobs.

Avro schema

twitter.avsc defines a basic schema for storing tweets:

{
  "type" : "record",
  "name" : "Tweet",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc"  : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc"  : "Unix epoch time in seconds"
  } ],
  "doc:" : "A basic schema for storing Twitter messages"
}

The latest version of the schema is always available at twitter.avsc.

If you want to generate Java classes from this Avro schema follow the instructions described in section Usage. Alternatively you can also use the Avro Compiler directly.
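
If you have the Avro Tools jar available, compiling the schema into Java classes is a one-liner. A minimal sketch (the paths are just examples based on this repository's layout):

# Generate Java classes from the Avro schema into src/main/java/
$ java -jar avro-tools-1.7.6.jar compile schema \
    src/main/resources/avro/twitter.avsc src/main/java/

This creates the class com.miguno.avro.Tweet, matching the record's name and namespace.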

Avro data files

The actual data is stored in the following files:

  • twitter.avro – encoded (serialized) version of the example data in binary Avro format, compressed with Snappy
  • twitter.json – JSON representation of the same example data

You can convert back and forth between the two encodings (Avro vs. JSON) using Avro Tools. See Reading and Writing Avro Files From the Command Line for instructions on how to do that.
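
For example, assuming you have downloaded avro-tools-1.7.6.jar, the conversion in either direction is a one-liner (a sketch; file locations are up to you):

# Avro (binary, Snappy-compressed) -> JSON, one record per line
$ java -jar avro-tools-1.7.6.jar tojson twitter.avro > twitter.json

# JSON -> Avro (binary), compressed with Snappy
$ java -jar avro-tools-1.7.6.jar fromjson --codec snappy \
    --schema-file twitter.avsc twitter.json > twitter.avro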

Here is a snippet of the example data:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
{"username":"DarkTemplar","tweet":"From the shadows I come!","timestamp": 1366154681 }
{"username":"VoidRay","tweet":"Prismatic core online!","timestamp": 1366160000 }

Preparing the input data

The example input data we are using is twitter.avro. Upload twitter.avro to HDFS to make the input data available to our MapReduce jobs. The commands below assume you run them from the top-level directory of the cloned repository.

# Upload the input data
$ hadoop fs -mkdir examples/input
$ hadoop fs -copyFromLocal src/test/resources/avro/twitter.avro examples/input

We will also upload the Avro schema twitter.avsc to HDFS because we will use a schema available at an HDFS location in one of the Hive examples.

# Upload the Avro schema
$ hadoop fs -mkdir examples/schema
$ hadoop fs -copyFromLocal src/main/resources/avro/twitter.avsc examples/schema

Hadoop Streaming

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

How Streaming sees data when reading via AvroAsTextInputFormat

When using AvroAsTextInputFormat as the input format your streaming code will receive the data in JSON format, one record (“datum” in Avro parlance) per line. Note that Avro will also add a trailing TAB (\t) at the end of each line.

<JSON representation of Avro record #1>\t
<JSON representation of Avro record #2>\t
<JSON representation of Avro record #3>\t
...

Here is the basic data flow from your input data in binary Avro format to our streaming mapper:

input.avro (binary)  ---AvroAsTextInputFormat---> deserialized data (JSON) ---> Mapper
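
If your mapper is a plain Unix command, one simple way to drop that trailing TAB is to cut on it. This is just a sketch and not part of the original example:

# Hypothetical mapper option: cut splits on TAB by default and keeps only
# the first field, i.e. the JSON record without the trailing TAB
-mapper "cut -f 1"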

Examples

Prerequisites

The example commands below use the Hadoop Streaming jar for MRv1 shipped with Cloudera CDH4, i.e. hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar.

If you are not using Cloudera CDH4 or are using a newer version of CDH4, just replace that jar file with the one included in your Hadoop installation.

The Avro jar files, avro-1.7.6.jar and avro-mapred-1.7.6-hadoop1.jar, are straight from the Avro project.
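
If you do not have these jars locally, one way to fetch them is directly from the Apache archive. The URLs below are an assumption based on the archive's usual layout for Avro 1.7.6:

# Download the Avro jars (URLs are assumed, adjust if the layout differs)
$ wget https://archive.apache.org/dist/avro/avro-1.7.6/java/avro-1.7.6.jar
$ wget https://archive.apache.org/dist/avro/avro-1.7.6/java/avro-mapred-1.7.6-hadoop1.jar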

Reading Avro, writing plain-text

The following command reads Avro data from the relative HDFS directory examples/input/ (which normally resolves to /user/<your-unix-username>/examples/input/). It writes the deserialized version of each data record (see section How Streaming sees data when reading via AvroAsTextInputFormat above) as is to the output HDFS directory streaming/output/. For this simple demonstration we are using the IdentityMapper as a naive map step implementation – it outputs its input data unmodified (equivalently, we could use the Unix tool cat here). We do not need a reduce phase, which is why we disable the reduce step via the option -D mapred.reduce.tasks=0 (see Specifying Map-Only Jobs in the Hadoop Streaming documentation).

# Run the streaming job
$ hadoop jar hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \
    -D mapred.job.name="avro-streaming" \
    -D mapred.reduce.tasks=0 \
    -files avro-1.7.6.jar,avro-mapred-1.7.6-hadoop1.jar \
    -libjars avro-1.7.6.jar,avro-mapred-1.7.6-hadoop1.jar \
    -input  examples/input/ \
    -output streaming/output/ \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -inputformat org.apache.avro.mapred.AvroAsTextInputFormat

Once the job completes you can inspect the output data as follows:

$ hadoop fs -cat streaming/output/part-00000 | head -4
{"username": "miguno", "tweet": "Rock: Nerf paper, scissors is fine.", "timestamp": 1366150681}
{"username": "BlizzardCS", "tweet": "Works as intended.  Terran is IMBA.", "timestamp": 1366154481}
{"username": "DarkTemplar", "tweet": "From the shadows I come!", "timestamp": 1366154681}
{"username": "VoidRay", "tweet": "Prismatic core online!", "timestamp": 1366160000}

Please be aware that the output data just happens to be JSON. This is because we opted not to modify any of the input data in our MapReduce job, and since the input data is deserialized by Avro into JSON, the output turns out to be JSON, too. With a different MapReduce job you could of course write the output data in, say, TSV or CSV format.

Reading Avro, writing Avro

AvroTextOutputFormat (implies “bytes” schema)

To write the output in Avro format instead of plain-text, use the same general options as in the previous example but also add:

$ hadoop jar hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \
    [...]
    -outputformat org.apache.avro.mapred.AvroTextOutputFormat

AvroTextOutputFormat is the equivalent of TextOutputFormat. It writes Avro data files with a “bytes” schema.

Note that using IdentityMapper as a naive mapper as shown in the previous example will not result in the output file being identical to the input file. This is because AvroTextOutputFormat will escape (quote) the input data it receives. An illustration might be worth a thousand words:

# After having used IdentityMapper as in the previous example
$ hadoop fs -copyToLocal streaming/output/part-00000.avro .
$ java -jar avro-tools-1.7.6.jar tojson part-00000.avro  | head -4
"{\"username\": \"miguno\", \"tweet\": \"Rock: Nerf paper, scissors is fine.\", \"timestamp\": 1366150681}\t"
"{\"username\": \"BlizzardCS\", \"tweet\": \"Works as intended.  Terran is IMBA.\", \"timestamp\": 1366154481}\t"
"{\"username\": \"DarkTemplar\", \"tweet\": \"From the shadows I come!\", \"timestamp\": 1366154681}\t"
"{\"username\": \"VoidRay\", \"tweet\": \"Prismatic core online!\", \"timestamp\": 1366160000}\t"

Custom Avro output schema

This does not appear to be supported by stock Avro at the moment. A related JIRA ticket, AVRO-1067, created in April 2012, is still unresolved as of July 2013.

For a workaround take a look at the section Avro output for Hadoop Streaming at avro-utils, a third-party library for Avro.

Enabling compression of Avro output data (Snappy or Deflate)

If you want to enable compression for the Avro output data, you must add the following parameters to the streaming job:

# For compression with Snappy
-D mapred.output.compress=true -D avro.output.codec=snappy

# For compression with Deflate
-D mapred.output.compress=true -D avro.output.codec=deflate

Be aware that if you enable compression with mapred.output.compress but do NOT specify an Avro output format (such as AvroTextOutputFormat), your cluster's configured default compression codec will determine the final format of the output data. For instance, if mapred.output.compression.codec is set to com.hadoop.compression.lzo.LzopCodec, then the job's output files would be compressed with LZO (e.g. you would see part-00000.lzo output files instead of uncompressed part-00000 files).
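
To double-check which codec an Avro output file was actually written with, you can inspect its metadata, for instance with Avro Tools (a sketch, assuming a local copy of the file):

# Print the file metadata; avro.codec should read snappy or deflate
$ java -jar avro-tools-1.7.6.jar getmeta part-00000.avro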

See also Compression and Avro in the CDH4 documentation.

Further readings on Hadoop Streaming

