Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions

By Matthew Rathbone on August 17, 2015

Tags: hive, hadoop, UDAF, SQL

This article was co-authored by Elena Akhmatova

This is part 3/3 in my tutorial series for extending Apache Hive.

In previous articles I outlined how to write simple functions for Hive (UDF and GenericUDF), followed by table-generating functions (GenericUDTF).

In this post we will look at a function type in Hive that aggregates data across the rows of a column - the GenericUDAF, represented by org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver and org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.

Examples of built-in UDAF functions include sum() and count().

Code

All code and data used in this post can be found in my hive examples GitHub repository.

Demonstration Data

The table that will be used for demonstration is called people. It has one column - name, which contains names of individuals and couples.

It is stored in a file called people.txt

~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy

We can upload this to Hadoop to a directory called people:

hadoop fs -mkdir people
hadoop fs -put ./people.txt people

Then load up the Hive shell and create the Hive table:

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS 
	TERMINATED BY '\t' 
	ESCAPED BY '' 
	LINES TERMINATED BY '\n'
STORED AS TEXTFILE 
LOCATION '/user/matthew/people';

The Value of UDAF

There are cases when we want to process the data in a column as a whole, rather than row by row - aggregating or ordering the values in a column, for example.

A Practical Example

I will work through an example of aggregating data. Our UDTF post manipulated people's names, so I will do something similar. Let's suppose we want to calculate the total number of letters in the entire name column of our people table.

To create a GenericUDAF we have to implement org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver and org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.

The resolver simply checks the input parameters and specifies which evaluator to use, and so is fairly simple. Its only interesting method is:

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
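A minimal resolver for our function might look like the sketch below. It follows the standard Hive resolver pattern; the class name matches the one registered later in this post and the evaluator is the one defined below, but the exact argument checks are my own illustration:

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // we expect exactly one argument
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }
        // and it must be a primitive (e.g. a string), not a list/map/struct
        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted.");
        }
        return new TotalNumOfLettersEvaluator();
    }
}
```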

The main work happens inside the Evaluator, in which we have several methods to implement.

Before proceeding, if you are not familiar with object inspectors, you might want to read my first post on Hive UDFs, in which I write a brief summary of their purpose.

// Object inspectors for input and output parameters
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// class to store the result of the data processing
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// reset Aggregation buffer
public void reset(AggregationBuffer agg) throws HiveException;

// process input record
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// finalize the processing of a partial chunk of the input data
public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// add the results of two partial aggregations together
public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// output final result
public Object terminate(AggregationBuffer agg) throws HiveException;

The function below calculates the total number of characters in all the strings in the specified column (including spaces)

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {

            assert (parameters.length == 1);
            super.init(m, parameters);

            // init input object inspectors: in PARTIAL1 and COMPLETE mode we
            // receive the original string column, otherwise a partial sum
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // init output object inspector: both partial and final results are integers
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * class for storing the current sum of letters
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // emit this task's partial sum; the buffer holds all the state we need
            return ((LetterSumAgg) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((LetterSumAgg) agg).sum;
        }

    }

Code walkthrough

To understand the API of this function better, remember that Hive queries are compiled down to MapReduce jobs. The MapReduce code itself has been written for us and is hidden from our view for convenience (or inconvenience, perhaps). So let us refresh ourselves on Mappers, Combiners, and Reducers while thinking about this function. Remember that with Hadoop we have many machines, and on each machine Mappers and Reducers work independently of all the others.

So broadly, this function reads data (the mapper), combines a bunch of mapper output into partial results (the combiner), and finally creates a final, combined output (the reducer). Because we aggregate across many combiners, we need to accommodate the idea of partial results.
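The contract between these stages can be sketched in plain Java, with no Hive dependencies. The method names below mirror the evaluator API, but the classes and data splits are made up for illustration: two "map-side" tasks each build a partial sum, the "reduce side" merges them, and a single complete pass must agree with the merged result.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAggDemo {
    // stand-in for the AggregationBuffer
    static class Buf {
        int sum = 0;
        void add(int n) { sum += n; }
    }

    // "iterate": consume one input record
    static void iterate(Buf b, String record) { b.add(record.length()); }

    // "terminatePartial": emit the buffer's current state
    static int terminatePartial(Buf b) { return b.sum; }

    // "merge": absorb another task's partial result
    static void merge(Buf b, int partial) { b.add(partial); }

    public static void main(String[] args) {
        List<String> split1 = Arrays.asList("ab", "cde");
        List<String> split2 = Arrays.asList("f");

        // map side: two independent tasks, one per split
        Buf m1 = new Buf(); split1.forEach(r -> iterate(m1, r));
        Buf m2 = new Buf(); split2.forEach(r -> iterate(m2, r));

        // reduce side: merge the partial sums, then terminate
        Buf reducer = new Buf();
        merge(reducer, terminatePartial(m1));
        merge(reducer, terminatePartial(m2));
        System.out.println(reducer.sum); // prints 6

        // a single COMPLETE pass over everything must give the same answer
        Buf complete = new Buf();
        split1.forEach(r -> iterate(complete, r));
        split2.forEach(r -> iterate(complete, r));
        System.out.println(complete.sum == reducer.sum); // prints true
    }
}
```

However Hive splits the data, merging the partials must reproduce the single-pass result - that is the invariant the AggregationBuffer design has to preserve.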

Looking deeper at the structure of the class:

  • init - specifies input and output types of data (we have previously seen the requirement to specify input and output parameters)

  • iterate - reads data from the input table (a typical Mapper)

  • terminate - outputs the final result (the Reducer)

and then there are Partials and an AggregationBuffer:

  • terminatePartial - outputs a partial result
  • merge - merges partial results into a single result (e.g. the outputs of multiple combiner calls)

There are some good resources on combiners, Philippe Adjiman has a really good walkthrough.

The AggregationBuffer allows us to store intermediate (and final) results. By defining our own buffer, we can process any type of data we like.

In my code example a sum of letters is stored in our (simple) AggregationBuffer.

/**
* class for storing the current sum of letters
*/
static class LetterSumAgg implements AggregationBuffer {
	int sum = 0;
	void add(int num){
		sum += num;
	}
}

One final part of the init method that may still be confusing is the concept of Mode. Mode is used to define what the function should be doing at each stage of the MapReduce pipeline (mapping, combining, or reducing).

The Hive documentation gives the following explanation of Mode:

Parameters: In PARTIAL1 and COMPLETE mode, the parameters are original data; In PARTIAL2 and FINAL mode, the parameters are just partial aggregations.
Returns: In PARTIAL1 and PARTIAL2 mode, the ObjectInspector for the return value of terminatePartial() call; In FINAL and COMPLETE mode, the ObjectInspector for the return value of terminate() call.

That means the UDAF receives different input at different MapReduce stages. iterate reads a row from our table (or, more precisely, an input record as defined by the InputFormat of our table) and outputs something to aggregate in some other format. terminatePartial and merge combine a number of these elements into an aggregated form of the same format. The final reducer then takes this input and outputs a final result, whose format may differ from the format in which the data was received.

Our Implementation

In the init() function we specify input as a string, final output as an integer, and partial aggregation output as an integer (stored in an aggregation buffer). That is, iterate() gets a String, merge() an Integer; and both terminate() and terminatePartial() output an Integer.

// init input object inspectors depending on the mode
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
	inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
	integerOI = (PrimitiveObjectInspector) parameters[0];
}

// output
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);

The iterate() function gets an input string from the column and calculates and stores the length of the input string.

public void iterate(AggregationBuffer agg, Object[] parameters)
	throws HiveException {
	...
	Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
	myagg.add(String.valueOf(p1).length());
	...
}

merge() adds the result of a partial aggregation to the AggregationBuffer:

public void merge(AggregationBuffer agg, Object partial)
	throws HiveException {
	if (partial != null) {
		LetterSumAgg myagg = (LetterSumAgg) agg;
		Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
		myagg.add(partialSum);
	}
}

Terminate returns the contents of an AggregationBuffer. This is where the final result is produced.

public Object terminate(AggregationBuffer agg) throws HiveException {
	LetterSumAgg myagg = (LetterSumAgg) agg;
	return myagg.sum;
}

Using the Function in Hive

ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

SELECT letters(name) 
FROM people;
OK
44
Time taken: 20.688 seconds
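As a sanity check, we can reproduce that 44 in plain Java by summing the lengths of the four names in people.txt (spaces count, since the UDAF measures the whole string):

```java
public class SanityCheck {
    // sum the character counts of every name, spaces included
    static int totalLetters(String[] names) {
        int total = 0;
        for (String n : names) {
            total += n.length();
        }
        return total;
    }

    public static void main(String[] args) {
        String[] names = {"John Smith", "John and Ann White", "Ted Green", "Dorothy"};
        System.out.println(totalLetters(names)); // prints 44 (10 + 18 + 9 + 7)
    }
}
```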

Testing

It is possible to unit test parts of this process, although doing so effectively is hard because of the complexity of the API. I would recommend unit testing the individual aggregation functions if they are particularly complex; testing the function as a whole is tough. More simply, the final function can be tested against a small test table in Hive.

This is actually the recommended workflow for developers wishing to submit their functions to the Hive project itself. See the "Creating the tests" section in the official GenericUDAF tutorial.

Finishing up

By now you should be a pro at customizing Hive functions.

If you need more resources you can check out my personal blog post for a walkthrough of building regular user defined functions, or take a look at the Apache Hive Book.
