Friday, February 10, 2012
Enable Multiple threads in a mapper aka MultithreadedMapper
As the name suggests it is map task that spawns multiple threads. A map task can be considered as a process which runs on its own jvm boundary. Multithreaded spawns multiple threads within the same map task. Don’t confuse the same as multiple tasks within the same jvm (this is achieved with jvm reuse). When I say a task has multiple threads, a task would be reusing the input split as defined by the input format and record reader reads the input like a normal map task. The multi threading happens after this stage; once the record reading has happened then the input/task is divided into multiple threads. (ie the input IO is not multi threaded and multiple threads come into picture after that)
MultiThreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could help in speeding up the task. If IO intensive, then running multiple tasks is much better than multi thread as in multiple tasks multiple IO reads would be happening in parallel.
Let us see how we can use MultiThreadedMapper. There are different ways to do the same in old mapreduce API and new API.
Enable Multi threaded map runner as
-D mapred.map.runner.class = org.apache.hadoop.mapred.lib.MultithreadedMapRunner
Your mapper class should sub class (extend) org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper instead of org.apache.hadoop.mapreduce.Mapper . The Multithreadedmapper has a different implementation of run() method.
You can set the number of threads within a mapper in MultiThreadedMapper by
mapred.map.multithreadedrunner.threads = n
Note: Don’t think it in a way that multi threaded mapper is better than normal map reduce as it spawns less jvms and less number of processes. If a mapper is loaded with lots of threads the chances of that jvm crashing are more and the cost of re-execution of such a hadoop task would be terribly high.
Don’t use Multi Threaded Mapper to control the number of jvms spanned, if that is your goal you need to tweak the mapred.job.reuse.jvm.num.tasks parameter whose default value is 1, means no jvm reuse across tasks.
The threads are at the bottom level ie within a map task and the higher levels on hadoop framework like the job has no communication regarding the same.