Skip to content

Commit

Permalink
changed upset to support $set and $inc
Browse files Browse the repository at this point in the history
  • Loading branch information
leonlee committed Jun 19, 2013
1 parent e480f20 commit 966e5b0
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 7 deletions.
18 changes: 14 additions & 4 deletions src/main/java/org/riderzen/flume/sink/MongoSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
* Time: 下午3:31
*/
public class MongoSink extends AbstractSink implements Configurable {
public static final String OP_SET = "$set";
private static Logger logger = LoggerFactory.getLogger(MongoSink.class);

private static DateTimeParser[] parsers = {
Expand Down Expand Up @@ -54,6 +53,8 @@ public class MongoSink extends AbstractSink implements Configurable {
public static final String TIMESTAMP_FIELD = "timestampField";
public static final String OPERATION = "op";
public static final String PK = "_id";
public static final String OP_INC = "$inc";
public static final String OP_SET = "$set";

public static final boolean DEFAULT_AUTHENTICATION_ENABLED = false;
public static final String DEFAULT_HOST = "localhost";
Expand Down Expand Up @@ -263,7 +264,17 @@ private void doUpsert(Map<String, List<DBObject>> eventMap) {
}
DBCollection collection = db.getCollection(collectionName);
for (DBObject doc : docs) {
DBObject query = BasicDBObjectBuilder.start().add(PK, doc.removeField(PK)).get();
if (logger.isDebugEnabled()) {
logger.debug("doc: {}", doc);
}
DBObject query = BasicDBObjectBuilder.start().add(PK, doc.get(PK)).get();

if (doc.keySet().contains(OP_INC) || doc.keySet().contains(OP_SET)) {
doc = BasicDBObjectBuilder.start()
.add(OP_INC, doc.get(OP_INC))
.add(OP_SET, doc.get(OP_SET)).get();
}

CommandResult result = collection.update(query, doc, true, false, WriteConcern.NORMAL).getLastError();
if (result.ok()) {
String errorMessage = result.getErrorMessage();
Expand Down Expand Up @@ -342,8 +353,7 @@ private List<DBObject> addEventToList(List<DBObject> documents, Event event) {
return documents;
}
}

if (timestampField != null) {
if (!event.getHeaders().containsKey(OPERATION) && timestampField != null) {
Date timestamp;
if (eventJson.containsField(timestampField)) {
try {
Expand Down
121 changes: 119 additions & 2 deletions src/test/java/org/riderzen/flume/sink/MongoSinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import com.mongodb.*;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.flume.*;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.json.simple.JSONObject;
import org.testng.annotations.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.UnknownHostException;
import java.text.ParseException;
Expand Down Expand Up @@ -418,4 +419,120 @@ public void timestampExistingFieldTest() throws EventDeliveryException, ParseExc
}

}

@Test(groups = "dev")
public void upsertTest() throws EventDeliveryException, ParseException {
ctx.put(MongoSink.MODEL, MongoSink.CollectionModel.dynamic.name());
String tsField = "createdOn";
ctx.put(MongoSink.TIMESTAMP_FIELD, tsField);
MongoSink sink = new MongoSink();
Configurables.configure(sink, ctx);

sink.setChannel(channel);
sink.start();

JSONObject msg = new JSONObject();
msg.put("age", 11);
msg.put("birthday", new Date().getTime());
String dateText = "2013-02-19T14:20:53+08:00";
msg.put(tsField, dateText);

Transaction tx;

for (int i = 0; i < 10; i++) {
tx = channel.getTransaction();
tx.begin();
msg.put("_id", 1111 + i);
msg.put("name", "test" + i);
JSONObject header = new JSONObject();
header.put(MongoSink.COLLECTION, "my_events");
header.put(MongoSink.DB_NAME, "dynamic_db");

Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header);
channel.put(e);
tx.commit();
tx.close();
}
sink.process();
sink.stop();

for (int i = 0; i < 10; i++) {
tx = channel.getTransaction();
tx.begin();
msg.put("_id", 1111 + i);
msg.put("name", "test" + i * 10);
JSONObject header = new JSONObject();
header.put(MongoSink.COLLECTION, "my_events");
header.put(MongoSink.DB_NAME, "dynamic_db");
header.put(MongoSink.OPERATION, MongoSink.OP_UPSERT);

Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header);
channel.put(e);
tx.commit();
tx.close();
}
sink.process();
sink.stop();

msg.put(tsField, MongoSink.dateTimeFormatter.parseDateTime(dateText).toDate());
for (int i = 0; i < 10; i++) {
System.out.println("i = " + i);

DB db = mongo.getDB("dynamic_db");
DBCollection collection = db.getCollection("my_events");
DBCursor cursor = collection.find(BasicDBObjectBuilder.start().add("_id", 1111 + i).get());
assertTrue(cursor.hasNext());
DBObject dbObject = cursor.next();
assertNotNull(dbObject);
assertEquals(dbObject.get("name"), "test" + i * 10);
assertEquals(dbObject.get("age"), msg.get("age"));
assertEquals(dbObject.get("birthday"), msg.get("birthday"));
assertTrue(dbObject.get(tsField) instanceof Date);
System.out.println("ts = " + dbObject.get(tsField));
System.out.println("_id = " + dbObject.get("_id"));
}

}

@Test
public static void sandbox() throws EventDeliveryException {
JSONObject msg = new JSONObject();
JSONObject set = new JSONObject();
set.put("pid", "274");
set.put("fac", "missin-do");
msg.put("$set", set);

JSONObject inc = new JSONObject();
inc.put("sum", 1);

msg.put("$inc", inc);
msg.put("_id", "111111111111111111111111111");
msg.put("pid", "111111111111111111111111111");

ctx.put(MongoSink.MODEL, MongoSink.CollectionModel.dynamic.name());
String tsField = "createdOn";
ctx.put(MongoSink.TIMESTAMP_FIELD, tsField);
MongoSink sink = new MongoSink();
Configurables.configure(sink, ctx);

sink.setChannel(channel);
sink.start();

Transaction tx;

tx = channel.getTransaction();
tx.begin();

JSONObject header = new JSONObject();
header.put(MongoSink.COLLECTION, "my_eventsjj");
header.put(MongoSink.DB_NAME, "dynamic_dbjj");
header.put(MongoSink.OPERATION, MongoSink.OP_UPSERT);

Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header);
channel.put(e);
tx.commit();
tx.close();
sink.process();
sink.stop();
}
}
2 changes: 1 addition & 1 deletion src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<class name="test.sample.ParameterTest"/>
</classes>
</test>-->
<test name="all" thread-count="1" parallel="classes">
<test name="all" thread-count="1" parallel="classes" preserve-order="true">
<groups>
<run>
<include name="unit"/>
Expand Down

0 comments on commit 966e5b0

Please sign in to comment.