Skip to content

Commit

Permalink
Merge pull request #478 from dieu/apanasenko/writebale_seq_with_combined
Browse files Browse the repository at this point in the history
Added CombinedWritableSequenceFile to cascading2
  • Loading branch information
isnotinvain authored Mar 8, 2017
2 parents 87efd8c + 6200548 commit 6b2e450
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class CombinedSequenceFile extends SequenceFile {
/**
 * Creates the scheme for the given fields by handing them straight to the
 * parent constructor.
 *
 * @param fields the fields passed through to the superclass
 */
public CombinedSequenceFile(Fields fields) {
  super(fields);
}

// We can allow overriding the compression settings for just this scheme here
private void updateJobConfForLocalSettings(JobConf conf) {
private static void updateJobConfForLocalSettings(JobConf conf) {
String localSetCompressionEnabled = conf.get(COMPRESS_ENABLE);
if(localSetCompressionEnabled != null) {
conf.set(MR_COMPRESS_ENABLE, localSetCompressionEnabled);
Expand All @@ -56,13 +56,7 @@ private void updateJobConfForLocalSettings(JobConf conf) {
}
}

@Override
public void sourceConfInit(
FlowProcess<JobConf> flowProcess,
Tap<JobConf, RecordReader, OutputCollector> tap,
JobConf conf ) {
super.sourceConfInit(flowProcess, tap, conf);

public static void sourceConfInit(JobConf conf) {
updateJobConfForLocalSettings(conf);

// Since the EB combiner works over the mapreduce API while Cascading is on the mapred API,
Expand All @@ -73,6 +67,16 @@ public void sourceConfInit(
DelegateCombineFileInputFormat.setDelegateInputFormat(conf, MapReduceInputFormatWrapper.class);
}

/**
 * Instance-level source hook: runs the parent scheme's source configuration
 * first, then applies this class's static {@code sourceConfInit(JobConf)} to
 * the same configuration object.
 *
 * @param flowProcess forwarded unchanged to the parent implementation
 * @param tap forwarded unchanged to the parent implementation
 * @param conf the job configuration that is updated in place
 */
@Override
public void sourceConfInit(
    FlowProcess<JobConf> flowProcess,
    Tap<JobConf, RecordReader, OutputCollector> tap,
    JobConf conf) {
  // Parent setup must come first; the static helper then layers on top of it.
  super.sourceConfInit(flowProcess, tap, conf);
  CombinedSequenceFile.sourceConfInit(conf);
}

@Override
public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.twitter.elephantbird.cascading2.scheme;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import cascading.flow.FlowProcess;
import cascading.scheme.hadoop.WritableSequenceFile;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * A {@link WritableSequenceFile} scheme whose source side is additionally
 * configured through {@link CombinedSequenceFile#sourceConfInit(JobConf)}.
 *
 * <p>Only source configuration is customised in this class; the constructors
 * simply delegate to the parent, and no other method is overridden here.
 */
public class CombinedWritableSequenceFile extends WritableSequenceFile {

  /**
   * Creates a scheme from the fields and a value type only.
   *
   * @param fields the fields passed through to the superclass
   * @param valueType the {@link Writable} value class passed through to the superclass
   */
  public CombinedWritableSequenceFile(
      Fields fields,
      Class<? extends Writable> valueType) {
    super(fields, valueType);
  }

  /**
   * Creates a scheme from the fields plus both a key type and a value type.
   *
   * @param fields the fields passed through to the superclass
   * @param keyType the {@link Writable} key class passed through to the superclass
   * @param valueType the {@link Writable} value class passed through to the superclass
   */
  public CombinedWritableSequenceFile(
      Fields fields,
      Class<? extends Writable> keyType,
      Class<? extends Writable> valueType) {
    super(fields, keyType, valueType);
  }

  /**
   * Runs the parent scheme's source configuration, then applies the shared
   * static configuration from {@link CombinedSequenceFile} to the same
   * {@code conf}.
   *
   * @param flowProcess forwarded unchanged to the parent implementation
   * @param tap forwarded unchanged to the parent implementation
   * @param conf the job configuration that is updated in place
   */
  @Override
  public void sourceConfInit(
      FlowProcess<JobConf> flowProcess,
      Tap<JobConf, RecordReader, OutputCollector> tap,
      JobConf conf) {
    // Order matters: the parent configures first, the combined settings go on top.
    super.sourceConfInit(flowProcess, tap, conf);
    CombinedSequenceFile.sourceConfInit(conf);
  }
}

0 comments on commit 6b2e450

Please sign in to comment.