Monday, October 24, 2011

Enable Sorted Bucketing in Hive


From the hive documentation we mostly get the impression that partitions are meant for grouping records and buckets are meant for sampling, i.e. for distributing records evenly across multiple files. But can we also group records based on some columns/fields within buckets (the individual files in buckets)?
Concepts get clearer when explained through examples, so I'm taking the same route here. On one hadoop assignment we designed a hybrid solution where the final output landed in a hive partitioned table. This output had to be consumed by an Oracle DWH for some legacy applications. The handshake between the hadoop and oracle teams was that they wanted 'n' files for each sub partition/folder, with the data in those files grouped on a few columns of the table (country and continent). If the files are grouped this way, the oracle load becomes much more efficient. How can we materialize this solution?

1.       After the hive operations, run a map reduce job on the final folders that does the group by
You do this by setting the number of reducers to 'n' to get n output files while running against each sub folder. It is really not a good solution, because you have to run the map reduce job against every sub partition/folder, which is definitely a performance hit.
2.       Bucketing in hive
Use bucketing in hive for the sub partitions. It is not plain bucketing but sorted bucketing. Normally we enable bucketing in hive during table creation as
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  Id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

When we go into sorted bucketing/grouped bucketing our DDL would look like

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  Id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) SORTED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

Now to enforce bucketing while loading data into the table, we need to enable a hive parameter as follows
set hive.enforce.bucketing = true;
                     
With this DDL our requirement is satisfied: each sub partition contains n individual files, and the records are grouped into those n files based on country and continent, i.e. a particular combination of country and continent is present in only one file. If the question arises as to which combination lands in which file, that is decided by the hash partitioning function. If you want control over that, you need to write a custom hash partitioner and plug it into your hive session.
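As a rough sketch of the load step: the unbucketed staging table raw_table and its columns below are assumptions, not part of the original setup, and hive.enforce.sorting is an additional parameter commonly enabled so that each bucket file actually comes out sorted.

set hive.enforce.bucketing = true;
set hive.enforce.sorting = true;

-- raw_table is a hypothetical staging table holding the records for this dt/hour slice
INSERT OVERWRITE TABLE test_table PARTITION (dt='2011-10-11', hour='13')
SELECT Id, name, country, continent
FROM raw_table;

With hive.enforce.bucketing in place, hive runs this insert with n reducers, so the partition ends up with n files grouped (and sorted) on country and continent.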

NOTE: When we use partitions, data is stored under individual directories/sub directories in hdfs. But when we use buckets, the records within each partition are stored as a set of files, one per bucket, numbered from 0 to n-1.
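For example, with 4 buckets the layout under the table location would look roughly like this (the bucket file names are illustrative; the exact names depend on the hive/hadoop version):

/home/test_dir/dt=2011-10-11/hour=13/000000_0
/home/test_dir/dt=2011-10-11/hour=13/000001_0
/home/test_dir/dt=2011-10-11/hour=13/000002_0
/home/test_dir/dt=2011-10-11/hour=13/000003_0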

NOTE: For partitioned tables, when we issue a query only the required partitions are scanned; there is no need to specify any hints in the hive query. For bucketed tables this is not the case: you need to hint your hive query if you want to scan only particular buckets, otherwise the whole set of files is scanned. We hint the buckets using the TABLESAMPLE clause in the hive query. For example, if we want only the data from bucket 2 in our example
SELECT * FROM test_table TABLESAMPLE(BUCKET 2 OUT OF n ON country, continent) WHERE dt='2011-10-11' AND hour='13';

Include values during execution time in hive QL/ Dynamically substitute values in hive


When you play around with data warehousing it is very common to come across scenarios where you'd like to supply values at run time. In production environments, when we have to enable a hive job we usually write our series of hive operations (HQL) in a file and trigger it using the hive -f option, either from a shell script or from a workflow management system like oozie. Let's keep this discussion limited to triggering the hive job from a shell, as that is the basic case.
      Say I have a hive job called hive_job.hql; normally from a shell I'd trigger it as
hive -f hive_job.hql

If I need to set some hive config parameters, say to enable compression in hive, then I'd include the corresponding arguments as well:
hive -f hive_job.hql  -hiveconf hive.exec.compress.output=true -hiveconf mapred.output.compress=true -hiveconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Now the final one: say my hive QL performs some operations on a date range, and this range is varying/dynamic and has to be accepted from the CLI each time. We can achieve this with the following steps
1.       Pass the variables as config parameters
2.       Refer these config parameters in your hive query

                Let me make it more specific with a small example. I need to perform some operation on records in a table called 'test_table' which has a column/field named 'created_date' (i.e. I need to filter records based on created_date). The date range for the operation is supplied at run time. It is achieved as follows
1.       Pass the variables as config parameters
                Here we need to pass two parameters, the start date and end date to get all the records within a specific date range
hive -f hive_job.hql -hiveconf start_date=2011-01-01 -hiveconf end_date=2011-12-31
2.       Refer these config parameters in your hive query
                In our hive QL the start date and end date are referenced through placeholders that get replaced with the actual values at execution time.
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';


Let us look at one more example: deciding the number of reducers at run time, together with the previous set of requirements and compression. I have 2 components here

·         Shell script that triggers .hql
hive_trigger.sh
#!/bin/bash
# The script accepts 3 parameters in order: number of reducers, start date and end date.
NUM_REDUCERS=$1
BEGIN_DATE=$2
CLOSE_DATE=$3
hive -f hive_job.hql -hiveconf mapred.reduce.tasks=$NUM_REDUCERS \
  -hiveconf start_date=$BEGIN_DATE -hiveconf end_date=$CLOSE_DATE \
  -hiveconf hive.exec.compress.output=true -hiveconf mapred.output.compress=true \
  -hiveconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

·         HQL that accepts parameters at run time
hive_job.hql
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';

The components are ready; you can now trigger your shell script, for example as
./hive_trigger.sh 50 2011-01-01 2011-12-31

How to efficiently store data in hive/ Store and retrieve compressed data in hive


Hive is a data warehousing tool built on top of hadoop. The data corresponding to hive tables is stored as delimited files in hdfs. Since hive is used for data warehousing, the data for production hive tables would easily run to at least hundreds of gigs. Naturally the question arises: how efficiently can we store this data? It definitely has to be compressed. A few more questions follow: how can we store compressed data in hive, and how can we process and retrieve that compressed data using hive QL?
                Now let's look into these; it is fairly simple if you know hive. Before you use hive you need to enable a few parameters for dealing with compressed tables. They are the same compression enablers you use with plain map reduce, along with a couple of hive parameters.

·         hive.exec.compress.output=true
·         mapred.output.compress=true
·         mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec
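They can be set once at the top of the .hql file or in the hive session itself (or passed as -hiveconf arguments the way we did in the previous post):

set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec;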

Here I have used LZO as my compression in hdfs, hence the LzopCodec. Beyond setting these you don't need to do anything else; use your hive QL exactly as you would with uncompressed data. I have tried the same successfully with dynamic partitions, buckets etc., and it works like any normal hive operation.
               The input data for me came from conventional sources as plain text, and this raw data was loaded into a staging table. From the staging table, some hive QL loaded the cleansed data into the actual hive tables. The staging table gets flushed every time its data is loaded into the target hive table.
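As a rough sketch of that flow (the table names, columns, input path and cleansing condition below are hypothetical, and target_table is assumed to be an existing partitioned table):

-- staging table holding the raw delimited text feed
CREATE TABLE staging_table (id INT, name STRING, created_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';

-- OVERWRITE flushes whatever the previous run left in the staging table
LOAD DATA INPATH '/data/incoming/feed.txt' OVERWRITE INTO TABLE staging_table;

-- with the compression parameters above in effect, the files written for target_table come out LZO compressed
INSERT OVERWRITE TABLE target_table PARTITION (dt='2011-10-11')
SELECT id, name, created_date
FROM staging_table
WHERE id IS NOT NULL;   -- trivial stand-in for the real cleansing logic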