Skip to content

Commit

Permalink
remove support for CountedMap (pig map support for protobufs)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Raghu Angadi committed Sep 29, 2014
1 parent 285fdcd commit a462118
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.EnumValueDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.twitter.data.proto.Misc.CountedMap;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
Expand All @@ -36,17 +33,8 @@ public class ProtobufToPig {

private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();

public enum CoercionLevel { kNoCoercion, kAllowCoercionToPigMaps }
public ProtobufToPig() { }

private final CoercionLevel coercionLevel_;

public ProtobufToPig() {
this(CoercionLevel.kAllowCoercionToPigMaps);
}

public ProtobufToPig(CoercionLevel coercionLevel) {
coercionLevel_ = coercionLevel;
}
/**
* Turn a generic message into a Tuple. Individual fields that are enums
* are converted into their string equivalents. Fields that are not filled
Expand Down Expand Up @@ -121,24 +109,11 @@ protected Object messageToTuple(FieldDescriptor fieldDescriptor, Object fieldVal
// of the underlying datatype, which in this case is a nested message.
List<Message> messageList = (List<Message>) (fieldValue != null ? fieldValue : Lists.newArrayList());

// Since protobufs do not have a map type, we use CountedMap to fake it. Whenever the protobuf has a repeated CountedMap in it,
// we can force the type into a pig map type.
if (coercionLevel_ == CoercionLevel.kAllowCoercionToPigMaps &&
fieldDescriptor.getMessageType().getName().equals(CountedMap.getDescriptor().getName())) {
Map<Object, Long> map = Maps.newHashMap();
for (Message m : messageList) {
CountedMap cm = (CountedMap) m;
final Long curCount = map.get(cm.getKey());
map.put(cm.getKey(), (curCount == null ? 0L : curCount) + cm.getValue());
}
return map;
} else {
DataBag bag = new NonSpillableDataBag(messageList.size());
for (Message m : messageList) {
bag.add(new ProtobufTuple(m));
}
return bag;
DataBag bag = new NonSpillableDataBag(messageList.size());
for (Message m : messageList) {
bag.add(new ProtobufTuple(m));
}
return bag;
} else {
return new ProtobufTuple((Message)fieldValue);
}
Expand Down Expand Up @@ -234,13 +209,6 @@ public Schema toSchema(Descriptor msgDescriptor) {
private FieldSchema messageToFieldSchema(FieldDescriptor fieldDescriptor) throws FrontendException {
assert fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE : "messageToFieldSchema called with field of type " + fieldDescriptor.getType();

// Since protobufs do not have a map type, we use CountedMap to fake it. Whenever the protobuf has a repeated CountedMap in it,
// we can force the type into a pig map type.
if (coercionLevel_ == CoercionLevel.kAllowCoercionToPigMaps &&
fieldDescriptor.getMessageType().getName().equals(CountedMap.getDescriptor().getName()) && fieldDescriptor.isRepeated()) {
return new FieldSchema(fieldDescriptor.getName(), null, DataType.MAP);
}

Schema innerSchema = toSchema(fieldDescriptor.getMessageType());

if (fieldDescriptor.isRepeated()) {
Expand Down Expand Up @@ -396,14 +364,6 @@ private StringBuffer toPigScriptInternal(Descriptor msgDescriptor, int numTabs)
private StringBuffer messageToPigScript(FieldDescriptor fieldDescriptor, int numTabs, boolean isLast) throws FrontendException {
assert fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE : "messageToPigScript called with field of type " + fieldDescriptor.getType();

// Since protobufs do not have a map type, we use CountedMap to fake it. Whenever the protobuf has a repeated CountedMap in it,
// we force the type into a pig map type.
if (coercionLevel_ == CoercionLevel.kAllowCoercionToPigMaps &&
fieldDescriptor.getMessageType().getName().equals(CountedMap.getDescriptor().getName()) && fieldDescriptor.isRepeated()) {
return new StringBuffer().append(tabs(numTabs)).append(fieldDescriptor.getName())
.append(": map[]").append(isLast ? "" : ",").append("\n");
}

if (fieldDescriptor.isRepeated()) {
return new StringBuffer().append(tabs(numTabs)).append(fieldDescriptor.getName()).append(": bag {").append("\n")
.append(tabs(numTabs + 1)).append(fieldDescriptor.getName()).append("_tuple: tuple (").append("\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
* optional int32 nesting = 3 [default = 0]; // when nesting is used
* };
* </pre>
*
*/
public class ColumnarMetadata {


private final Message message;

// use newInstance() to create a new message
Expand Down

0 comments on commit a462118

Please sign in to comment.