The result of applying a mapper to a set of records is a bunch of
key-value pairs, which need to be shuffled to the correct reduce task.
The Hadoop Partitioner
is responsible for splitting the key space: it assigns each intermediate key
to a partition, and each partition is processed by one reduce task. HashPartitioner
is the default Partitioner; it simply takes the hash value of the
key and mods it by the number of reduce tasks. There are many cases
in which you want a smarter partitioning of the key space; one common
usage scenario is computing conditional probabilities. This primer
walks through exactly such an example to illustrate the use of a
Partitioner.
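To make the default behaviour concrete, here is a plain-Java sketch (no Hadoop dependency) of the rule HashPartitioner applies, which in Hadoop's source is (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The class name HashPartitionSketch and the "token\tfield" string encoding of a key are hypothetical, chosen only for illustration.

```java
/**
 * Plain-Java sketch of the default hash-partitioning rule (no Hadoop dependency).
 * The class name and the "token\tfield" string encoding of keys are hypothetical.
 */
class HashPartitionSketch {
    // Maps a hash code to a partition in [0, numReduceTasks).
    // Masking with Integer.MAX_VALUE clears the sign bit, so negative hash codes
    // still produce a non-negative partition number.
    static int partitionFor(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Default behaviour: the partition depends on the WHOLE key, so
    // "admirable\t0" and "admirable\t1" are hashed independently and
    // may be sent to different reduce tasks.
    static int partitionForKey(String key, int numReduceTasks) {
        return partitionFor(key.hashCode(), numReduceTasks);
    }
}
```

Because the whole key is hashed, nothing guarantees that two keys sharing a token end up in the same partition, which is exactly what the conditional-probability computation needs.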
The relevant example we'll discuss is the
DemoWordCondProb class in the
Cloud9 library, which in turn builds on
DemoWordCountTuple. In brief,
DemoWordCountTuple generates tuples of the following
form:
    ...
    (admirable, 0)     9
    (admirable, 1)     6
    (admiral, 0)       2
    (admiral, 1)       4
    (admiration, 0)    10
    (admiration, 1)    6
    (admire, 0)        5
    (admire, 1)        3
    (admired, 0)       12
    (admired, 1)       7
    ...
The first field of the key tuple contains a token; the second field indicates whether it was found on an even-length or odd-length line. The value is the number of times that tuple occurs in the collection.
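The shape of these counts can be reproduced with a small in-memory sketch. It is not the Cloud9 code: whitespace tokenization, measuring line length in characters, mapping even-length lines to 0 and odd-length lines to 1, and the "token\tparity" key encoding are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java, in-memory sketch of the counts DemoWordCountTuple reports.
 * Assumptions (not taken from Cloud9): tokens are whitespace-separated, a line's
 * length is measured in characters, parity 0 means even-length and 1 means odd-length,
 * and the key (token, parity) is encoded as the string "token\tparity".
 */
class TupleCountSketch {
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted, like the sample output
        for (String line : lines) {
            int parity = line.length() % 2; // assumed: 0 = even-length line, 1 = odd-length line
            for (String token : line.trim().split("\\s+")) {
                if (token.isEmpty()) {
                    continue; // a blank line splits into a single empty string
                }
                counts.merge(token + "\t" + parity, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For example, the lines "admire it", "admire", and "admire me" (9, 6, and 9 characters) yield (admire, 1) 2, (admire, 0) 1, (it, 1) 1, and (me, 1) 1.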
DemoWordCondProb generates tuples of the following
form:
    ...
    (admirable, *)     15.0
    (admirable, 0)     0.6
    (admirable, 1)     0.4
    (admiral, *)       6.0
    (admiral, 0)       0.33333334
    (admiral, 1)       0.6666667
    (admiration, *)    16.0
    (admiration, 0)    0.625
    (admiration, 1)    0.375
    (admire, *)        8.0
    (admire, 0)        0.625
    (admire, 1)        0.375
    (admired, *)       19.0
    (admired, 0)       0.6315789
    (admired, 1)       0.36842105
    ...
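In other words, DemoWordCondProb converts the counts into conditional
probabilities, i.e., p(EvenOrOdd|token). The (token, *) key carries the
token's total count, which serves as the denominator.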
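As a worked check against the two sample outputs, write c(token, x) for the count DemoWordCountTuple reports for the tuple (token, x); this notation is introduced here only for the derivation. For admirable:

```latex
\begin{aligned}
c(\text{admirable}, *) &= c(\text{admirable}, 0) + c(\text{admirable}, 1) = 9 + 6 = 15 \\
p(0 \mid \text{admirable}) &= \frac{c(\text{admirable}, 0)}{c(\text{admirable}, *)} = \frac{9}{15} = 0.6 \\
p(1 \mid \text{admirable}) &= \frac{c(\text{admirable}, 1)}{c(\text{admirable}, *)} = \frac{6}{15} = 0.4
\end{aligned}
```

The same arithmetic gives admiral: 2 + 4 = 6, then 2/6 and 4/6.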
How is this accomplished? These are the things we need to do:
- Make sure all key-value pairs with the same token are sent to
the same reducer. For example, (admirable, *), (admirable, 0), and
(admirable, 1) all need to be sent to the same reduce task.
- We also want to make sure (admirable, *) is passed to the reducer
before either (admirable, 0) or (admirable, 1).
- If the two above conditions are met, then it becomes
straightforward to compute the conditional probabilities: when we
finish processing (admirable, *), we store its value in a HashMap
(i.e., the reducer holds state). When we finish processing
(admirable, 0) or (admirable, 1), we divide the count by the value
of (admirable, *).
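Step #3 can be sketched in plain Java, outside Hadoop, as a reducer object that is called once per key in sorted order. The class name CondProbReducerSketch and its reduce signature are hypothetical; the sketch assumes the first two conditions already hold, so (token, *) always arrives before (token, 0) and (token, 1).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of the reducer logic in step #3 (no Hadoop dependency).
 * Assumes keys arrive sorted so that (token, "*") precedes (token, "0") and (token, "1").
 */
class CondProbReducerSketch {
    // State kept across reduce calls: token -> total count from its (token, "*") key.
    private final Map<String, Double> marginals = new HashMap<>();

    // Handles one key group; returns the marginal for "*" keys, otherwise p(field | token).
    double reduce(String token, String field, List<Integer> counts) {
        double sum = 0;
        for (int c : counts) {
            sum += c;
        }
        if (field.equals("*")) {
            marginals.put(token, sum);
            return sum;
        }
        Double marginal = marginals.get(token);
        if (marginal == null) {
            // Ordering violated: the denominator is not available yet.
            throw new IllegalStateException("(" + token + ", *) was not seen before (" + token + ", " + field + ")");
        }
        return sum / marginal;
    }
}
```

Because the sort order keeps all keys for one token together, a single variable holding the most recent marginal would also suffice; the map simply mirrors the HashMap described above.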
Here's how you would implement the above bullets:
- Write a custom Partitioner that only pays attention to the token.
That is, the Partitioner uses the hash code of the token, not the
entire key, to determine which reduce task the key gets sent to. This
takes care of #1 above.
- Write a custom WritableComparator that implements the sort order
described in #2 above. (In actuality, you don't need to do this, since
the natural ordering of the Tuple places all special symbols first.)
- Write a reducer that does exactly what is stated in #3 above.
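The first two bullets can be sketched in plain Java without Hadoop types. PairKey is a hypothetical stand-in for Cloud9's Tuple, and the comparator only illustrates the ordering idea: compare tokens first, then the second field, where '*' (0x2A) sorts before '0' (0x30) and '1' (0x31).

```java
import java.util.Comparator;

/** Plain-Java sketch of token-only partitioning and "*"-first ordering (hypothetical names). */
class TokenPartitionSketch {
    // Hypothetical pair key standing in for Cloud9's Tuple: (token, field).
    static final class PairKey {
        final String token;
        final String field; // "*", "0", or "1"

        PairKey(String token, String field) {
            this.token = token;
            this.field = field;
        }

        @Override
        public String toString() {
            return "(" + token + ", " + field + ")";
        }
    }

    // Custom partitioning: hash only the token, so every key for a token shares a reducer.
    static int getPartition(PairKey key, int numReduceTasks) {
        return (key.token.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Sort by token, then by field; String comparison puts "*" before "0" and "1".
    static final Comparator<PairKey> ORDER =
            Comparator.comparing((PairKey k) -> k.token).thenComparing(k -> k.field);
}
```

Sorting (admirable, 1), (admirable, *), (admirable, 0) with ORDER yields (admirable, *), (admirable, 0), (admirable, 1), and all three map to the same partition for any number of reduce tasks.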
Putting everything together, use the
setPartitionerClass and
setOutputKeyComparatorClass methods of
JobConf to correctly configure the MapReduce job. See
the code of DemoWordCondProb for more details.
Potential gotcha: Be very careful with your use of a
Combiner class! In the basic word count demo, the
reducer can also be used as the combiner, since all it does is compute
a sum (i.e., the operation is both associative and commutative).
This is no longer the case for the reducer in
DemoWordCondProb, since there's a division at the end to
compute the conditional probability. As a result, the division must
happen at the reduce stage, because only then are you guaranteed that
all values with the same key have been collected (so, don't reuse the
reducer as the combiner; use the identity Combiner).
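A small simulation shows what goes wrong if the reducer is reused as the combiner. The split of admirable's counts across two map tasks is invented for illustration (task A sees 10 occurrences, 5 on even-length lines; task B sees 5, 4 on even-length lines), and the reduce method is a simplified stand-in for the real reducer that only tracks (token, *) and (token, 0).

```java
import java.util.List;

/** Simulates reusing a dividing reducer as a combiner (hypothetical data and names). */
class CombinerPitfallSketch {
    // Simplified reducer for one token: sums the (token, *) values and the
    // (token, 0) values, then divides. Returns {marginal, p(0 | token)}.
    static double[] reduce(List<Double> starValues, List<Double> zeroValues) {
        double marginal = 0;
        double count = 0;
        for (double v : starValues) {
            marginal += v;
        }
        for (double v : zeroValues) {
            count += v;
        }
        return new double[] {marginal, count / marginal};
    }

    // Correct: all partial counts reach the reducer untouched: 9 / 15.
    static double withoutCombiner() {
        return reduce(List.of(10.0, 5.0), List.of(5.0, 4.0))[1];
    }

    // Broken: each map task divides early, then the reducer divides again.
    static double withReducerAsCombiner() {
        double[] a = reduce(List.of(10.0), List.of(5.0)); // {10, 0.5}
        double[] b = reduce(List.of(5.0), List.of(4.0));  // {5, 0.8}
        // The reducer now sums probabilities, not counts: (0.5 + 0.8) / 15.
        return reduce(List.of(a[0], b[0]), List.of(a[1], b[1]))[1];
    }
}
```

Without a combiner the result is 0.6, matching the sample output; with the reducer as combiner it is (0.5 + 0.8) / 15, roughly 0.087, because partial probabilities get summed and divided a second time.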