From 966e5b0aeadbd5ec7286efb404ec1d6adb149221 Mon Sep 17 00:00:00 2001 From: Leon Lee Date: Wed, 19 Jun 2013 16:21:45 +0800 Subject: [PATCH] changed upset to support $set and $inc --- .../org/riderzen/flume/sink/MongoSink.java | 18 ++- .../riderzen/flume/sink/MongoSinkTest.java | 121 +++++++++++++++++- src/test/resources/testng.xml | 2 +- 3 files changed, 134 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/riderzen/flume/sink/MongoSink.java b/src/main/java/org/riderzen/flume/sink/MongoSink.java index e28f722..1d468a5 100644 --- a/src/main/java/org/riderzen/flume/sink/MongoSink.java +++ b/src/main/java/org/riderzen/flume/sink/MongoSink.java @@ -23,7 +23,6 @@ * Time: 下午3:31 */ public class MongoSink extends AbstractSink implements Configurable { - public static final String OP_SET = "$set"; private static Logger logger = LoggerFactory.getLogger(MongoSink.class); private static DateTimeParser[] parsers = { @@ -54,6 +53,8 @@ public class MongoSink extends AbstractSink implements Configurable { public static final String TIMESTAMP_FIELD = "timestampField"; public static final String OPERATION = "op"; public static final String PK = "_id"; + public static final String OP_INC = "$inc"; + public static final String OP_SET = "$set"; public static final boolean DEFAULT_AUTHENTICATION_ENABLED = false; public static final String DEFAULT_HOST = "localhost"; @@ -263,7 +264,17 @@ private void doUpsert(Map> eventMap) { } DBCollection collection = db.getCollection(collectionName); for (DBObject doc : docs) { - DBObject query = BasicDBObjectBuilder.start().add(PK, doc.removeField(PK)).get(); + if (logger.isDebugEnabled()) { + logger.debug("doc: {}", doc); + } + DBObject query = BasicDBObjectBuilder.start().add(PK, doc.get(PK)).get(); + + if (doc.keySet().contains(OP_INC) || doc.keySet().contains(OP_SET)) { + doc = BasicDBObjectBuilder.start() + .add(OP_INC, doc.get(OP_INC)) + .add(OP_SET, doc.get(OP_SET)).get(); + } + CommandResult result = collection.update(query, doc, true, false, WriteConcern.NORMAL).getLastError(); if (result.ok()) { String errorMessage = result.getErrorMessage(); @@ -342,8 +353,7 @@ private List addEventToList(List documents, Event event) { return documents; } } - - if (timestampField != null) { + if (!event.getHeaders().containsKey(OPERATION) && timestampField != null) { Date timestamp; if (eventJson.containsField(timestampField)) { try { diff --git a/src/test/java/org/riderzen/flume/sink/MongoSinkTest.java b/src/test/java/org/riderzen/flume/sink/MongoSinkTest.java index 5ebc02f..374a153 100644 --- a/src/test/java/org/riderzen/flume/sink/MongoSinkTest.java +++ b/src/test/java/org/riderzen/flume/sink/MongoSinkTest.java @@ -2,13 +2,14 @@ import com.mongodb.*; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.time.DateUtils; import org.apache.flume.*; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.json.simple.JSONObject; -import org.testng.annotations.*; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; import java.net.UnknownHostException; import java.text.ParseException; @@ -418,4 +419,120 @@ public void timestampExistingFieldTest() throws EventDeliveryException, ParseExc } } + + @Test(groups = "dev") + public void upsertTest() throws EventDeliveryException, ParseException { + ctx.put(MongoSink.MODEL, MongoSink.CollectionModel.dynamic.name()); + String tsField = "createdOn"; + ctx.put(MongoSink.TIMESTAMP_FIELD, tsField); + MongoSink sink = new MongoSink(); + Configurables.configure(sink, ctx); + + sink.setChannel(channel); + sink.start(); + + JSONObject msg = new JSONObject(); + msg.put("age", 11); + msg.put("birthday", new Date().getTime()); + String dateText = "2013-02-19T14:20:53+08:00"; + msg.put(tsField, dateText); + + Transaction tx; + + for (int i = 0; i < 10; i++) { + tx = channel.getTransaction(); + tx.begin(); + msg.put("_id", 1111 + i); + msg.put("name", "test" + i); + JSONObject header = new JSONObject(); + header.put(MongoSink.COLLECTION, "my_events"); + header.put(MongoSink.DB_NAME, "dynamic_db"); + + Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header); + channel.put(e); + tx.commit(); + tx.close(); + } + sink.process(); + sink.stop(); + + for (int i = 0; i < 10; i++) { + tx = channel.getTransaction(); + tx.begin(); + msg.put("_id", 1111 + i); + msg.put("name", "test" + i * 10); + JSONObject header = new JSONObject(); + header.put(MongoSink.COLLECTION, "my_events"); + header.put(MongoSink.DB_NAME, "dynamic_db"); + header.put(MongoSink.OPERATION, MongoSink.OP_UPSERT); + + Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header); + channel.put(e); + tx.commit(); + tx.close(); + } + sink.process(); + sink.stop(); + + msg.put(tsField, MongoSink.dateTimeFormatter.parseDateTime(dateText).toDate()); + for (int i = 0; i < 10; i++) { + System.out.println("i = " + i); + + DB db = mongo.getDB("dynamic_db"); + DBCollection collection = db.getCollection("my_events"); + DBCursor cursor = collection.find(BasicDBObjectBuilder.start().add("_id", 1111 + i).get()); + assertTrue(cursor.hasNext()); + DBObject dbObject = cursor.next(); + assertNotNull(dbObject); + assertEquals(dbObject.get("name"), "test" + i * 10); + assertEquals(dbObject.get("age"), msg.get("age")); + assertEquals(dbObject.get("birthday"), msg.get("birthday")); + assertTrue(dbObject.get(tsField) instanceof Date); + System.out.println("ts = " + dbObject.get(tsField)); + System.out.println("_id = " + dbObject.get("_id")); + } + + } + + @Test + public static void sandbox() throws EventDeliveryException { + JSONObject msg = new JSONObject(); + JSONObject set = new JSONObject(); + set.put("pid", "274"); + set.put("fac", "missin-do"); + msg.put("$set", set); + + JSONObject inc = new JSONObject(); + inc.put("sum", 1); + + msg.put("$inc", inc); + msg.put("_id", "111111111111111111111111111"); + msg.put("pid", "111111111111111111111111111"); + + ctx.put(MongoSink.MODEL, MongoSink.CollectionModel.dynamic.name()); + String tsField = "createdOn"; + ctx.put(MongoSink.TIMESTAMP_FIELD, tsField); + MongoSink sink = new MongoSink(); + Configurables.configure(sink, ctx); + + sink.setChannel(channel); + sink.start(); + + Transaction tx; + + tx = channel.getTransaction(); + tx.begin(); + + JSONObject header = new JSONObject(); + header.put(MongoSink.COLLECTION, "my_eventsjj"); + header.put(MongoSink.DB_NAME, "dynamic_dbjj"); + header.put(MongoSink.OPERATION, MongoSink.OP_UPSERT); + + Event e = EventBuilder.withBody(msg.toJSONString().getBytes(), header); + channel.put(e); + tx.commit(); + tx.close(); + sink.process(); + sink.stop(); + } } diff --git a/src/test/resources/testng.xml b/src/test/resources/testng.xml index c8060c5..3b93a36 100644 --- a/src/test/resources/testng.xml +++ b/src/test/resources/testng.xml @@ -13,7 +13,7 @@ --> - +