Monday, September 19, 2011

Joins with plain Map Reduce or MultipleInputs



Being a map reduce developer I’d never recommend to write joins of data sets using custom map reduce code. You have very intelligent and powerful tools handy in hadoop like hive and pig that can easily join huge data sets with the choice of join like inner, outer etc. But if such a scenario arises where you need to do join using map reduce you should be able to accomplish that with your knowledge on basic map reduce programming.
                Let us look into a mocked up example for the same.

Problem Statement: A retailer has a customer data base and he need to do some promotions based on the same. He chooses bulk sms as the choice of his promotion which is done by a third part for him. And once the sms is pushed the sms provider returns the delivery status back to the retailer. Now let us look into more details which makes things a little complicated
We have 3 input files as follows
1.       UserDetails.txt
                                 i.            Every record is of the format ‘mobile number , consumer name’
2.       DeliveryDetails.txt
                                 i.            Every record is of the format ‘mobile number, delivery status code’
3.       DeliveryStatusCodes.txt
                                 i.            Every record is of the format ‘delivery status code, status message’
The retailer has a consumer data base(UserDetails.txt)  from which only the mobile number are provided to a bulk sms provider. He can’t reveal the customer name due to security reasons. Once the messages are pushed the sms provider sends back a report of the mobile numbers with status code (DeliveryDetails.txt) and also a look up file that relates every status code to the corresponding Status message (DeliveryStatusCodes.txt).
 The requirement is that for meaningful information we need the consumer name along with its corresponding status message. And we need to obtain the same from these 3 files.

Sample Inputs
File 1 – UserDetails.txt
123 456, Jim
456 123, Tom
789 123, Harry
789 456, Richa

File 2 – DeliveryDetails.txt
123 456, 001
456 123, 002
789 123, 003
789 456, 004

File 3 – DeliveryStatusCodes.txt
001, Delivered
002, Pending
003, Failed
004, Resend

Expected Output
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend


Solution : Using core MapReduce
1.       Use two different mapper classes for both processing the  initial inputs from UserDetails.txt and DeliveryDetails.txt, The Key value output from the mappers should be as follows
a)      UserDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
b)      DeliveryDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code
So here since the two files needs to be parsed separately using two mappers. I’m using
UserFile Mapper.java to process UserDetails.txt and
DeliveryFileMapper.java to process DeliveryDetails.txt
In map reduce API, I’m using MulipleInputFormat to specify which input to go into which mapper. But the ouput key value pairs from the mapper go into the same reducer, for the Reducer to identify the source of the value we are prepending the values ‘CD’ or ‘DR’.

2.       On the reducer end use distributed cache to distribute the DeliveryStatusCodes.txt. Parse the file and load the contents into HashMap with Key being the status code and value being the status message

3.       On the reducer every key would be having two values one with prefix ‘CD’ and other ‘DR’. (For simplicity let us assume only 2 values, in real time it can be more). Identify the records and from CD get the customer name corresponding to the cell number (input key) and from DR get the status code. On obtaining the status code do a look up on the HashMap to get the status message. So finally the output Key values from the reducer would be as follows
a)      Key : Customer Name
b)      Value : Status Message

Let’s just look at the source code

Mapper Class1: UserFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UserFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process Consumer Details
    private String cellNumber,customerName,fileTag="CD~";
   
    /* map method that process ConsumerDetails.txt and frames the initial key value pairs
       Key(Text) – mobile number
       Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        customerName = splitarray[1].trim();
       
      //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+customerName));
     }
}

Mapper Class2:DeliverFileMapper.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DeliveryFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process delivery report
    private String cellNumber,deliveryCode,fileTag="DR~";
   
   /* map method that process DeliveryReport.txt and frames the initial key value pairs
    Key(Text) – mobile number
    Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code*/

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        deliveryCode = splitarray[1].trim();
       
        //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
     }
}

Reducer Class:SmsReducer.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SmsReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
      
       //Variables to aid the join process
       private String customerName,deliveryReport;
       /*Map to store Delivery Codes and Messages
       Key being the status code and vale being the status message*/
       private static Map<String,String> DeliveryCodesMap= new HashMap<String,String>();
      
       public void configure(JobConf job)
       {
              //To load the Delivery Codes and Messages into a hash map
              loadDeliveryStatusCodes();
             
       }


       public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        while (values.hasNext())
        {
             String currValue = values.next().toString();
             String valueSplitted[] = currValue.split("~");
             /*identifying the record source that corresponds to a cell number
             and parses the values accordingly*/
             if(valueSplitted[0].equals("CD"))
             {
               customerName=valueSplitted[1].trim();
             }
             else if(valueSplitted[0].equals("DR"))
             {
              //getting the delivery code and using the same to obtain the Message
               deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
             }
        }
        
        //pump final output to file
        if(customerName!=null && deliveryReport!=null)
        {
               output.collect(new Text(customerName), new Text(deliveryReport));
        }
        else if(customerName==null)
               output.collect(new Text("customerName"), new Text(deliveryReport));
        else if(deliveryReport==null)
               output.collect(new Text(customerName), new Text("deliveryReport"));
        
    }
      
      
       //To load the Delivery Codes and Messages into a hash map
    private void loadDeliveryStatusCodes()
    {
       String strRead;
       try {
              //read file from Distributed Cache
                     BufferedReader reader = new BufferedReader(new FileReader("DeliveryStatusCodes.txt"));
                     while ((strRead=reader.readLine() ) != null)
                     {
                           String splitarray[] = strRead.split(",");
                           //parse record and load into HahMap
                           DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim());
                          
                     }
              }
              catch (FileNotFoundException e) {
              e.printStackTrace();
              }catch( IOException e ) {
                       e.printStackTrace();
                }
             
       }
}

Driver Class: SmsDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.lib.MultipleInputs;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SmsDriver extends Configured implements Tool
{
       public int run(String[] args) throws Exception {

              //get the configuration parameters and assigns a job name
              JobConf conf = new JobConf(getConf(), SmsDriver.class);
              conf.setJobName("SMS Reports");

              //setting key value types for mapper and reducer outputs
              conf.setOutputKeyClass(Text.class);
              conf.setOutputValueClass(Text.class);

              //specifying the custom reducer class
              conf.setReducerClass(SmsReducer.class);

              //Specifying the input directories(@ runtime) and Mappers independently for inputs from multiple sources
              MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
              MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliveryFileMapper.class);
             
              //Specifying the output directory @ runtime
              FileOutputFormat.setOutputPath(conf, new Path(args[2]));

              JobClient.runJob(conf);
              return 0;
       }

       public static void main(String[] args) throws Exception {
              int res = ToolRunner.run(new Configuration(), new SmsDriver(),
                           args);
              System.exit(res);
       }
}


Let us go in for a small code walk through, I’m not going in for each and every line of code as it is commented and is bit self-explanatory. The few points to keep in mind

1.       The only difference in the code is that we are using MultipleInputFormat instead of FileInputFormat. This is necessary as we use two mappers and we need the output of the two mappers to be processed by a single reducer
2.       When we normally execute our map reduce with the hadoop jar command the last two arguments on the command line represent the input and output dir in hdfs. But here instead of two we’d have three input locations and one output location.
3.       The second key thing to be noted here is that in place of input locations don’t provide the full path with file names. Provide the input directories instead. Load the two files in two separate directories and provide the corresponding paths to mappers.
4.       Since my driver is getting the arguments from command line, the order of arguments is also very critical. Make sure that the input directories always point to their corresponding mappers itself.

You can run the above example with the following command on CLI as
hadoop jar /home/bejoys/samples/ smsMarketing.jar com.bejoy.samples.smsmarketing.SmsDriver  -files /home/bejoys/samples/ DeliveryStatusCodes.txt /userdata/bejoys/samples/sms/consumerdata /userdata/bejoys/samples/sms/deliveryinformation /userdata/bejoys/samples/sms/output  

Note:
                     i.            Since the join the happening on reduce, it is termed as a reduce side join.
                   ii.            This is a very basic approach to implement joins in map reduce and is for those who have a basic knowledge on map reduce programming. You can implement it in more sophisticated manner in mapreduce frame work using DataJoin Mappers and Reducers with TaggedMap Output Types.

But if it is a join then I’d strongly recommend you to go in with Pig or Hive as both of these are highly optimized for implementing joins. Also you can eliminate the coding effort you need to put in. It is not exaggerating if I say I can implement the same in a single step using hive. Let us just check it out

Using Hive
1.       Load the data into 3 hive tables
2.       Perform join using hive QL

Creating Hive tables to store the files
CREATE TABLE customer_details (cellNumber String,consumerName String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/ConsumerDetails.txt' INTO TABLE customer_details;

CREATE TABLE delivery_report (cellNumber String,statusCode int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryReport.txt' INTO TABLE delivery_report;

CREATE TABLE status_codes (statusCode int,statusMessage String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryStatusCodes.txt' INTO TABLE status_codes;

Hive Query to execute Join operation on data sets

Select cd.consumerName,sc.statusMessage FROM customer_details cd
JOIN delivery_report dr ON (cd.cellNumber = dr.cellNumber) JOIN
status_codes sc ON(dr.statusCode = sc.statusCode);

You can optimize the hive Query again for performance boosting. Refer Optimizing Joins in hive