This is part 3/3 in my tutorial series for extending Apache Hive.
In previous articles I outlined how to write very simple functions for Hive - first a plain UDF, then its generic counterpart, GenericUDF, and then a table-generating function, GenericUDTF.
In this post we will look at a function type in Hive that allows working with column data - a GenericUDAF, built around the AbstractGenericUDAFResolver and GenericUDAFEvaluator classes.
Examples of built-in UDAF functions include sum(), count(), avg(), min() and max().
All code and data used in this post can be found in my
hive examples GitHub repository.
The table that will be used for demonstration is called
people. It has one column - name, which contains names of individuals and couples.
It is stored in a plain text file, which we can upload to a directory in Hadoop. Then we load up the hive shell and create the Hive table over that data.
The Value of UDAF
There are cases where we want to process the data inside a column as a whole, as opposed to row by row - aggregating or ordering the values of a column, for example.
A Practical Example
I will work through an example of aggregating data. Our UDTF post manipulated people's names, so I will do something similar. Let's suppose we want to calculate the number of letters in the entire name column of our people table.
To create a GenericUDAF we have to implement two components: a resolver (extending AbstractGenericUDAFResolver) and an evaluator (extending GenericUDAFEvaluator).
The resolver merely checks the input parameters and specifies which evaluator to use, and so is fairly simple.
The main work happens inside the Evaluator, in which we have several methods to implement.
Before proceeding, if you are not familiar with object inspectors, you might want to read my first post on Hive UDFs, in which I write a brief summary of their purpose.
The function below calculates the total number of characters in all the strings in the specified column (including spaces).
To understand the API of this function better, remember that a Hive query is ultimately executed as a set of MapReduce jobs. The MapReduce code itself has been written for us and is hidden from our view for convenience (or inconvenience, perhaps). So let us refresh ourselves on Mappers, Combiners and Reducers while thinking about this function. Remember that with Hadoop we have different machines, and on each machine Mappers and Reducers work independently of all the others.
So broadly, this function reads data (mapper), combines a bunch of mapper output into partial results (combiner), and finally creates a final, combined output (reducer). Because we aggregate across many combiners, we need to accommodate the idea of partial results.
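To make that flow concrete, here is a minimal plain-Java sketch of the map -> combine -> reduce stages for counting characters. It has no Hive dependencies, and all class and method names are my own, chosen purely for illustration:

```java
import java.util.Arrays;
import java.util.List;

public class CharCountFlow {

    // "mapper" step: one raw record in, its character count out
    static long mapRecord(String name) {
        return name.length();
    }

    // "combiner" step: fold a batch of mapper outputs into one partial sum
    static long combine(List<Long> mapped) {
        long partial = 0;
        for (long n : mapped) {
            partial += n;
        }
        return partial;
    }

    // "reducer" step: merge the partial sums produced by all combiners
    static long reduce(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // pretend two machines each mapped and combined a slice of the table
        long partial1 = combine(Arrays.asList(mapRecord("John Smith"), mapRecord("Anne")));
        long partial2 = combine(Arrays.asList(mapRecord("Bob and Carol")));
        System.out.println(reduce(Arrays.asList(partial1, partial2))); // prints 27
    }
}
```

The important structural point is that combine and reduce consume the *outputs* of earlier stages, not the raw rows - exactly the partial-result shape the UDAF API forces on us.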
Looking deeper at the structure of the class:
init - specifies the input and output types of the data (we have previously seen the requirement to specify input and output parameters)
iterate - reads data from the input table (a typical Mapper)
terminate - outputs the final result (the Reducer)
and then there are Partials and an AggregationBuffer:
terminatePartial - outputs a partial result
merge - merges partial results into a single result (e.g. the outputs of multiple combiner calls)
There are some good resources on combiners; Philippe Adjiman has a really good walkthrough.
AggregationBuffer allows us to store intermediate (and final) results. By defining our own buffer, we can process any type of data we like.
In my code example a sum of letters is stored in our (simple) AggregationBuffer.
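As a sketch of how these pieces fit together, the following plain-Java class mimics the evaluator's life cycle with a minimal buffer. The method names mirror GenericUDAFEvaluator's, but the class itself is hypothetical and deliberately has no Hive dependencies:

```java
public class LetterCountEvaluatorSketch {

    // our (simple) AggregationBuffer: the intermediate state is one running sum
    static class LetterBuffer {
        long sum = 0;
    }

    LetterBuffer getNewAggregationBuffer() {
        return new LetterBuffer();
    }

    // mapper side: consume one raw value from the name column
    void iterate(LetterBuffer buf, String name) {
        if (name != null) {
            buf.sum += name.length();
        }
    }

    // emit the buffer's contents as a partial result for the next stage
    long terminatePartial(LetterBuffer buf) {
        return buf.sum;
    }

    // combiner/reducer side: fold a partial result into the buffer
    void merge(LetterBuffer buf, long partial) {
        buf.sum += partial;
    }

    // reducer side: produce the final answer
    long terminate(LetterBuffer buf) {
        return buf.sum;
    }

    public static void main(String[] args) {
        LetterCountEvaluatorSketch ev = new LetterCountEvaluatorSketch();

        // two "machines" each build a partial result...
        LetterBuffer a = ev.getNewAggregationBuffer();
        ev.iterate(a, "John Smith");
        LetterBuffer b = ev.getNewAggregationBuffer();
        ev.iterate(b, "Anne");

        // ...and the final stage merges the partials and terminates
        LetterBuffer finalBuf = ev.getNewAggregationBuffer();
        ev.merge(finalBuf, ev.terminatePartial(a));
        ev.merge(finalBuf, ev.terminatePartial(b));
        System.out.println(ev.terminate(finalBuf)); // prints 14
    }
}
```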
One final part of the init method which may still be confusing is the concept of Mode. Mode is used to define what the function should be doing at the different stages of the MapReduce pipeline (mapping, combining or reducing).
The Hive documentation describes four modes: PARTIAL1 (from original data to partial aggregation: iterate() and terminatePartial() are called), PARTIAL2 (from partial aggregation to partial aggregation: merge() and terminatePartial()), FINAL (from partial aggregation to full aggregation: merge() and terminate()) and COMPLETE (from original data straight to full aggregation: iterate() and terminate()).
That means the UDAF receives different input at different MapReduce stages.
iterate reads a line from our table (or, to be precise, an input record as defined by the InputFormat of our table) and outputs something for aggregation in some other format.
Partial aggregation combines a number of these elements into an aggregated form of the same format. The final reducer then takes this input and outputs a final result, the format of which may differ from the format in which the data was received.
In the init() function we specify the input as a string, the final output as an integer, and the partial aggregation output as an integer (stored in an aggregation buffer). That is,
iterate() gets a String,
merge() gets an Integer, and both
terminate() and terminatePartial() output an Integer.
The iterate() function gets an input string from the column, then calculates and stores the length of that string.
merge() adds the result of a partial sum to the AggregationBuffer.
terminate() returns the contents of the AggregationBuffer; this is where the final result is produced.
Using the Function in Hive
It is possible to write a useful unit test for part of this process, although effective unit testing is hard because of the complexity of the API. I would recommend testing the individual aggregation functions if they are particularly complex, but testing the function as a whole is tough. More simply, the final function can be tested on a small test table in Hive.
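As a sketch of that recommendation: if the aggregation arithmetic is factored out into plain static methods (a hypothetical refactoring on my part, not something the Hive API requires), it can be checked without spinning up Hive at all:

```java
public class LetterCountLogicTest {

    // the arithmetic under test, pulled out of the evaluator callbacks
    static long iterate(long sum, String value) {
        return value == null ? sum : sum + value.length();
    }

    static long merge(long sum, long partial) {
        return sum + partial;
    }

    public static void main(String[] args) {
        // nulls must not contribute to the count
        if (iterate(0, null) != 0) throw new AssertionError("null handling");
        // "John Smith" has 10 characters, spaces included
        if (iterate(0, "John Smith") != 10) throw new AssertionError("iterate");
        // merging partials must be order-independent
        if (merge(merge(0, 10), 4) != merge(merge(0, 4), 10)) throw new AssertionError("merge");
        System.out.println("all checks passed");
    }
}
```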
This is actually the recommended workflow for developers wishing to submit their functions to the Hive project itself. See the “Creating the tests” section of the official GenericUDAF tutorial.
By now you should be a pro at customizing Hive functions.