diff --git a/CH15_Big data and MapReduce/Mapper.py b/CH15_Big data and MapReduce/Mapper.py new file mode 100644 index 0000000..0dd47cd --- /dev/null +++ b/CH15_Big data and MapReduce/Mapper.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python +#coding=utf-8 + +import sys +from numpy import mat, mean, power + +def read_input(file): + for line in file: + yield line.rstrip() +input = read_input(sys.stdin) +input = [float(line) for line in input] +#print(input,'=====') +numInputs = len(input) +input = mat(input) +sqInput = power(input, 2) + +print ("%d\t%f\t%f" % (numInputs, mean(input), mean(sqInput)) ) # calc mean of columns +#print ( sys.stderr, "report: still alive" ) \ No newline at end of file diff --git a/CH15_Big data and MapReduce/Reducer.py b/CH15_Big data and MapReduce/Reducer.py new file mode 100644 index 0000000..7fdeb12 --- /dev/null +++ b/CH15_Big data and MapReduce/Reducer.py @@ -0,0 +1,22 @@ +import sys +from numpy import mat, mean, power + +def read_input(file): + for line in file: + yield line.rstrip() +input = read_input(sys.stdin) +mapperOut = [line.split('\t') for line in input] +print(mapperOut) +cumVal = 0.0 +cumSumSq = 0.0 +cumN = 0.0 +for instance in mapperOut: + nj = float(instance[0]) + cumN += nj + cumVal += nj * float(instance[1]) + cumSumSq += nj * float(instance[2]) +mean = cumVal / cumN +var = (cumSumSq/cumN-mean*mean) + +print ("%d\t%f\t%f" % (cumN, mean, var) ) +print ( sys.stderr, "report: still alive" ) \ No newline at end of file diff --git a/CH15_Big data and MapReduce/inputFile.txt b/CH15_Big data and MapReduce/inputFile.txt new file mode 100644 index 0000000..73a10c1 --- /dev/null +++ b/CH15_Big data and MapReduce/inputFile.txt @@ -0,0 +1,100 @@ +0.970413 +0.901817 +0.828698 +0.197744 +0.466887 +0.962147 +0.187294 +0.388509 +0.243889 +0.115732 +0.616292 +0.713436 +0.761446 +0.944123 +0.200903 +0.547166 +0.800028 +0.848790 +0.001641 +0.058010 +0.859900 +0.009178 +0.736598 +0.683586 +0.142515 +0.212120 +0.752769 +0.546184 +0.652227 +0.583803 +0.812863 +0.036862 +0.075076 +0.257536 +0.431278 +0.600214 +0.985564 +0.055846 +0.905295 +0.336262 +0.198738 +0.845815 +0.527989 +0.448650 +0.235313 +0.599749 +0.443923 +0.968723 +0.911076 +0.279338 +0.569492 +0.635985 +0.267532 +0.975018 +0.463698 +0.842340 +0.065590 +0.233049 +0.810390 +0.448260 +0.431967 +0.549648 +0.703612 +0.187974 +0.231709 +0.784160 +0.072283 +0.921053 +0.735468 +0.715923 +0.150431 +0.661089 +0.734955 +0.633709 +0.216102 +0.498474 +0.195620 +0.339548 +0.245314 +0.819848 +0.521242 +0.549276 +0.200906 +0.202525 +0.922876 +0.025404 +0.604032 +0.752204 +0.158860 +0.651622 +0.592898 +0.500392 +0.410614 +0.968388 +0.265918 +0.565707 +0.413670 +0.080507 +0.929978 +0.609755 diff --git a/CH15_Big data and MapReduce/mrMean.py b/CH15_Big data and MapReduce/mrMean.py new file mode 100644 index 0000000..5536187 --- /dev/null +++ b/CH15_Big data and MapReduce/mrMean.py @@ -0,0 +1,47 @@ +''' +Created on Feb 28, 2011 + +@author: Peter +''' +from mrjob.step import MRStep +from mrjob.job import MRJob + +class MRmean(MRJob): + def __init__(self, *args, **kwargs): + super(MRmean, self).__init__(*args, **kwargs) + self.inCount = 0 + self.inSum = 0 + self.inSqSum = 0 + + def map(self, key, val): #needs exactly 2 arguments + if False: yield + inVal = float(val) + self.inCount += 1 + self.inSum += inVal + self.inSqSum += inVal*inVal + + def map_final(self): + mn = self.inSum/self.inCount + mnSq = self.inSqSum/self.inCount + yield (1, [self.inCount, mn, mnSq]) + + def reduce(self, key, packedValues): + cumVal=0.0; cumSumSq=0.0; cumN=0.0 + for valArr in packedValues: #get values from streamed inputs + nj = float(valArr[0]) + cumN += nj + cumVal += nj*float(valArr[1]) + cumSumSq += nj*float(valArr[2]) + mean = cumVal/cumN + var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN + yield (mean, var) #emit mean and var + + def steps(self): + return ([MRStep(mapper=self.map, mapper_final=self.map_final,\ + reducer=self.reduce,), + MRStep(mapper=self.map, mapper_final=self.map_final,\ + reducer=self.reduce,) + ]) + +if __name__ == '__main__': + MRmean.run()