Using Avro in MapReduce Jobs with Hive
Apache Avro is a very popular data serialization format in the Hadoop technology stack. In this article I show code examples of MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format. We will use a small, Twitter-like data set as input for our example MapReduce jobs.
avro-hadoop-starter
Example MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format.
Prerequisites
First you must clone my avro-hadoop-starter repository on GitHub.
$ git clone git@github.com:miguno/avro-hadoop-starter.git
$ cd avro-hadoop-starter
Requirements
The examples require the following software versions:
- Gradle 1.9 (only for the Java examples)
- Java JDK 7 (only for the Java examples)
- It is easy to switch to JDK 6: mostly you will need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6 (see the build.gradle sketch after this list). But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code solves, I decided to stick with JDK 7 as the default.
- Hadoop 2.x with MRv1 (not MRv2/YARN)
- Pig 0.11
- Hive 0.10
- Twitter Bijection 0.6
- Avro 1.7.6
More precisely, the examples were tested with those Hadoop stack components that ship with Cloudera CDH 4.x.
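For reference, here is a minimal sketch of the relevant build.gradle settings mentioned above; the actual build file in the repository contains more configuration than shown here:

apply plugin: 'java'

// Compile for JDK 7 by default; change both values to '1.6' if you want to target JDK 6.
sourceCompatibility = '1.7'
targetCompatibility = '1.7'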
Example data
We are using a small, Twitter-like data set as input for our example MapReduce jobs.
Avro schema
twitter.avsc defines a basic schema for storing tweets:
{
"type" : "record",
"name" : "Tweet",
"namespace" : "com.miguno.avro",
"fields" : [ {
"name" : "username",
"type" : "string",
"doc" : "Name of the user account on Twitter.com"
}, {
"name" : "tweet",
"type" : "string",
"doc" : "The content of the user's Twitter message"
}, {
"name" : "timestamp",
"type" : "long",
"doc" : "Unix epoch time in seconds"
} ],
"doc:" : "A basic schema for storing Twitter messages"
}
The latest version of the schema is always available at twitter.avsc.
If you want to generate Java classes from this Avro schema follow the instructions described in section Usage. Alternatively you can also use the Avro Compiler directly.
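For illustration, here is a sketch of how you could invoke the Avro compiler from the command line; the avro-tools jar name/version and the output directory are assumptions, so adjust them to your environment:

# Generate the Java class for the Tweet record into the directory generated-sources/
$ java -jar avro-tools-1.7.6.jar compile schema src/main/resources/avro/twitter.avsc generated-sources/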
Avro data files
The actual data is stored in the following files:
- twitter.avro – encoded (serialized) version of the example data in binary Avro format, compressed with Snappy
- twitter.json – JSON representation of the same example data
You can convert back and forth between the two encodings (Avro vs. JSON) using Avro Tools. See Reading and Writing Avro Files From the Command Line for instructions on how to do that.
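As a quick sketch (the file names and the avro-tools version are assumptions), the conversions look roughly like this:

# Convert binary Avro to JSON
$ java -jar avro-tools-1.7.6.jar tojson twitter.avro > twitter.json

# Convert JSON back to binary Avro, compressed with Snappy
$ java -jar avro-tools-1.7.6.jar fromjson --codec snappy --schema-file twitter.avsc twitter.json > twitter.avro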
Here is a snippet of the example data:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
{"username":"DarkTemplar","tweet":"From the shadows I come!","timestamp": 1366154681 }
{"username":"VoidRay","tweet":"Prismatic core online!","timestamp": 1366160000 }
Preparing the input data
The example input data we are using is twitter.avro.
Upload twitter.avro to HDFS to make the input data available to our MapReduce jobs.
# Upload the input data
$ hadoop fs -mkdir examples/input
$ hadoop fs -copyFromLocal src/test/resources/avro/twitter.avro examples/input
We will also upload the Avro schema twitter.avsc to HDFS because we will use a schema available at an HDFS location in one of the Hive examples.
# Upload the Avro schema
$ hadoop fs -mkdir examples/schema
$ hadoop fs -copyFromLocal src/main/resources/avro/twitter.avsc examples/schema
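To double-check that both uploads succeeded you can list the target directories:

$ hadoop fs -ls examples/input examples/schema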
Hive
Preliminaries
Important: The examples below assume you have access to a running Hadoop cluster.
Examples
In this section we demonstrate how to create a Hive table backed by Avro data, followed by running a few simple Hive queries against that data.
Defining a Hive table backed by Avro data
Using avro.schema.url to point to a remote Avro schema file
The following CREATE TABLE statement creates an external Hive table named tweets for storing Twitter messages in a very basic data structure that consists of username, content of the message, and a timestamp.
For Hive version 0.11+:
Starting with Hive version 0.11 you must use WITH SERDEPROPERTIES instead of TBLPROPERTIES to specify the Avro schema. If you mistakenly use TBLPROPERTIES, Hive will complain with an AvroSerdeException.
-- Use the following syntax for Hive 0.11+
--
CREATE EXTERNAL TABLE tweets
COMMENT "A table backed by Avro data with the Avro schema stored in HDFS"
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES (
'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/YOURUSER/examples/input/';
For Hive versions <= 0.10:
-- Use the following syntax for Hive versions <= 0.10
--
CREATE EXTERNAL TABLE tweets_deprecated
COMMENT "A table backed by Avro data with the Avro schema stored in HDFS"
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/YOURUSER/examples/input/'
TBLPROPERTIES (
'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
);
Important: Notice how WITH SERDEPROPERTIES is specified after SERDE and TBLPROPERTIES after LOCATION, respectively.
Note: You must replace YOURUSER with your actual username. See section Preparing the input data above.
The serde parameter avro.schema.url can use URI schemes such as hdfs://, http:// and file://. It is recommended to use HDFS locations though:
[If the avro.schema.url points] to a location on HDFS […], the AvroSerde will then read the file from HDFS, which should provide resiliency against many reads at once [which can be a problem for HTTP locations]. Note that the serde will read this file from every mapper, so it is a good idea to turn the replication of the schema file to a high value to provide good locality for the readers. The schema file itself should be relatively small, so this does not add a significant amount of overhead to the process.
That said, if you host the schemas on a high-performance web server such as nginx, which is very efficient at serving static files, then using HTTP locations for Avro schemas should not be a problem either.
If you need to point to a particular HDFS namespace you can include the hostname and port of the NameNode in avro.schema.url:
CREATE EXTERNAL TABLE [...]
WITH SERDEPROPERTIES (
'avro.schema.url'='hdfs://namenode01:8020/path/to/twitter.avsc'
)
[...]
Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.
Using avro.schema.literal to embed an Avro schema
An alternative to setting avro.schema.url and using an external Avro schema is to embed the schema directly within the CREATE TABLE statement:
CREATE EXTERNAL TABLE tweets
COMMENT "A table backed by Avro data with the Avro schema embedded in the CREATE TABLE statement"
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES (
'avro.schema.literal' = '{
"type": "record",
"name": "Tweet",
"namespace": "com.miguno.avro",
"fields": [
{ "name":"username", "type":"string"},
{ "name":"tweet", "type":"string"},
{ "name":"timestamp", "type":"long"}
]
}'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/YOURUSER/examples/input/';
Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.
Note: You must replace YOURUSER with your actual username. See section Preparing the input data above.
Hive can also use variable substitution to embed the required Avro schema at run-time of a Hive script:
CREATE EXTERNAL TABLE tweets [...]
WITH SERDEPROPERTIES ('avro.schema.literal' = '${hiveconf:schema}');
Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.
To execute the Hive script you would then run:
# SCHEMA must be a properly escaped version of the Avro schema; i.e. carriage returns converted to \n, tabs to \t,
# quotes escaped, and so on.
$ export SCHEMA="..."
$ hive -hiveconf schema="${SCHEMA}" -f hive_script.hql
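As a sketch of the escaping step (assuming the schema file from the repository and a schema that needs no further escaping), you could collapse the schema to a single line like this:

# Strip newlines and tabs from the schema file; a simplistic approach that may need
# refinement if your schema contains characters that require additional escaping.
$ export SCHEMA=$(tr -d '\n\t' < src/main/resources/avro/twitter.avsc)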
Switching from avro.schema.url to avro.schema.literal or vice versa
If for a given Hive table you want to change how the Avro schema is specified you need to use a workaround:
Hive does not provide an easy way to unset or remove a property. If you wish to switch from using url or schema to the other, set the to-be-ignored value to none and the AvroSerde will treat it as if it were not set.
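For example, here is a hedged sketch of switching an existing tweets table from avro.schema.literal to avro.schema.url, using Hive 0.11+ syntax (the exact statement depends on your Hive version and on how the table was created):

ALTER TABLE tweets SET SERDEPROPERTIES (
  'avro.schema.literal' = 'none',
  'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
);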
Analyzing the data with Hive
After you have created the Hive table tweets with one of the CREATE TABLE statements above (no matter which), you can start analyzing the example data with Hive. We will demonstrate this via the interactive Hive shell, but you can also use a Hive script, of course.
First, start the Hive shell:
$ hive
hive>
Let us inspect how Hive interprets the Avro data with DESCRIBE. You can also use DESCRIBE EXTENDED to see even more details, including the Avro schema of the table.
hive> DESCRIBE tweets;
OK
username string from deserializer
tweet string from deserializer
timestamp bigint from deserializer
Time taken: 1.786 seconds
Now we can perform interactive analysis of our example data:
hive> SELECT * FROM tweets LIMIT 5;
OK
miguno Rock: Nerf paper, scissors is fine. 1366150681
BlizzardCS Works as intended. Terran is IMBA. 1366154481
DarkTemplar From the shadows I come! 1366154681
VoidRay Prismatic core online! 1366160000
VoidRay Fire at will, commander. 1366160010
Time taken: 0.126 seconds
The following query will launch a MapReduce job to compute the result:
hive> SELECT DISTINCT(username) FROM tweets;
Total MapReduce jobs = 1
Launching Job 1 out of 1
[...snip...]
MapReduce Total cumulative CPU time: 4 seconds 290 msec
Ended Job = job_201305070634_0187
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 1887 HDFS Write: 47 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 290 msec
OK
BlizzardCS <<< Query results start here
DarkTemplar
Immortal
VoidRay
miguno
Time taken: 16.782 seconds
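You can of course run other aggregations against the Avro-backed table in the same way; for instance, a query along these lines would count the number of tweets per user:

hive> SELECT username, COUNT(*) AS num_tweets FROM tweets GROUP BY username;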
As you can see, Hive makes working with Avro data completely transparent once you have defined the Hive table accordingly.
Enabling compression of Avro output data
To enable compression add the following statements to your Hive script or enter them into the Hive shell:
-- For compression with Snappy
SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;

-- For compression with Deflate
SET hive.exec.compress.output=true;
SET avro.output.codec=deflate;
To disable compression again in the same Hive script/Hive shell:
SET hive.exec.compress.output=false;
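As a sketch of how these settings take effect, the following hypothetical example writes a Snappy-compressed Avro copy of the tweets table. The table name tweets_snappy is made up, and note that the Hive versions discussed here still require an explicit Avro schema for the new table:

SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;

CREATE TABLE tweets_snappy
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  WITH SERDEPROPERTIES (
    'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
  )
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  AS SELECT * FROM tweets;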
Hive and Hue
There is good news and bad news:
- Good: You can readily browse Avro-backed Hive tables in Hue via a table’s “Sample” tab.
- Bad: You cannot (yet) inspect the table metadata – e.g. column names and types – for Avro-backed Hive tables via a table's “Columns” tab; instead, Hue will display “No data available”.
Figure 1: Browsing data of Avro Hive tables works as expected.
Figure 2: Displaying metadata of Avro Hive tables does not work yet.
Further readings on Hive
- AvroSerDe - working with Avro from Hive – Hive documentation