From 285fdcd1309af65dd465a75cd1544eaeb3885c3c Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Fri, 26 Sep 2014 18:30:05 -0700 Subject: [PATCH] replace ColumnarMetatdata with a dynamicproto --- core/src/main/protobuf/misc.proto | 9 -- .../input/RCFileProtobufInputFormat.java | 2 +- .../input/RCFileThriftInputFormat.java | 2 +- .../output/RCFileProtobufOutputFormat.java | 16 ++- .../output/RCFileThriftOutputFormat.java | 15 ++- .../elephantbird/util/ColumnarMetadata.java | 126 ++++++++++++++++++ .../twitter/elephantbird/util/RCFileUtil.java | 6 +- 7 files changed, 147 insertions(+), 29 deletions(-) create mode 100644 rcfile/src/main/java/com/twitter/elephantbird/util/ColumnarMetadata.java diff --git a/core/src/main/protobuf/misc.proto b/core/src/main/protobuf/misc.proto index 412bebd21..7d9930c42 100644 --- a/core/src/main/protobuf/misc.proto +++ b/core/src/main/protobuf/misc.proto @@ -8,12 +8,3 @@ message CountedMap { optional string key = 1; optional int64 value = 2; }; - - -// Metadata stored with columnar storage like Hive's RCFile -// -message ColumnarMetadata { - optional string classname = 1; // FYI only, not used. - repeated int32 field_id = 2; // list of field ids stored - optional int32 nesting = 3 [default = 0]; // when nesting is used -} \ No newline at end of file diff --git a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileProtobufInputFormat.java b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileProtobufInputFormat.java index fc117135b..8f09aa2e2 100644 --- a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileProtobufInputFormat.java +++ b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileProtobufInputFormat.java @@ -28,7 +28,7 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message.Builder; -import com.twitter.data.proto.Misc.ColumnarMetadata; +import com.twitter.elephantbird.util.ColumnarMetadata; import com.twitter.elephantbird.util.RCFileUtil; import com.twitter.elephantbird.util.Protobufs; import com.twitter.elephantbird.util.TypeRef; diff --git a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileThriftInputFormat.java b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileThriftInputFormat.java index 795570f15..8ee70585b 100644 --- a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileThriftInputFormat.java +++ b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileThriftInputFormat.java @@ -25,8 +25,8 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.twitter.data.proto.Misc.ColumnarMetadata; import com.twitter.elephantbird.mapreduce.io.ThriftWritable; +import com.twitter.elephantbird.util.ColumnarMetadata; import com.twitter.elephantbird.util.RCFileUtil; import com.twitter.elephantbird.thrift.TStructDescriptor; import com.twitter.elephantbird.thrift.TStructDescriptor.Field; diff --git a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileProtobufOutputFormat.java b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileProtobufOutputFormat.java index d94baaa58..e37f3f95b 100644 --- a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileProtobufOutputFormat.java +++ b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileProtobufOutputFormat.java @@ -3,6 +3,8 @@ import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; +import com.twitter.elephantbird.util.ColumnarMetadata; import com.twitter.elephantbird.util.HadoopCompat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.ByteStream; @@ -17,7 +19,6 @@ import com.google.protobuf.Message; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message.Builder; -import com.twitter.data.proto.Misc.ColumnarMetadata; import com.twitter.elephantbird.mapreduce.io.ProtobufWritable; import com.twitter.elephantbird.util.Protobufs; import com.twitter.elephantbird.util.TypeRef; @@ -71,21 +72,22 @@ private void init() { } protected ColumnarMetadata makeColumnarMetadata() { - ColumnarMetadata.Builder metadata = ColumnarMetadata.newBuilder(); - metadata.setClassname(typeRef.getRawClass().getName()); + List fieldIds = Lists.newArrayList(); + for(FieldDescriptor fd : msgFields) { - metadata.addFieldId(fd.getNumber()); + fieldIds.add(fd.getNumber()); } - metadata.addFieldId(-1); // -1 for unknown fields + fieldIds.add(-1); // -1 for unknown fields - return metadata.build(); + return ColumnarMetadata.newInstance(typeRef.getRawClass().getName(), fieldIds); } private class ProtobufWriter extends RCFileOutputFormat.Writer { ProtobufWriter(TaskAttemptContext job) throws IOException { - super(RCFileProtobufOutputFormat.this, job, Protobufs.toText(makeColumnarMetadata())); + super(RCFileProtobufOutputFormat.this, job, + Protobufs.toText(makeColumnarMetadata().getMessage())); } @Override diff --git a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileThriftOutputFormat.java b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileThriftOutputFormat.java index 2af694d14..95637f409 100644 --- a/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileThriftOutputFormat.java +++ b/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileThriftOutputFormat.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Map; +import com.google.common.collect.Lists; import com.twitter.elephantbird.util.HadoopCompat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.ByteStream; @@ -25,10 +26,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.twitter.data.proto.Misc.ColumnarMetadata; import com.twitter.elephantbird.mapreduce.io.ThriftWritable; import com.twitter.elephantbird.thrift.TStructDescriptor; import com.twitter.elephantbird.thrift.TStructDescriptor.Field; +import com.twitter.elephantbird.util.ColumnarMetadata; import com.twitter.elephantbird.util.Protobufs; import com.twitter.elephantbird.util.ThriftUtils; import com.twitter.elephantbird.util.TypeRef; @@ -79,15 +80,14 @@ private void init() { } protected ColumnarMetadata makeColumnarMetadata() { - ColumnarMetadata.Builder metadata = ColumnarMetadata.newBuilder(); - metadata.setClassname(typeRef.getRawClass().getName()); + List fieldIds = Lists.newArrayList(); for(Field fd : tDesc.getFields()) { - metadata.addFieldId(fd.getFieldId()); + fieldIds.add((int)fd.getFieldId()); } - metadata.addFieldId(-1); // -1 for unknown fields + fieldIds.add(-1); // -1 for unknown fields - return metadata.build(); + return ColumnarMetadata.newInstance(typeRef.getRawClass().getName(), fieldIds); } private class ThriftWriter extends RCFileOutputFormat.Writer { @@ -102,7 +102,8 @@ private class ThriftWriter extends RCFileOutputFormat.Writer { private TBinaryProtocol skipProto; ThriftWriter(TaskAttemptContext job) throws IOException { - super(RCFileThriftOutputFormat.this, job, Protobufs.toText(makeColumnarMetadata())); + super(RCFileThriftOutputFormat.this, job, + Protobufs.toText(makeColumnarMetadata().getMessage())); } @Override @SuppressWarnings("unchecked") diff --git a/rcfile/src/main/java/com/twitter/elephantbird/util/ColumnarMetadata.java b/rcfile/src/main/java/com/twitter/elephantbird/util/ColumnarMetadata.java new file mode 100644 index 000000000..79713b4f2 --- /dev/null +++ b/rcfile/src/main/java/com/twitter/elephantbird/util/ColumnarMetadata.java @@ -0,0 +1,126 @@ +package com.twitter.elephantbird.util; + +import java.util.List; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +/** + * Metadata stored with columnar storage like Hive's RCFile

+ * + * This is a {@link DynamicMessage} equivalent of following protobuf :

+ *
+ * message ColumnarMetadata {
+ *   optional string   classname   = 1;                // FYI only, not used.
+ *   repeated int32    fieldId     = 2;                // list of field ids stored
+ *   optional int32    nesting     = 3 [default = 0];  // when nesting is used
+ * };
+ * 
+ * + */ +public class ColumnarMetadata { + + + private final Message message; + + // use newInstance() to create a new message + private ColumnarMetadata(Message message) { + this.message = message; + } + + public Message getMessage() { + return message; + } + + public String getClassname() { + return (String) message.getField(classnameDesc); + } + + public int getNesting() { + return (Integer) message.getField(nestingDesc); + } + + public int getFieldId(int index) { + return getFieldIdList().get(index); + } + + public List getFieldIdList() { + return (List) message.getField(fieldIdDesc); + } + + public static ColumnarMetadata newInstance(String classname, List fieldIdList) { + return newInstance(classname, fieldIdList, 0); + } + + public static ColumnarMetadata newInstance(String classname, List fieldIdList, int nesting) { + return new ColumnarMetadata( + DynamicMessage.newBuilder(messageDescriptor) + .setField(classnameDesc, classname) + .setField(fieldIdDesc, fieldIdList) + .setField(nestingDesc, nesting) + .build()); + } + + public static ColumnarMetadata parseFrom(byte[] messageBuffer) + throws InvalidProtocolBufferException { + return new ColumnarMetadata( + DynamicMessage.newBuilder(messageDescriptor) + .mergeFrom(messageBuffer) + .build()); + } + + private static final Descriptors.Descriptor messageDescriptor; + private static final Descriptors.FieldDescriptor classnameDesc; + private static final Descriptors.FieldDescriptor fieldIdDesc; + private static final Descriptors.FieldDescriptor nestingDesc; + + static { + // initialize messageDescriptor and the three field descriptors + + DescriptorProtos.FieldDescriptorProto classname = + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("classname") + .setNumber(1) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build(); + + DescriptorProtos.FieldDescriptorProto fieldId = + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("fieldId") + .setNumber(2) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_REPEATED) + .build(); + + DescriptorProtos.FieldDescriptorProto nesting = + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("nesting") + .setNumber(3) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_OPTIONAL) + .setDefaultValue("0") + .build(); + + try { + messageDescriptor = Protobufs.makeMessageDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("ColumnarMetadata") + .addField(classname) + .addField(fieldId) + .addField(nesting) + .build()); + } catch (Descriptors.DescriptorValidationException e) { + throw new RuntimeException(e); + } + + classnameDesc = messageDescriptor.findFieldByName("classname"); + fieldIdDesc = messageDescriptor.findFieldByName("fieldId"); + nestingDesc = messageDescriptor.findFieldByName("nesting"); + } +} diff --git a/rcfile/src/main/java/com/twitter/elephantbird/util/RCFileUtil.java b/rcfile/src/main/java/com/twitter/elephantbird/util/RCFileUtil.java index 24a15e833..c3c946136 100644 --- a/rcfile/src/main/java/com/twitter/elephantbird/util/RCFileUtil.java +++ b/rcfile/src/main/java/com/twitter/elephantbird/util/RCFileUtil.java @@ -18,7 +18,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; -import com.twitter.data.proto.Misc.ColumnarMetadata; public class RCFileUtil { @@ -70,8 +69,7 @@ public static ColumnarMetadata readMetadata(Configuration conf, Path rcfile) throw new IOException("could not find ColumnarMetadata in " + rcfile); } - return Protobufs.mergeFromText(ColumnarMetadata.newBuilder(), - metadata.get(metadataKey)).build(); + return ColumnarMetadata.parseFrom(metadata.get(metadataKey).getBytes()); } /** @@ -139,7 +137,7 @@ public static ArrayList findColumnsToRead( "reading %d%s out of %d stored columns for %d required columns", columnsToRead.size(), (unknownFields.length() > 0 ? " (including unknowns column)" : ""), - storedInfo.getFieldIdCount(), + storedInfo.getFieldIdList().size(), requiredFieldIds.size())); return columnsToRead;