The basic format that you need is something like this:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool | |
def mapFunction(value): | |
# work on values | |
return a_tuple | |
def partition(tuples): | |
# marshalll tuples into a dictionary of lists of tuples | |
return mapping | |
def reduceFunction(mapping): | |
# work on the mapping | |
return results | |
if __name__ == '__main__': | |
pool = Pool(processes=N) | |
tuples = pool.map(mapFunction, your_iterable_data) | |
mapping = partition(tuples) | |
results = pool.map(reduceFunction, mapping.items()) | |
# do something wit the results |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool | |
import string | |
import random | |
def mapFunction(letter): | |
return (letter, 1) | |
def partition(tuples): | |
mapping = {} | |
for t in tuples: | |
try: | |
mapping[t[0]].append (t) | |
except KeyError: | |
mapping[t[0]] = [t] | |
return mapping | |
def reduceFunction(mapping): | |
return (mapping[0], sum(pair[1] for pair in mapping[1])) | |
if __name__ == '__main__': | |
pool = Pool(processes=10) | |
letters = [random.choice(string.uppercase) for i in range(30)] | |
print "letters: ", letters, "\n" | |
tuples = pool.map(mapFunction, letters) | |
print "tuples: ", tuples, "\n" | |
mapping = partition(tuples) | |
print "mapping: ", mapping, "\n" | |
results = pool.map(reduceFunction, mapping.items()) | |
results.sort(key=lambda r: r[1]) | |
results.reverse() | |
for r in results: | |
print "The letter %s appeared %d times" % r |
The output will look something like this:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
letters: ['D', 'Q', 'Q', 'J', 'W', 'D', 'Q', 'N', 'U', 'B', 'T', 'A', 'S', 'Z', 'M', 'V', 'E', 'N', 'Y', 'N', 'S', 'Q', 'Y', 'M', 'Y', 'M', 'C', 'C', 'L', 'Y'] | |
tuples: [('D', 1), ('Q', 1), ('Q', 1), ('J', 1), ('W', 1), ('D', 1), ('Q', 1), ('N', 1), ('U', 1), ('B', 1), ('T', 1), ('A', 1), ('S', 1), ('Z', 1), ('M', 1), ('V', 1), ('E', 1), ('N', 1), ('Y', 1), ('N', 1), ('S', 1), ('Q', 1), ('Y', 1), ('M', 1), ('Y', 1), ('M', 1), ('C', 1), ('C', 1), ('L', 1), ('Y', 1)] | |
mapping: {'A': [('A', 1)], 'C': [('C', 1), ('C', 1)], 'B': [('B', 1)], 'E': [('E', 1)], 'D': [('D', 1), ('D', 1)], 'J': [('J', 1)], 'M': [('M', 1), ('M', 1), ('M', 1)], 'L': [('L', 1)], 'N': [('N', 1), ('N', 1), ('N', 1)], 'Q': [('Q', 1), ('Q', 1), ('Q', 1), ('Q', 1)], 'S': [('S', 1), ('S', 1)], 'U': [('U', 1)], 'T': [('T', 1)], 'W': [('W', 1)], 'V': [('V', 1)], 'Y': [('Y', 1), ('Y', 1), ('Y', 1), ('Y', 1)], 'Z': [('Z', 1)]} | |
The letter Y appeared 4 times | |
The letter Q appeared 4 times | |
The letter N appeared 3 times | |
The letter M appeared 3 times | |
The letter S appeared 2 times | |
The letter D appeared 2 times | |
The letter C appeared 2 times | |
The letter Z appeared 1 times | |
The letter V appeared 1 times | |
The letter W appeared 1 times | |
The letter T appeared 1 times | |
The letter U appeared 1 times | |
The letter L appeared 1 times | |
The letter J appeared 1 times | |
The letter E appeared 1 times | |
The letter B appeared 1 times | |
The letter A appeared 1 times |
- I got most of this from the following blog: http://goo.gl/nW8iA .
- Docs are here: http://docs.python.org/library/multiprocessing.html
Hey there!
ReplyDeleteWhile your multiprocessing usage is nice, it seems to me that you missed the point of MapReduce in your example because most of the work actually happens in partition().
A more classic example would look something like this: https://gist.github.com/1406554
There you have proper partitioning BEFORE the mapping happens, solving sub-problems in the map phase and finally reducing sub-results to a single aggregate result.
Is there a way to reduce the part of the partition part?
ReplyDeleteEasy jumpstart to python Mapreduce : http://pymapreduce.com
ReplyDelete