Haivvreo (pronounced with as many diphthongs as possible. Diphthongs are cool, like bowties.) is a Hive Serde that LinkedIn has developed to process Avro-encoded data in Hive. Haivvreo's bullet points:
- Infers the schema of the Hive table from the Avro schema.
- Reads all Avro files within a table against a specified schema, taking advantage of Avro's backwards compatibility.
- Supports arbitrarily nested schemas.
- Translates all Avro data types into equivalent Hive types. Most types map exactly, but some Avro types don't exist in Hive and are automatically converted by Haivvreo.
- Understands compressed Avro files.
- Transparently converts the Avro idiom of handling nullable types as Union[T, null] into just T and returns null when appropriate.
- Writes any Hive table to Avro files.
- Has worked reliably against our most convoluted Avro schemas in our ETL process.
Haivvreo is Apache 2 licensed.
Haivvreo has been built and tested against Hive 0.7 and Avro 1.4.1. Its build script is built on Maven 3.
Once the jar has been built via Maven, it, Avro, and Avro's transitive dependencies (avro-1.4.1.jar, jackson-core-asl-1.4.2.jar, jackson-mapper-asl-1.4.2.jar) should be made available to Hive, either by placing them in the cluster's lib folders or by pointing HIVE_AUX_JARS_PATH to a location that contains them.
While most Avro types convert directly to equivalent Hive types, there are some which do not exist in Hive and are converted to reasonable equivalents. Also, Haivvreo special cases unions of null and another type, as described below:
Avro type | Becomes Hive type | Note |
---|---|---|
null | void | |
boolean | boolean | |
int | int | |
long | bigint | |
float | float | |
double | double | |
bytes | Array[smallint] | Hive converts these to signed bytes. |
string | string | |
record | struct | |
map | map | |
array | array | |
union | union | Unions of [T, null] transparently convert to nullable T; other unions translate directly to Hive's unions of those types. However, unions were introduced in Hive 0.7 and cannot currently be used in where/group-by statements. They are essentially look-at-only. I'll file a bug with Hive on this. Because Haivvreo transparently converts [T, null] to nullable T, this limitation only applies to unions of multiple types or unions not of a single type and null. |
enum | string | Hive has no concept of enums |
fixed | Array[smallint] | Hive converts the bytes to signed int |
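The mapping in the table can be sketched in a few lines of Python. This is an illustrative sketch, not Haivvreo's implementation: `hive_type` and `PRIMITIVES` are hypothetical names, the input is assumed to be an Avro schema already parsed from JSON, and bytes and fixed are rendered as `array<tinyint>` to match the `describe` output shown later in this document.

```python
# Illustrative only: mirrors the mapping table above, not Haivvreo's actual code.
PRIMITIVES = {
    "null": "void",
    "boolean": "boolean",
    "int": "int",
    "long": "bigint",
    "float": "float",
    "double": "double",
    "string": "string",
    # Assumption: element type taken from the `describe` output in this document.
    "bytes": "array<tinyint>",
}


def hive_type(avro):
    """Return a Hive type string for a parsed Avro schema (str, list, or dict)."""
    if isinstance(avro, str):
        return PRIMITIVES[avro]
    if isinstance(avro, list):  # an Avro union is a JSON list of branches
        branches = [b for b in avro if b != "null"]
        if len(avro) == 2 and len(branches) == 1:
            # [T, null] or [null, T] collapses to a nullable T
            return hive_type(branches[0])
        return "uniontype<" + ",".join(hive_type(b) for b in avro) + ">"
    kind = avro["type"]
    if kind == "record":
        fields = ",".join(
            f["name"] + ":" + hive_type(f["type"]) for f in avro["fields"]
        )
        return "struct<" + fields + ">"
    if kind == "array":
        return "array<" + hive_type(avro["items"]) + ">"
    if kind == "map":
        # Avro map keys are always strings
        return "map<string," + hive_type(avro["values"]) + ">"
    if kind == "enum":
        return "string"  # Hive has no concept of enums
    if kind == "fixed":
        return "array<tinyint>"
    return hive_type(kind)  # wrapped primitive, e.g. {"type": "string"}
```

For example, `hive_type(["int", "null"])` yields `int`, while a three-branch union stays a `uniontype`.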
To create a Haivvreo-backed table, specify the serde as com.linkedin.haivvreo.AvroSerDe, the inputformat as com.linkedin.haivvreo.AvroContainerInputFormat, and the outputformat as com.linkedin.haivvreo.AvroContainerOutputFormat. Also provide a location from which Haivvreo will pull the most current schema for the table. For example:
    CREATE TABLE kst
      PARTITIONED BY (ds string)
      ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
      WITH SERDEPROPERTIES (
        'schema.url'='http://schema_provider/kst.avsc')
      STORED AS
      INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
      OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
In this example we're pulling the source-of-truth reader schema from a webserver. Other options for providing the schema are described below. Add the Avro files to the database (or create an external table) using standard Hive operations. This table might result in a description as below:
    hive> describe kst;
    OK
    string1        string  from deserializer
    string2        string  from deserializer
    int1           int  from deserializer
    boolean1       boolean  from deserializer
    long1          bigint  from deserializer
    float1         float  from deserializer
    double1        double  from deserializer
    inner_record1  struct<int_in_inner_record1:int,string_in_inner_record1:string>  from deserializer
    enum1          string  from deserializer
    array1         array<string>  from deserializer
    map1           map<string,string>  from deserializer
    union1         uniontype<float,boolean,string>  from deserializer
    fixed1         array<tinyint>  from deserializer
    null1          void  from deserializer
    unionnullint   int  from deserializer
    bytes1         array<tinyint>  from deserializer
At this point, the Avro-backed table can be worked with in Hive like any other table. Haivvreo cannot yet show comments included in the Avro schema, though a JIRA has been opened for this feature.
Haivvreo can serialize any Hive table to Avro files. This makes it effectively an any-Hive-type to Avro converter. In order to write a table to an Avro file, you must first create an appropriate Avro schema. Create-table-as-select statements are not currently supported. Types translate as detailed in the table above. For types that do not translate directly, there are a few items to keep in mind:
- Types that may be null must be defined as a union of that type and Null within Avro. A null in a field that is not so defined will result in an exception during the save. No changes need be made to the Hive schema to support this, as all fields in Hive can be null.
- Avro Bytes type should be defined in Hive as lists of tiny ints. Haivvreo will convert these to Bytes during the saving process.
- Avro Fixed type should be defined in Hive as lists of tiny ints. Haivvreo will convert these to Fixed during the saving process.
- Avro Enum type should be defined in Hive as strings, since Hive doesn't have a concept of enums. Ensure that only valid enum values are present in the table - trying to save a non-defined enum will result in an exception.
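The rules in this list can be sketched as a per-value conversion. This is a rough sketch under stated assumptions, not Haivvreo's code: `to_avro_value` is a hypothetical helper, the schema is assumed to be already parsed from JSON, and only the cases named in the bullets (nullable unions, bytes, fixed, enum) are handled.

```python
def to_avro_value(value, avro):
    """Convert one Hive value to its Avro form; raise ValueError on a mismatch.

    Hypothetical helper covering only the cases listed above.
    """
    if isinstance(avro, list):  # an Avro union is a JSON list of branches
        if value is None:
            if "null" not in avro:
                raise ValueError("null is not a branch of this union")
            return None
        non_null = [b for b in avro if b != "null"]
        if len(non_null) == 1:
            return to_avro_value(value, non_null[0])
        return value  # multi-branch unions are out of scope for this sketch
    if value is None:
        if avro == "null":
            return None
        raise ValueError("null in a field not declared as a union with null")
    kind = avro["type"] if isinstance(avro, dict) else avro
    if kind == "enum":
        if value not in avro["symbols"]:
            raise ValueError("undefined enum value: %r" % (value,))
        return value
    if kind in ("bytes", "fixed"):
        # Hive holds signed tiny ints (-128..127); Avro wants raw bytes.
        raw = bytes(v & 0xFF for v in value)
        if kind == "fixed" and len(raw) != avro["size"]:
            raise ValueError("fixed expects %d bytes" % avro["size"])
        return raw
    return value  # other types pass through unchanged in this sketch
```

With the schema from the example below, `to_avro_value(None, ["int", "null"])` returns `None`, while a null in a plain `"int"` field raises.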
Example
Consider the following Hive table, which happens to cover every Hive data type, making it a good example:
    CREATE TABLE test_serializer(
      string1 STRING,
      int1 INT,
      tinyint1 TINYINT,
      smallint1 SMALLINT,
      bigint1 BIGINT,
      boolean1 BOOLEAN,
      float1 FLOAT,
      double1 DOUBLE,
      list1 ARRAY<STRING>,
      map1 MAP<STRING,INT>,
      struct1 STRUCT<sint:INT,sboolean:BOOLEAN,sstring:STRING>,
      union1 uniontype<FLOAT, BOOLEAN, STRING>,
      enum1 STRING,
      nullableint INT,
      bytes1 ARRAY<TINYINT>,
      fixed1 ARRAY<TINYINT>)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY '#'
    LINES TERMINATED BY '\n'
    STORED AS TEXTFILE;
To save this table as an Avro file, create an equivalent Avro schema (the namespace and actual name of the record are not important):
    {
      "namespace": "com.linkedin.haivvreo",
      "name": "test_serializer",
      "type": "record",
      "fields": [
        { "name":"string1", "type":"string" },
        { "name":"int1", "type":"int" },
        { "name":"tinyint1", "type":"int" },
        { "name":"smallint1", "type":"int" },
        { "name":"bigint1", "type":"long" },
        { "name":"boolean1", "type":"boolean" },
        { "name":"float1", "type":"float" },
        { "name":"double1", "type":"double" },
        { "name":"list1", "type":{"type":"array", "items":"string"} },
        { "name":"map1", "type":{"type":"map", "values":"int"} },
        { "name":"struct1", "type":{"type":"record", "name":"struct1_name", "fields": [
          { "name":"sInt", "type":"int" },
          { "name":"sBoolean", "type":"boolean" },
          { "name":"sString", "type":"string" } ] } },
        { "name":"union1", "type":["float", "boolean", "string"] },
        { "name":"enum1", "type":{"type":"enum", "name":"enum1_values", "symbols":["BLUE","RED", "GREEN"]} },
        { "name":"nullableint", "type":["int", "null"] },
        { "name":"bytes1", "type":"bytes" },
        { "name":"fixed1", "type":{"type":"fixed", "name":"threebytes", "size":3} }
      ]
    }
If the table were backed by a csv such as:
why hello there | 42 | 3 | 100 | 1412341 | true | 42.43 | 85.23423424 | alpha:beta:gamma | Earth#42:Control#86:Bob#31 | 17:true:Abe Linkedin | 0:3.141459 | BLUE | 72 | 0:1:2:3:4:5 | 50:51:53 |
another record | 98 | 4 | 101 | 9999999 | false | 99.89 | 0.00000009 | beta | Earth#101 | 1134:false:wazzup | 1:true | RED | NULL | 6:7:8:9:10 | 54:55:56 |
third record | 45 | 5 | 102 | 999999999 | true | 89.99 | 0.00000000000009 | alpha:gamma | Earth#237:Bob#723 | 102:false:BNL | 2:Time to go home | GREEN | NULL | 11:12:13 | 57:58:59 |
one can write it out to Avro with:

    CREATE TABLE as_avro
      ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
      WITH SERDEPROPERTIES (
        'schema.url'='file:///path/to/the/schema/test_serializer.avsc')
      STORED AS
      INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
      OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';

    insert overwrite table as_avro select * from test_serializer;

The files that are written by the Hive job are valid Avro files; however, MapReduce doesn't add the standard .avro extension. If you copy these files out, you'll likely want to rename them with .avro.
Hive is very forgiving about types: it will attempt to store whatever value matches the provided column in the equivalent column position in the new table. No matching is done on column names, for instance. Therefore, it is incumbent on the query writer to make sure the target column types are correct. If they are not, Avro may accept the type or it may throw an exception, depending on the particular combination of types.
There are three ways to provide the reader schema for an Avro table, all of which involve parameters to the serde. As the schema evolves, one can update these values by updating the parameters in the table.
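Because columns are paired by position rather than by name, a quick pre-flight check can catch mismatches before the insert runs. The following is a hypothetical sketch: `check_positions` and the `COMPATIBLE` table are illustrative names, and the compatibility rules are deliberately strict assumptions based on the example schema above, not a statement of what Avro actually accepts.

```python
# Illustrative compatibility table: Avro type name -> Hive types accepted for it.
# Assumption: based on the example schema in this document, kept deliberately strict.
COMPATIBLE = {
    "string": {"string"},
    "int": {"int", "tinyint", "smallint"},
    "long": {"bigint"},
    "boolean": {"boolean"},
    "float": {"float"},
    "double": {"double"},
}


def check_positions(hive_columns, avro_fields):
    """Pair source columns with target fields by position and list mismatches.

    hive_columns: list of (name, hive_type) in SELECT order.
    avro_fields:  list of (name, avro_type_name) in schema order.
    """
    problems = []
    if len(hive_columns) != len(avro_fields):
        problems.append(
            "column count differs: %d vs %d" % (len(hive_columns), len(avro_fields))
        )
    pairs = zip(hive_columns, avro_fields)
    for pos, ((h_name, h_type), (a_name, a_type)) in enumerate(pairs):
        # Names are ignored on purpose: only the position decides the pairing.
        if h_type.lower() not in COMPATIBLE.get(a_type, set()):
            problems.append(
                "position %d: %s %s -> %s %s" % (pos, h_name, h_type, a_name, a_type)
            )
    return problems
```

Swapping two columns in the select list leaves every name present but produces a mismatch at both positions, which is the situation the paragraph above warns about.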
Use schema.url
Specifies a URL to access the schema from. For http schemas, this works for testing and small-scale clusters, but as the schema will be accessed at least once from each task in the job, this can quickly turn the job into a DDoS attack against the URL provider (a web server, for instance). Use caution when using this parameter for anything other than testing.
The schema.url value can also point to a location on HDFS, for instance: hdfs://your-nn:9000/path/to/avsc/file. Haivvreo will then read the file from HDFS, which should provide resiliency against many reads at once. Note that the serde will read this file from every mapper, so it's a good idea to set the replication factor of the schema file to a high value to provide good locality for the readers. The schema file itself should be relatively small, so this does not add a significant amount of overhead to the process.
Use schema.literal and embed the schema in the create statement
One can embed the schema directly into the create statement. This works if the schema doesn't have any single quotes (or they are appropriately escaped), as Hive uses single quotes to delimit the parameter value. For instance:
    CREATE TABLE embedded
      COMMENT "just drop the schema right into the HQL"
      ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
      WITH SERDEPROPERTIES (
        'schema.literal'='{
          "namespace": "com.linkedin.haivvreo",
          "name": "some_schema",
          "type": "record",
          "fields": [ { "name":"string1","type":"string"}]
        }')
      STORED AS
      INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
      OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
Note that the value is enclosed in single quotes and just pasted into the create statement.
Use schema.literal and pass the schema into the script
Hive can do simple variable substitution and one can pass the schema embedded in a variable to the script. Note that to do this, the schema must be completely escaped (carriage returns converted to \n, tabs to \t, quotes escaped, etc). An example:
    set hiveconf:schema;
    DROP TABLE example;
    CREATE TABLE example
      ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
      WITH SERDEPROPERTIES (
        'schema.literal'='${hiveconf:schema}')
      STORED AS
      INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
      OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
To execute this script file, assuming $SCHEMA has been defined to be the escaped schema value:
hive -hiveconf schema="${SCHEMA}" -f your_script_file.sql
Note that $SCHEMA is interpolated into the quotes to correctly handle spaces within the schema.
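One way to produce the escaped value is to let a JSON library collapse the schema to a single line, so that no raw newlines or tabs remain to be escaped. The sketch below is an assumption-laden illustration: `escape_schema` and `hive_command` are hypothetical helpers, and it assumes Hive treats the backslash as the escape character inside single-quoted literals. Verify the result against your Hive version before relying on it.

```python
import json


def escape_schema(avsc_text):
    """Collapse an Avro schema to one line and escape it for a
    single-quoted HiveQL literal (assumes backslash escaping in Hive)."""
    # Re-serializing removes insignificant whitespace (newlines, tabs).
    compact = json.dumps(json.loads(avsc_text), separators=(",", ":"))
    # Escape backslashes first, then the single quotes that delimit the literal.
    return compact.replace("\\", "\\\\").replace("'", "\\'")


def hive_command(escaped_schema, script_path):
    """Build the argument list for the hive invocation shown above.

    Passing a list (for example to subprocess.run) bypasses the shell,
    so spaces in the schema need no additional quoting.
    """
    return ["hive", "-hiveconf", "schema=" + escaped_schema, "-f", script_path]


if __name__ == "__main__":
    schema_text = """{
        "namespace": "com.linkedin.haivvreo",
        "name": "some_schema",
        "type": "record",
        "fields": [ { "name": "string1", "type": "string" } ]
    }"""
    print(hive_command(escape_schema(schema_text), "your_script_file.sql"))
```

Compacting the JSON sidesteps most of the escaping described above; only backslashes and single quotes still need attention.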
Use none to ignore either schema.literal or schema.url
Hive does not provide an easy way to unset or remove a property. If you wish to switch from using schema.url or schema.literal to the other, set the to-be-ignored value to none and Haivvreo will treat it as if it were not set.
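The effect of none can be pictured with a small resolution routine. This is a sketch of the behavior described here, not Haivvreo's source: `resolve_schema` and the `fetch` callback are hypothetical, and checking schema.literal before schema.url is an assumption about precedence that this document does not state.

```python
import json

NONE = "none"  # sentinel meaning "treat this property as unset"


def resolve_schema(props, fetch):
    """Pick the reader schema from a dict of serde properties.

    fetch is a hypothetical callback that takes a URL and returns schema text.
    Assumption: schema.literal is consulted before schema.url.
    """
    literal = props.get("schema.literal")
    if literal is not None and literal != NONE:
        return json.loads(literal)
    url = props.get("schema.url")
    if url is not None and url != NONE:
        return json.loads(fetch(url))
    raise ValueError(
        "Neither schema.literal nor schema.url specified, "
        "can't determine table schema"
    )
```

In Hive itself the switch would be made with a statement along the lines of `alter table T set serdeproperties ('schema.literal'='none')`, after which only schema.url is considered.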
Hive tends to swallow exceptions from Haivvreo that occur before job submission. To force Hive to be more verbose, start it with hive -hiveconf hive.root.logger=INFO,console, which will spit orders of magnitude more information to the console and will likely include any information Haivvreo is trying to give you about what went wrong. If Haivvreo encounters an error during MapReduce, the stack trace will be provided in the failed task log, which can be examined from the JobTracker's web interface. Haivvreo only emits HaivvreoException; look for these, and please include them in any bug reports. The most common are expected to be exceptions thrown while attempting to serialize a type that is incompatible with what Avro is expecting.
- Why do I get error-error-error-error-error-error-error and a message to check schema.literal and schema.url when describing a table or running a query against a table?
Haivvreo returns this message when it has trouble finding or parsing the schema provided by either the schema.literal or schema.url value. It is unable to be more specific because Hive expects all calls to the serde config methods to be successful, meaning we are unable to return an actual exception. By signaling an error via this message, the table is left in a good state and the incorrect value can be corrected with a call to alter table T set serdeproperties.
- Why do I get a java.io.IOException: com.linkedin.haivvreo.HaivvreoException: Neither schema.literal nor schema.url specified, can't determine table schema when pruning by a partition that doesn't exist?
Hive creates a temporary empty file for non-existent partitions so that queries referencing them succeed (returning a count of zero rows). However, when doing so, it doesn't pass the correct information to the RecordWriter, leaving Haivvreo unable to construct one. This problem has been corrected in Hive 0.8. With earlier versions of Hive, either be sure to filter only on existing partitions or apply HIVE-2260.
We're currently testing Haivvreo in our production ETL process and have found it reliable and flexible. We'll be working on improving its performance (there are several opportunities for improvement both in Hive and Haivvreo itself). The next feature we'll add is schema induction when writing from Hive tables so that it is not necessary to provide an equivalent Avro schema ahead of time.
Please kick the tires and file bugs.