Stop Thinking, Just Do!

Sung-Soo Kim's Blog

Using Avro in MapReduce Jobs with Java

tagsTags

23 March 2014


Using Avro in MapReduce Jobs with Java

Apache Avro is a very popular data serialization format in the Hadoop technology stack. In this article I show code examples of MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format. We will use a small, Twitter-like data set as input for our example MapReduce jobs.


Table of Contents


avro-hadoop-starter

Example MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format.

Prerequisites

First you must clone my avro-hadoop-starter repository on GitHub.

$ git clone git@github.com:miguno/avro-hadoop-starter.git
$ cd avro-hadoop-starter

Requirements

The examples require the following software versions:

  • Gradle 1.9 (only for the Java examples)
  • Java JDK 7 (only for the Java examples)
    • It is easy to switch to JDK 6. Mostly you will need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6 But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code solves I decided to stick with JDK 7 as the default.
  • Hadoop 2.x with MRv1 (not MRv2/YARN)
  • Pig 0.11
  • Hive 0.10
  • Twitter Bijection 0.6
  • Avro 1.7.6

More precisely, the examples where tested with those Hadoop stack components that ship with Cloudera CDH 4.x.

Example data

We are using a small, Twitter-like data set as input for our example MapReduce jobs.

Avro schema

twitter.avro defines a basic schema for storing tweets:

{
  "type" : "record",
  "name" : "Tweet",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc"  : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc"  : "Unix epoch time in seconds"
  } ],
  "doc:" : "A basic schema for storing Twitter messages"
}

The latest version of the schema is always available at twitter.avsc.

If you want to generate Java classes from this Avro schema follow the instructions described in section Usage. Alternatively you can also use the Avro Compiler directly.

Avro data files

The actual data is stored in the following files:

  • twitter.avro – encoded (serialized) version of the example data in binary Avro format, compressed with Snappy
  • twitter.json – JSON representation of the same example data

You can convert back and forth between the two encodings (Avro vs. JSON) using Avro Tools. See Reading and Writing Avro Files From the Command Line for instructions on how to do that.

Here is a snippet of the example data:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
{"username":"DarkTemplar","tweet":"From the shadows I come!","timestamp": 1366154681 }
{"username":"VoidRay","tweet":"Prismatic core online!","timestamp": 1366160000 }

Requirements

The examples require the following software versions:

  • Gradle 1.9 (only for the Java examples)
  • Java JDK 7 (only for the Java examples)
    • It is easy to switch to JDK 6. Mostly you will need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6 But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code solves I decided to stick with JDK 7 as the default.
  • Hadoop 2.x with MRv1 (not MRv2/YARN)
  • Pig 0.11
  • Hive 0.10
  • Twitter Bijection 0.6
  • Avro 1.7.6

More precisely, the examples where tested with those Hadoop stack components that ship with Cloudera CDH 4.x.

Example data

We are using a small, Twitter-like data set as input for our example MapReduce jobs.

Avro schema

twitter.avsc defines a basic schema for storing tweets:

{
  "type" : "record",
  "name" : "Tweet",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc"  : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc"  : "Unix epoch time in seconds"
  } ],
  "doc:" : "A basic schema for storing Twitter messages"
}

The latest version of the schema is always available at twitter.avsc.

If you want to generate Java classes from this Avro schema follow the instructions described in section Usage. Alternatively you can also use the Avro Compiler directly.

Avro data files

The actual data is stored in the following files:

  • twitter.avro – encoded (serialized) version of the example data in binary Avro format, compressed with Snappy
  • twitter.json – JSON representation of the same example data

You can convert back and forth between the two encodings (Avro vs. JSON) using Avro Tools. See Reading and Writing Avro Files From the Command Line for instructions on how to do that.

Here is a snippet of the example data:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
{"username":"DarkTemplar","tweet":"From the shadows I come!","timestamp": 1366154681 }
{"username":"VoidRay","tweet":"Prismatic core online!","timestamp": 1366160000 }

Preparing the input data

The example input data we are using is twitter.avro. Upload twitter.avro to HDFS to make the input data available to our MapReduce jobs.

# Upload the input data
$ hadoop fs -mkdir examples/input
$ hadoop fs -copyFromLocal https://github.com/sungsoo/avro-hadoop-starter/tree/master/src/test/resources/avro/twitter.avro examples/input

We will also upload the Avro schema twitter.avsc to HDFS because we will use a schema available at an HDFS location in one of the Hive examples.

# Upload the Avro schema
$ hadoop fs -mkdir examples/schema
$ hadoop fs -copyFromLocal https://github.com/sungsoo/avro-hadoop-starter/tree/master/src/main/resources/avro/twitter.avsc examples/schema

Java

Usage

To prepare your Java IDE:

# IntelliJ IDEA
$ ./gradlew cleanIdea idea   # then File > Open... > avro-hadoop-starter.ipr
# Eclipse
$ ./gradlew cleanEclipse eclipse

To build the Java code and to compile the Avro-based Java classes from the schemas (*.avsc) in https://github.com/sungsoo/avro-hadoop-starter/tree/master/src/main/resources/avro/:

$ ./gradlew clean build

The generated Avro-based Java classes are written under the directory tree generated-sources/. The Avro compiler will generate a Java class Tweet from the twitter.avsc schema.

To run the unit tests (notably TweetCountTest, see section Examples below):

$ ./gradlew test

Note: ./gradlew test executes any JUnit unit tests. If you add any TestNG unit tests you need to run ./gradlew testng for executing those.

You can also run ./gradlew cobertura which will generate a test coverage report at ./build/reports/cobertura/coverage.xml that you can integrate into your CI setup.

Examples

TweetCount

TweetCount implements a MapReduce job that counts the number of tweets created by Twitter users.

TweetCount: Usage: TweetCount <input path> <output path>

TweetCountTest

TweetCountTest is very similar to TweetCount.
It uses twitter.avro as its input and runs a unit test on it with the same MapReduce job as TweetCount. The unit test includes comparing the actual MapReduce output (in Snappy-compressed Avro format) with expected output. TweetCountTest extends ClusterMapReduceTestCase(MRv1), which means that the corresponding MapReduce job is launched in-memory via MiniMRCluster.

MiniMRCluster and Hadoop MRv2

The MiniMRCluster that is used by ClusterMapReduceTestCase in MRv1 is deprecated in Hadoop MRv2. When using MRv2 you should switch to MiniMRClientClusterFactory, which provides a wrapper interface called MiniMRClientCluster around the MiniMRYarnCluster (MRv2):

MiniMRClientClusterFactory

A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster interface around the MiniMRYarnCluster. While in MR1, it provides such wrapper around MiniMRCluster. This factory should be used in tests to provide an easy migration of tests across MR1 and MR2.

See Experimenting with MapReduce 2.0 for more information.

Avro - String vs CharSequence vs Utf8

One caveat when using Avro in Java (or Scala, …) is that you may create a new Avro-backed object with a java.lang.String parameter (e.g. the username in the Avro schema we use in our examples), but as you convert your data record to binary and back to POJO you will observe that Avro actually gives you an instance of CharSequence instead of a String. Now the problem is that by default Avro generated Java classes expose CharSequence for string fields in their API but unfortunately you cannot use just any CharSequence when interacting with your data records – such as java.lang.String, which does implement CharSequence.

You must use Avro’s own Utf8 instead. A typical case where you run into this gotcha is when your unit tests complain that doing a round-trip conversion of a data record does apparently not result in the original record.

One possible remedy to this problem is to instruct Avro to explicitly return an instance of String. This is usually what you want as it provides you with the intuitive behavior that you’d typically expect. Your mileage may vary though.

For details see AVRO-803 Java generated Avro classes make using Avro painful and surprising.

Enforce use of String when using sbt

Add the following to your build.sbt, assuming you use cavorite’s sbt-avro plugin.

(stringType in avroConfig) := "String"

Enforce use of String when using gradle

Add the following to your build.gradle, assuming you use my avro-gradle-plugin.

compileAvro {
  stringType = 'String'
}

Enforce use of String when using maven

Add the following to the configuration of avro-maven-plugin in your pom.xml.

<stringType>String</stringType>

Further readings on Java


comments powered by Disqus