Cloud9: Primer on the Partitioner, or computing conditional probabilities

by Jimmy Lin

The result of applying a mapper to a set of records is a collection of key-value pairs, which must be shuffled to the correct reduce task. The Hadoop Partitioner is responsible for splitting the key space: each partition is sent to one reduce task. HashPartitioner, the default Partitioner, simply takes the hash value of the key and mods it by the number of reduce tasks. In many cases you want a smarter partitioning of the key space; one common scenario is computing conditional probabilities. This primer works through such an example to illustrate the use of a Partitioner.
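The default behavior can be sketched in plain Java. The arithmetic mirrors what Hadoop's HashPartitioner does (mask the sign bit of the key's hash code, then take it modulo the number of reduce tasks). Modeling a (token, flag) key as a two-element List is an assumption made only for this illustration; it is not Cloud9's Tuple class. Because the whole key is hashed, (admirable, *), (admirable, 0) and (admirable, 1) are not guaranteed to land on the same reduce task, which is the problem the rest of this primer addresses.

```java
import java.util.List;

public class HashPartitionDemo {
    // Same arithmetic as Hadoop's default HashPartitioner: clear the sign bit of the
    // key's hash code so the result is non-negative, then mod by the number of reducers.
    static int hashPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4;
        // Hypothetical (token, flag) keys, modeled as two-element lists.
        for (String flag : new String[] {"*", "0", "1"}) {
            List<String> key = List.of("admirable", flag);
            // Hashing the entire key: the three keys may go to different reduce tasks.
            System.out.println(key + " -> reduce task " + hashPartition(key, numReduceTasks));
        }
    }
}
```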

The relevant example we'll discuss is the DemoWordCondProb class in the Cloud9 library, which in turn builds on DemoWordCountTuple. In brief, DemoWordCountTuple generates tuples of the following form:

...
(admirable, 0)    9
(admirable, 1)    6
(admiral, 0)      2
(admiral, 1)      4
(admiration, 0)  10
(admiration, 1)   6
(admire, 0)       5
(admire, 1)       3
(admired, 0)     12
(admired, 1)      7
...

The first field of the key tuple contains a token; the second field indicates whether the token was found on an even-length or odd-length line. The value is the number of times the tuple occurs in the collection.

DemoWordCondProb generates tuples of the following form:

...
(admirable, *)   15.0
(admirable, 0)   0.6
(admirable, 1)   0.4
(admiral, *)     6.0
(admiral, 0)     0.33333334
(admiral, 1)     0.6666667
(admiration, *)  16.0
(admiration, 0)  0.625
(admiration, 1)  0.375
(admire, *)      8.0
(admire, 0)      0.625
(admire, 1)      0.375
(admired, *)     19.0
(admired, 0)     0.6315789
(admired, 1)     0.36842105
...

In other words, the counts are converted into conditional probabilities, p(EvenOrOdd|token), with the (token, *) entry holding the total count for each token.
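As a worked check against the two listings above, write c(token, flag) for the counts produced by DemoWordCountTuple (the symbol c is notation introduced here, not taken from the code). The (token, *) entry is the sum over both flags, and each probability is a count divided by that sum:

```latex
c(\text{token}, *) = c(\text{token}, 0) + c(\text{token}, 1), \qquad
p(\text{flag} \mid \text{token}) = \frac{c(\text{token}, \text{flag})}{c(\text{token}, *)}

c(\text{admirable}, *) = 9 + 6 = 15, \qquad
p(0 \mid \text{admirable}) = \frac{9}{15} = 0.6, \qquad
p(1 \mid \text{admirable}) = \frac{6}{15} = 0.4
```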

How is this accomplished? These are the things we need to do:

  1. Make sure all key-value pairs with the same token are sent to the same reducer. For example, (admirable,*), (admirable,0), (admirable,1) all need to be sent to the same reduce task.
  2. We also want to make sure (admirable,*) is passed to the reducer before either (admirable,0) or (admirable,1).
  3. If the two conditions above are met, computing the conditional probabilities becomes straightforward: when we finish processing (admirable,*), we store its value in a HashMap (i.e., the reducer holds state). When we finish processing (admirable,0) or (admirable,1), we divide its count by the stored value of (admirable,*).
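The stateful reduce step in #3 can be sketched without Hadoop as a loop over keys in the order a reducer would see them. This is a simplified illustration, not the DemoWordCondProb code: the Entry record is hypothetical, and each entry is assumed to already carry the summed count for its key (a real reducer would first sum the values it receives for that key).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CondProbReducerSketch {
    // Hypothetical (token, flag) key paired with its already-summed count.
    record Entry(String token, String flag, double count) {}

    // Entries must arrive sorted so that (token, *) precedes (token, 0) and (token, 1).
    static Map<String, Double> reduce(List<Entry> sortedEntries) {
        Map<String, Double> marginals = new HashMap<>(); // state kept across keys
        Map<String, Double> output = new HashMap<>();
        for (Entry e : sortedEntries) {
            String outKey = "(" + e.token() + ", " + e.flag() + ")";
            if (e.flag().equals("*")) {
                // Remember the total count for this token and emit it unchanged.
                marginals.put(e.token(), e.count());
                output.put(outKey, e.count());
            } else {
                // The marginal was stored when (token, *) was processed earlier.
                double marginal = marginals.get(e.token());
                output.put(outKey, e.count() / marginal);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Entry> input = List.of(
            new Entry("admirable", "*", 15), new Entry("admirable", "0", 9),
            new Entry("admirable", "1", 6), new Entry("admiral", "*", 6),
            new Entry("admiral", "0", 2), new Entry("admiral", "1", 4));
        System.out.println(reduce(input));
    }
}
```

Because the sort order guarantees that (token, *) arrives first, the reducer could keep just the current token's marginal instead of a whole map; the HashMap simply follows the description above.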

Here's how you would implement the above bullets:

  • Write a custom Partitioner that only pays attention to the token. That is, the Partitioner uses the hash code of the token, not of the entire key, to determine which reduce task the key is sent to. This takes care of #1 above.
  • Write a custom WritableComparator that implements the sort order described in #2 above. (In practice, you don't need to do this, since the natural ordering of the Tuple already places all special symbols first.)
  • Write a reducer that does exactly what is stated in #3 above.
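The first two bullets can be sketched in plain Java. In an actual job the partitioning logic would live in the getPartition method of a class implementing Hadoop's Partitioner interface. Here the Key record and the method names are hypothetical stand-ins for Cloud9's Tuple-based classes. The comparator relies on the fact that '*' (0x2A) sorts before '0' (0x30) and '1' (0x31), which is the same reason the natural ordering needs no custom comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TokenPartitionerSketch {
    // Hypothetical (token, flag) key; not Cloud9's Tuple class.
    record Key(String token, String flag) {}

    // Hash only the token, so (t, *), (t, 0) and (t, 1) always share a reduce task.
    static int tokenPartition(Key key, int numReduceTasks) {
        return (key.token().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Order by token, then by flag; plain String ordering puts "*" before "0" and "1".
    static final Comparator<Key> KEY_ORDER =
        Comparator.comparing(Key::token).thenComparing(Key::flag);

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>(List.of(
            new Key("admiral", "1"), new Key("admirable", "0"), new Key("admiral", "*"),
            new Key("admirable", "*"), new Key("admirable", "1")));
        keys.sort(KEY_ORDER);
        for (Key k : keys) {
            System.out.println(k + " -> reduce task " + tokenPartition(k, 4));
        }
    }
}
```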

Putting everything together, use the setPartitionerClass and setOutputKeyComparatorClass methods of JobConf to correctly configure the MapReduce job. See the code of DemoWordCondProb for more details.

Potential gotcha: be very careful with your use of a Combiner class! In the basic word count demo, the reducer can also be used as the combiner, since all it does is compute a sum (an operation that is both associative and commutative). This is no longer the case for the reducer in DemoWordCondProb, since there is a division at the end to compute the conditional probability. The division must happen at the reduce stage, because only then are you guaranteed that all values with the same key have been collected (so do not reuse the reducer as the Combiner; use the identity Combiner instead).
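To see why, the sketch below simulates two map tasks whose partial counts for "admirable" are made-up numbers chosen to add up to the totals shown earlier (15 and 9). The reduce logic is simplified to "sum the values for each key, then divide". When that same logic runs as a combiner, each map task divides by its own partial (admirable, *) count, so the reducer ends up summing ratios instead of counts.

```java
public class CombinerPitfallSketch {
    // Hypothetical partial counts from two map tasks; totals are 15 and 9.
    static final double[] STAR = {5, 10}; // partial counts for (admirable, *)
    static final double[] ZERO = {4, 5};  // partial counts for (admirable, 0)

    // Simplified reduce: sum the values for each key, then divide once.
    static double reduce(double[] starValues, double[] zeroValues) {
        double star = 0, zero = 0;
        for (double v : starValues) star += v;
        for (double v : zeroValues) zero += v;
        return zero / star;
    }

    // Identity combiner: the reducer sees the raw partial counts (9 / 15).
    static double withIdentityCombiner() {
        return reduce(STAR, ZERO);
    }

    // Reducer reused as combiner: each map task divides by its own partial marginal,
    // so the reducer receives 4/5 and 5/10 and computes (4/5 + 5/10) / 15.
    static double withReducerAsCombiner() {
        double[] ratios = new double[ZERO.length];
        for (int i = 0; i < ZERO.length; i++) {
            ratios[i] = ZERO[i] / STAR[i];
        }
        return reduce(STAR, ratios);
    }

    public static void main(String[] args) {
        System.out.println("identity combiner:   " + withIdentityCombiner());
        System.out.println("reducer as combiner: " + withReducerAsCombiner());
    }
}
```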


This page first created: 20 Nov 2007; last updated:
License: Creative Commons Attribution-Noncommercial-Share Alike 3.0 United States