error in tests #2

Open · wants to merge 20 commits into base: master
669 changes: 669 additions & 0 deletions patches/assignment-1_SCHROTT.patch

Large diffs are not rendered by default.

744 changes: 744 additions & 0 deletions patches/assignment-2_SCHROTT.patch

Large diffs are not rendered by default.

532 changes: 532 additions & 0 deletions patches/assignment-3_SCHROTT.patch

Large diffs are not rendered by default.

493 changes: 493 additions & 0 deletions patches/assignment-4_SCHROTT.patch

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions pom.xml
@@ -61,7 +61,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
-<version>0.7.0-incubating</version>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -76,7 +76,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
-<version>0.7.0-incubating</version>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -88,7 +88,21 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
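Note on the pom.xml change above: both Flink artifacts are bumped to 0.10-SNAPSHOT and flink-streaming-core is added. As a quick sanity check that the new dependency resolves, a minimal sketch along these lines should compile against it. This is only an illustrative smoke test, not part of the PR; it assumes the pre-0.10 streaming API shipped in flink-streaming-core (package and class names changed in later releases).

// Minimal sketch, assuming the pre-0.10 streaming API in flink-streaming-core;
// not part of this PR, only a dependency smoke test.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDependencySmokeTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A trivial pipeline: if this compiles and runs, the dependency is wired up.
        DataStream<String> lines = env.fromElements("flink", "streaming", "core");
        lines.print();

        env.execute("streaming dependency smoke test");
    }
}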
AverageTemperaturePerMonth.java
@@ -20,22 +20,151 @@

import de.tuberlin.dima.aim3.HadoopJob;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

public class AverageTemperaturePerMonth extends HadoopJob {

public static final String PARAM_MIN_QUALITY = "minQuality";

@Override
public int run(String[] args) throws Exception {
Map<String,String> parsedArgs = parseArgs(args);

Path inputPath = new Path(parsedArgs.get("--input"));
Path outputPath = new Path(parsedArgs.get("--output"));

-double minimumQuality = Double.parseDouble(parsedArgs.get("--minimumQuality"));
float minimumQuality = Float.parseFloat(parsedArgs.get("--minimumQuality"));

Job avgTmpMonth = prepareJob(inputPath, outputPath, TextInputFormat.class,
AvgTempMonthMapper.class, YearMonthKey.class, IntWritable.class,
AvgTempMonthReducer.class, YearMonthKey.class, IntWritable.class,
TextOutputFormat.class);

-//IMPLEMENT ME
avgTmpMonth.getConfiguration().setFloat(PARAM_MIN_QUALITY, minimumQuality);

avgTmpMonth.waitForCompletion(true);

return 0;
}

static class AvgTempMonthMapper extends Mapper<Object, Text, YearMonthKey, IntWritable> {

@Override
protected void map(Object key, Text line, Context ctx) throws IOException, InterruptedException {

// get the config
float minimumQuality = ctx.getConfiguration().getFloat(PARAM_MIN_QUALITY, 0.0f);

// split the input line by tab
String[] values = line.toString().split("\\t");

if(Float.valueOf(values[3]) >= minimumQuality) {
// keep only measurements whose quality is at least the minimum quality
// map the temperatures to the corresponding year and month
ctx.write(new YearMonthKey(Integer.valueOf(values[0]), Integer.valueOf(values[1])),
new IntWritable(Integer.valueOf(values[2])));

}
}
}

static class AvgTempMonthReducer extends Reducer<YearMonthKey, IntWritable, NullWritable, Text> {

@Override
protected void reduce(YearMonthKey key, Iterable<IntWritable> values, Context ctx)
throws IOException, InterruptedException {

// sum up the measured temperatures and count them
int sum = 0;
int cntr = 0;

for (IntWritable val : values) {
sum += val.get();
cntr++;
}

// write output (year|tab|month|tab|averageTemperature)
ctx.write(NullWritable.get(), new Text(key.getYear() + "\t" + key.getMonth() + "\t" + (double)sum / (double)cntr));
}
}

/**
* Composite WritableComparable key (year, month) that groups all
* measurements of the same year and month for averaging in the reducer
*/
static class YearMonthKey implements WritableComparable<YearMonthKey> {

private int year;
private int month;

public YearMonthKey() {
}

public YearMonthKey(int year, int month) {
this.set(year, month);
}

public void set(int year, int month) {
this.year = year;
this.month = month;
}

public int getYear() {
return this.year;
}

public int getMonth() {
return this.month;
}

public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.month = in.readInt();
}

public void write(DataOutput out) throws IOException {
out.writeInt(this.year);
out.writeInt(this.month);
}

public boolean equals(Object o) {
if (!(o instanceof YearMonthKey)) {
return false;
} else {
YearMonthKey other = (YearMonthKey) o;
return this.year == other.year && this.month == other.month;
}
}

public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + this.year;
result = prime * result + this.month; // month is an int, no long-style bit folding needed
return result;
}

public int compareTo(YearMonthKey o) {
int thisValue = 100 * this.year + this.month;
int thatValue = 100 * o.year + o.month;
return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);
}

public String toString() {
return this.year + " " + this.month;
}
}
}
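For reference, a hedged sketch of how AverageTemperaturePerMonth might be launched. It assumes HadoopJob follows the usual Configured/Tool pattern behind run(String[]), and that the input is tab-separated year, month, temperature, quality records as the mapper expects; the paths and the 0.25 threshold are placeholders.

// Hypothetical driver, assuming HadoopJob implements org.apache.hadoop.util.Tool;
// paths and the quality threshold are placeholders.
import org.apache.hadoop.util.ToolRunner;

public class RunAverageTemperaturePerMonth {

    public static void main(String[] args) throws Exception {
        // Input lines are expected as: year \t month \t temperature \t quality
        int exitCode = ToolRunner.run(new AverageTemperaturePerMonth(), new String[] {
                "--input", "/data/temperatures.tsv",
                "--output", "/out/avg-temperature-per-month",
                "--minimumQuality", "0.25"
        });
        System.exit(exitCode);
    }
}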
BookAndAuthorBroadcastJoin.java
@@ -19,12 +19,26 @@
package de.tuberlin.dima.aim3.assignment1;

import de.tuberlin.dima.aim3.HadoopJob;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

public class BookAndAuthorBroadcastJoin extends HadoopJob {

public static final String FILE_NAME_AUTHORS = "authors.tsv";

@Override
public int run(String[] args) throws Exception {

@@ -34,9 +48,71 @@ public int run(String[] args) throws Exception {
Path books = new Path(parsedArgs.get("--books"));
Path outputPath = new Path(parsedArgs.get("--output"));

-//IMPLEMENT ME
Job booksAndAuthors = prepareJob(books,
outputPath, TextInputFormat.class,
BooksAndAuthorsBroadcastJoinMapper.class, Text.class, Text.class,
TextOutputFormat.class);

DistributedCache.addFileToClassPath(authors, booksAndAuthors.getConfiguration(),
FileSystem.get(booksAndAuthors.getConfiguration()));

booksAndAuthors.waitForCompletion(true);

return 0;
}

static class BooksAndAuthorsBroadcastJoinMapper extends Mapper<Object, Text, NullWritable, Text> {

// hash map to hold smaller dataset in memory
private Hashtable<Integer, String> htAuthors;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);

// step 1: load the smaller dataset into memory (here: authors)
htAuthors = new Hashtable<Integer, String>();
BufferedReader br = null;
String lineAuthor = null;
Path[] authorsArray = DistributedCache.getLocalCacheFiles(context.getConfiguration());
Path authors = null;
for(Path a: authorsArray) {
if(a.getName().equals(FILE_NAME_AUTHORS)) {
authors = a;
}
}
if(authors == null) {
throw new InterruptedException("Join file authors.tsv not in distributed cache.");
}

try {
br = new BufferedReader(new FileReader(authors.getParent() + "/" + authors.getName()));
while ((lineAuthor = br.readLine()) != null) {
String record[] = lineAuthor.split("\t");
if(record.length == 2) {
//Insert into hash table for easy lookup
htAuthors.put(Integer.valueOf(record[0]), record[1]);
}
}
}
catch (Exception ex) {
ex.printStackTrace();
}
}

@Override
protected void map(Object key, Text lineBook, Context ctx) throws IOException, InterruptedException {

// step 2: read the bigger dataset using map reduce
String[] values = lineBook.toString().split("\\t");
if(values.length == 3) {
// step 3: map smaller dataset to bigger dataset by join key
String author = htAuthors.get(Integer.valueOf(values[0]));
if(author != null && !author.isEmpty()) {
// write output (author|tab|nameofbook|tab|yearofpublication)
ctx.write(NullWritable.get(), new Text(author + "\t" + values[2] + "\t" + values[1]));
}
}
}
}
}
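The broadcast join above loads the small authors relation into memory in setup() via the DistributedCache and streams the larger books relation through map(). A hedged launch sketch follows, with placeholder paths and the same Tool assumption as before; note that the cached file must literally be named authors.tsv, since setup() looks it up by that name.

// Hypothetical driver for the broadcast join; assumes HadoopJob implements Tool.
// authors.tsv: authorId \t authorName       (small side, shipped via DistributedCache)
// books.tsv:   authorId \t year \t title    (large side, streamed through the mapper)
import org.apache.hadoop.util.ToolRunner;

public class RunBookAndAuthorBroadcastJoin {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BookAndAuthorBroadcastJoin(), new String[] {
                "--authors", "/data/authors.tsv",
                "--books", "/data/books.tsv",
                "--output", "/out/books-with-authors"
        });
        System.exit(exitCode);
    }
}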