Skip to content

Commit

Permalink
replace ColumnarMetadata with a dynamic proto
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghu Angadi committed Sep 27, 2014
1 parent 80d034d commit 285fdcd
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 29 deletions.
9 changes: 0 additions & 9 deletions core/src/main/protobuf/misc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,3 @@ message CountedMap {
optional string key = 1;
optional int64 value = 2;
};


// Metadata stored with columnar storage like Hive's RCFile
//
message ColumnarMetadata {
optional string classname = 1; // FYI only, not used.
repeated int32 field_id = 2; // list of field ids stored
optional int32 nesting = 3 [default = 0]; // when nesting is used
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message.Builder;
import com.twitter.data.proto.Misc.ColumnarMetadata;
import com.twitter.elephantbird.util.ColumnarMetadata;
import com.twitter.elephantbird.util.RCFileUtil;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.twitter.data.proto.Misc.ColumnarMetadata;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.util.ColumnarMetadata;
import com.twitter.elephantbird.util.RCFileUtil;
import com.twitter.elephantbird.thrift.TStructDescriptor;
import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import com.twitter.elephantbird.util.ColumnarMetadata;
import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ByteStream;
Expand All @@ -17,7 +19,6 @@
import com.google.protobuf.Message;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message.Builder;
import com.twitter.data.proto.Misc.ColumnarMetadata;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
Expand Down Expand Up @@ -71,21 +72,22 @@ private void init() {
}

protected ColumnarMetadata makeColumnarMetadata() {
ColumnarMetadata.Builder metadata = ColumnarMetadata.newBuilder();

metadata.setClassname(typeRef.getRawClass().getName());
List<Integer> fieldIds = Lists.newArrayList();

for(FieldDescriptor fd : msgFields) {
metadata.addFieldId(fd.getNumber());
fieldIds.add(fd.getNumber());
}
metadata.addFieldId(-1); // -1 for unknown fields
fieldIds.add(-1); // -1 for unknown fields

return metadata.build();
return ColumnarMetadata.newInstance(typeRef.getRawClass().getName(), fieldIds);
}

private class ProtobufWriter extends RCFileOutputFormat.Writer {

ProtobufWriter(TaskAttemptContext job) throws IOException {
super(RCFileProtobufOutputFormat.this, job, Protobufs.toText(makeColumnarMetadata()));
super(RCFileProtobufOutputFormat.this, job,
Protobufs.toText(makeColumnarMetadata().getMessage()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ByteStream;
Expand All @@ -25,10 +26,10 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.twitter.data.proto.Misc.ColumnarMetadata;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.thrift.TStructDescriptor;
import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
import com.twitter.elephantbird.util.ColumnarMetadata;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.ThriftUtils;
import com.twitter.elephantbird.util.TypeRef;
Expand Down Expand Up @@ -79,15 +80,14 @@ private void init() {
}

protected ColumnarMetadata makeColumnarMetadata() {
ColumnarMetadata.Builder metadata = ColumnarMetadata.newBuilder();

metadata.setClassname(typeRef.getRawClass().getName());
List<Integer> fieldIds = Lists.newArrayList();
for(Field fd : tDesc.getFields()) {
metadata.addFieldId(fd.getFieldId());
fieldIds.add((int)fd.getFieldId());
}
metadata.addFieldId(-1); // -1 for unknown fields
fieldIds.add(-1); // -1 for unknown fields

return metadata.build();
return ColumnarMetadata.newInstance(typeRef.getRawClass().getName(), fieldIds);
}

private class ThriftWriter extends RCFileOutputFormat.Writer {
Expand All @@ -102,7 +102,8 @@ private class ThriftWriter extends RCFileOutputFormat.Writer {
private TBinaryProtocol skipProto;

ThriftWriter(TaskAttemptContext job) throws IOException {
super(RCFileThriftOutputFormat.this, job, Protobufs.toText(makeColumnarMetadata()));
super(RCFileThriftOutputFormat.this, job,
Protobufs.toText(makeColumnarMetadata().getMessage()));
}

@Override @SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.twitter.elephantbird.util;

import java.util.List;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;

/**
 * Metadata stored with columnar storage like Hive's RCFile. <p>
 *
 * This is a {@link DynamicMessage} equivalent of the following protobuf: <pre>
 *
 * message ColumnarMetadata {
 *   optional string classname = 1;               // FYI only, not used.
 *   repeated int32  fieldId   = 2;               // list of field ids stored
 *   optional int32  nesting   = 3 [default = 0]; // when nesting is used
 * };
 * </pre>
 *
 * Instances are obtained through {@link #newInstance(String, List)},
 * {@link #newInstance(String, List, int)} or one of the {@code parseFrom}
 * methods. This class only reads from the wrapped message; it never
 * modifies it.
 */
public class ColumnarMetadata {

  // Descriptors are built once in the static initializer below and shared
  // by every instance.
  private static final Descriptors.Descriptor messageDescriptor;
  private static final Descriptors.FieldDescriptor classnameDesc;
  private static final Descriptors.FieldDescriptor fieldIdDesc;
  private static final Descriptors.FieldDescriptor nestingDesc;

  static {
    // initialize messageDescriptor and the three field descriptors

    DescriptorProtos.FieldDescriptorProto classname =
        DescriptorProtos.FieldDescriptorProto.newBuilder()
            .setName("classname")
            .setNumber(1)
            .setType(Type.TYPE_STRING)
            .setLabel(Label.LABEL_OPTIONAL)
            .build();

    DescriptorProtos.FieldDescriptorProto fieldId =
        DescriptorProtos.FieldDescriptorProto.newBuilder()
            .setName("fieldId")
            .setNumber(2)
            .setType(Type.TYPE_INT32)
            .setLabel(Label.LABEL_REPEATED)
            .build();

    DescriptorProtos.FieldDescriptorProto nesting =
        DescriptorProtos.FieldDescriptorProto.newBuilder()
            .setName("nesting")
            .setNumber(3)
            .setType(Type.TYPE_INT32)
            .setLabel(Label.LABEL_OPTIONAL)
            .setDefaultValue("0")
            .build();

    try {
      messageDescriptor = Protobufs.makeMessageDescriptor(
          DescriptorProtos.DescriptorProto.newBuilder()
              .setName("ColumnarMetadata")
              .addField(classname)
              .addField(fieldId)
              .addField(nesting)
              .build());
    } catch (Descriptors.DescriptorValidationException e) {
      // The descriptor is assembled from constants above, so a validation
      // failure is a programming error; surface it with its cause.
      throw new RuntimeException(e);
    }

    classnameDesc = messageDescriptor.findFieldByName("classname");
    fieldIdDesc = messageDescriptor.findFieldByName("fieldId");
    nestingDesc = messageDescriptor.findFieldByName("nesting");
  }

  private final Message message;

  // use newInstance() or parseFrom() to create a new message
  private ColumnarMetadata(Message message) {
    this.message = message;
  }

  /**
   * Creates metadata with the given class name and field ids, and nesting
   * set to 0.
   *
   * @param classname   value for the {@code classname} field
   * @param fieldIdList values for the repeated {@code fieldId} field
   * @return a new metadata instance
   */
  public static ColumnarMetadata newInstance(String classname, List<Integer> fieldIdList) {
    return newInstance(classname, fieldIdList, 0);
  }

  /**
   * Creates metadata with all three fields explicitly set.
   *
   * @param classname   value for the {@code classname} field
   * @param fieldIdList values for the repeated {@code fieldId} field
   * @param nesting     value for the {@code nesting} field
   * @return a new metadata instance
   */
  public static ColumnarMetadata newInstance(String classname, List<Integer> fieldIdList, int nesting) {
    return new ColumnarMetadata(
        DynamicMessage.newBuilder(messageDescriptor)
            .setField(classnameDesc, classname)
            .setField(fieldIdDesc, fieldIdList)
            .setField(nestingDesc, nesting)
            .build());
  }

  /**
   * Parses metadata from a buffer that contains exactly one serialized
   * message and nothing else.
   *
   * @param messageBuffer serialized message; every byte of the array is parsed
   * @return the parsed metadata
   * @throws InvalidProtocolBufferException if the bytes are not a valid message
   */
  public static ColumnarMetadata parseFrom(byte[] messageBuffer)
      throws InvalidProtocolBufferException {
    return parseFrom(messageBuffer, 0, messageBuffer.length);
  }

  /**
   * Parses metadata from a region of a buffer. Use this overload when the
   * array may be larger than the serialized message, e.g. a reused backing
   * array where only the first {@code length} bytes are valid.
   *
   * @param messageBuffer array holding the serialized message
   * @param offset        index of the first byte of the message
   * @param length        number of bytes that make up the message
   * @return the parsed metadata
   * @throws InvalidProtocolBufferException if the region is not a valid message
   */
  public static ColumnarMetadata parseFrom(byte[] messageBuffer, int offset, int length)
      throws InvalidProtocolBufferException {
    return new ColumnarMetadata(
        DynamicMessage.newBuilder(messageDescriptor)
            .mergeFrom(messageBuffer, offset, length)
            .build());
  }

  /** Returns the underlying protobuf message, e.g. for serialization. */
  public Message getMessage() {
    return message;
  }

  /** Returns the value of the {@code classname} field. */
  public String getClassname() {
    return (String) message.getField(classnameDesc);
  }

  /** Returns the value of the {@code nesting} field. */
  public int getNesting() {
    return (Integer) message.getField(nestingDesc);
  }

  /**
   * Returns the field id stored at the given position.
   *
   * @param index position within {@link #getFieldIdList()}
   */
  public int getFieldId(int index) {
    return getFieldIdList().get(index);
  }

  /** Returns all values of the repeated {@code fieldId} field. */
  @SuppressWarnings("unchecked") // fieldId is declared above as repeated int32
  public List<Integer> getFieldIdList() {
    return (List<Integer>) message.getField(fieldIdDesc);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.twitter.data.proto.Misc.ColumnarMetadata;

public class RCFileUtil {

Expand Down Expand Up @@ -70,8 +69,7 @@ public static ColumnarMetadata readMetadata(Configuration conf, Path rcfile)
throw new IOException("could not find ColumnarMetadata in " + rcfile);
}

return Protobufs.mergeFromText(ColumnarMetadata.newBuilder(),
metadata.get(metadataKey)).build();
return ColumnarMetadata.parseFrom(metadata.get(metadataKey).getBytes());
}

/**
Expand Down Expand Up @@ -139,7 +137,7 @@ public static ArrayList<Integer> findColumnsToRead(
"reading %d%s out of %d stored columns for %d required columns",
columnsToRead.size(),
(unknownFields.length() > 0 ? " (including unknowns column)" : ""),
storedInfo.getFieldIdCount(),
storedInfo.getFieldIdList().size(),
requiredFieldIds.size()));

return columnsToRead;
Expand Down

0 comments on commit 285fdcd

Please sign in to comment.