From 3c59260a318509ed941353ee083db8871f9b4dc9 Mon Sep 17 00:00:00 2001 From: Saxon Parker Date: Wed, 4 Sep 2019 15:14:11 -0600 Subject: [PATCH] Add async commit to Transaction, and allow reject rules to be provided --- patches/async-transaction.patch | 257 ++++++++++++++++++++++++++++ patches/dpdk-config.patch | 24 +-- patches/fix-dpdk-mbuf-release.patch | 6 +- patches/series | 3 +- 4 files changed, 274 insertions(+), 16 deletions(-) create mode 100644 patches/async-transaction.patch diff --git a/patches/async-transaction.patch b/patches/async-transaction.patch new file mode 100644 index 0000000..bde76d0 --- /dev/null +++ b/patches/async-transaction.patch @@ -0,0 +1,257 @@ +Adds methods to perform transactions asynchronously. + +From: Saxon Parker + + +--- + GNUmakefile | 4 +++ + src/Transaction.cc | 48 ++++++++++++++++++++++++++++++-- + src/Transaction.h | 21 ++++++++++++-- + src/TransactionTest.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++++ + 4 files changed, 142 insertions(+), 4 deletions(-) + +diff --git a/GNUmakefile b/GNUmakefile +index 50d491f6..201131cc 100644 +--- a/GNUmakefile ++++ b/GNUmakefile +@@ -431,7 +431,10 @@ INSTALL_INCLUDES := \ + src/BoostIntrusive.h \ + src/Buffer.h \ + src/ClientException.h \ ++ src/ClientTransactionManager.h \ + src/CodeLocation.h \ ++ src/Common.h \ ++ src/Context.h \ + src/CoordinatorClient.h \ + src/CoordinatorRpcWrapper.h \ + src/Crc32C.h \ +@@ -467,6 +470,7 @@ INSTALL_INCLUDES := \ + src/SpinLock.h \ + src/Status.h \ + src/TestLog.h \ ++ src/Transaction.h \ + src/Transport.h \ + src/Tub.h \ + src/WireFormat.h \ +diff --git a/src/Transaction.cc b/src/Transaction.cc +index c12867e3..037339be 100644 +--- a/src/Transaction.cc ++++ b/src/Transaction.cc +@@ -104,6 +104,44 @@ Transaction::commitAndSync() + return commit(); + } + ++void ++Transaction::commitAsync() ++{ ++ if (!commitStarted) { ++ commitStarted = true; ++ ramcloud->transactionManager->startTransactionTask(taskPtr); ++ } ++} ++ ++bool ++Transaction::commitReady() ++{ ++ return taskPtr->allDecisionsSent(); ++} ++ ++bool ++Transaction::syncReady() ++{ ++ return taskPtr->isReady(); ++} ++ ++void ++Transaction::poll() ++{ ++ ramcloud->transactionManager->poll(); ++} ++ ++bool ++Transaction::result() ++{ ++ if (expect_false(taskPtr->getDecision() == ++ WireFormat::TxDecision::UNDECIDED)) { ++ ClientException::throwException(HERE, STATUS_INTERNAL_ERROR); ++ } ++ ++ return (taskPtr->getDecision() == WireFormat::TxDecision::COMMIT); ++} ++ + /** + * Read the current contents of an object as part of this transaction. + * +@@ -144,7 +182,7 @@ Transaction::read(uint64_t tableId, const void* key, uint16_t keyLength, + * Size in bytes of the key. + */ + void +-Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) ++Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules) + { + if (expect_false(commitStarted)) { + throw TxOpAfterCommit(HERE); +@@ -165,6 +203,9 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) + } + + entry->type = ClientTransactionTask::CacheEntry::REMOVE; ++ if (rejectRules != NULL) { ++ entry->rejectRules = *rejectRules; ++ } + } + + /** +@@ -187,7 +228,7 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) + */ + void + Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength, +- const void* buf, uint32_t length) ++ const void* buf, uint32_t length, const RejectRules* rejectRules) + { + if (expect_false(commitStarted)) { + throw TxOpAfterCommit(HERE); +@@ -212,6 +253,9 @@ Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength, + } + + entry->type = ClientTransactionTask::CacheEntry::WRITE; ++ if (rejectRules != NULL) { ++ entry->rejectRules = *rejectRules; ++ } + } + + /** +diff --git a/src/Transaction.h b/src/Transaction.h +index 4283fcf6..786bda79 100644 +--- a/src/Transaction.h ++++ b/src/Transaction.h +@@ -59,13 +59,30 @@ class Transaction { + void sync(); + bool commitAndSync(); + ++ /** ++ * This set of methods is used to issue a commit asynchronously. ++ * Sample code: ++ * Transaction t; ++ * ... add operations ... ++ * t.commitAsync(); ++ * while (!t.commitReady()) ++ * t.poll(); ++ * if (!t.result()) ++ * std::cerr << "transaction failed" << std::endl; ++ */ ++ void commitAsync(); ++ bool commitReady(); ++ bool syncReady(); ++ bool result(); ++ void poll(); ++ + void read(uint64_t tableId, const void* key, uint16_t keyLength, + Buffer* value, bool* objectExists = NULL); + +- void remove(uint64_t tableId, const void* key, uint16_t keyLength); ++ void remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules = NULL); + + void write(uint64_t tableId, const void* key, uint16_t keyLength, +- const void* buf, uint32_t length); ++ const void* buf, uint32_t length, const RejectRules* rejectRules = NULL); + + /** + * Encapsulates the state of a Transaction::read operation, +diff --git a/src/TransactionTest.cc b/src/TransactionTest.cc +index 724b7f23..ee36eabf 100644 +--- a/src/TransactionTest.cc ++++ b/src/TransactionTest.cc +@@ -118,6 +118,35 @@ TEST_F(TransactionTest, commit_basic) { + EXPECT_TRUE(transaction->commitStarted); + } + ++TEST_F(TransactionTest, commit_async_basic) { ++ ramcloud->write(tableId1, "0", 1, "abcdef", 6); ++ ++ Buffer value; ++ transaction->read(tableId1, "0", 1, &value); ++ transaction->write(tableId1, "0", 1, "hello", 5); ++ ++ EXPECT_FALSE(transaction->commitStarted); ++ EXPECT_EQ(ClientTransactionTask::INIT, ++ transaction->taskPtr.get()->state); ++ ++ transaction->commitAsync(); ++ while (!transaction->commitReady()) ++ transaction->poll(); ++ ++ EXPECT_TRUE(transaction->result()); ++ ++ EXPECT_EQ(ClientTransactionTask::DONE, ++ transaction->taskPtr.get()->state); ++ EXPECT_TRUE(transaction->commitStarted); ++ ++ // Check that commit does not wait for decision rpcs to return. ++ transaction->taskPtr.get()->state = ClientTransactionTask::DECISION; ++ EXPECT_TRUE(transaction->commit()); ++ EXPECT_EQ(ClientTransactionTask::DECISION, ++ transaction->taskPtr.get()->state); ++ EXPECT_TRUE(transaction->commitStarted); ++} ++ + TEST_F(TransactionTest, commit_abort) { + ramcloud->write(tableId1, "0", 1, "abcdef", 6); + +@@ -343,6 +372,25 @@ TEST_F(TransactionTest, remove) { + EXPECT_EQ(entry, task->findCacheEntry(key)); + } + ++TEST_F(TransactionTest, remove_with_reject) { ++ ramcloud->write(tableId1, "0", 1, "abcdef", 6); ++ ramcloud->write(tableId1, "0", 1, "ghijkl", 6); ++ ++ RejectRules rr; ++ rr.versionNeGiven = 1; ++ transaction->remove(tableId1, "0", 1, &rr); ++ ++ EXPECT_FALSE(transaction->commitStarted); ++ EXPECT_EQ(ClientTransactionTask::INIT, ++ transaction->taskPtr.get()->state); ++ ++ // The commit should fail because of the reject rules ++ EXPECT_FALSE(transaction->commit()); ++ EXPECT_EQ(ClientTransactionTask::DONE, ++ transaction->taskPtr.get()->state); ++ EXPECT_TRUE(transaction->commitStarted); ++} ++ + TEST_F(TransactionTest, remove_afterCommit) { + transaction->commitStarted = true; + EXPECT_THROW(transaction->remove(1, "test", 4), +@@ -382,6 +430,31 @@ TEST_F(TransactionTest, write) { + EXPECT_EQ(entry, task->findCacheEntry(key)); + } + ++TEST_F(TransactionTest, write_with_reject) { ++ ramcloud->write(tableId1, "0", 1, "abcdef", 6); ++ ramcloud->write(tableId1, "0", 1, "ghijkl", 6); ++ ++ RejectRules rr; ++ rr.versionNeGiven = 1; ++ transaction->write(tableId1, "0", 1, "mnopqrs", 6, &rr); ++ ++ EXPECT_FALSE(transaction->commitStarted); ++ EXPECT_EQ(ClientTransactionTask::INIT, ++ transaction->taskPtr.get()->state); ++ ++ // The commit should fail because of the reject rules ++ EXPECT_FALSE(transaction->commit()); ++ EXPECT_EQ(ClientTransactionTask::DONE, ++ transaction->taskPtr.get()->state); ++ EXPECT_TRUE(transaction->commitStarted); ++ ++ Buffer value; ++ ramcloud->read(tableId1, "0", 1, &value); ++ EXPECT_EQ("ghijkl", string(reinterpret_cast( ++ value.getRange(0, value.size())), ++ value.size())); ++} ++ + TEST_F(TransactionTest, write_afterCommit) { + transaction->commitStarted = true; + EXPECT_THROW(transaction->write(1, "test", 4, "hello", 5), diff --git a/patches/dpdk-config.patch b/patches/dpdk-config.patch index b14692b..99e03e9 100644 --- a/patches/dpdk-config.patch +++ b/patches/dpdk-config.patch @@ -25,10 +25,10 @@ to run sockets that are not socket zero. 5 files changed, 133 insertions(+), 34 deletions(-) diff --git a/src/DpdkDriver.cc b/src/DpdkDriver.cc -index af56e8e5..3f5e9719 100644 +index bd24aacc..1a63bd2f 100644 --- a/src/DpdkDriver.cc +++ b/src/DpdkDriver.cc -@@ -89,6 +89,7 @@ DpdkDriver::DpdkDriver() +@@ -90,6 +90,7 @@ DpdkDriver::DpdkDriver() , loopbackRing(NULL) , hasHardwareFilter(true) , bandwidthMbps(10000) @@ -36,7 +36,7 @@ index af56e8e5..3f5e9719 100644 , fileLogger(NOTICE, "DPDK: ") { localMac.construct("01:23:45:67:89:ab"); -@@ -113,7 +114,11 @@ DpdkDriver::DpdkDriver() +@@ -114,7 +115,11 @@ DpdkDriver::DpdkDriver() * Selects which physical port to use for communication. */ @@ -49,7 +49,7 @@ index af56e8e5..3f5e9719 100644 : Driver(context) , packetBufsUtilized(0) , locatorString() -@@ -123,6 +128,7 @@ DpdkDriver::DpdkDriver(Context* context, int port) +@@ -124,6 +129,7 @@ DpdkDriver::DpdkDriver(Context* context, int port) , loopbackRing(NULL) , hasHardwareFilter(true) // Cleared later if not applicable , bandwidthMbps(10000) // Default bandwidth = 10 gbs @@ -57,7 +57,7 @@ index af56e8e5..3f5e9719 100644 , fileLogger(NOTICE, "DPDK: ") { struct ether_addr mac; -@@ -130,6 +136,12 @@ DpdkDriver::DpdkDriver(Context* context, int port) +@@ -131,6 +137,12 @@ DpdkDriver::DpdkDriver(Context* context, int port) struct rte_eth_conf portConf; int ret; @@ -70,7 +70,7 @@ index af56e8e5..3f5e9719 100644 portId = downCast(port); // Initialize the DPDK environment with some default parameters. -@@ -138,34 +150,43 @@ DpdkDriver::DpdkDriver(Context* context, int port) +@@ -139,34 +151,43 @@ DpdkDriver::DpdkDriver(Context* context, int port) // This is a bug in DPDK as of 9/2016; if the bug gets fixed, then // the --file-prefix argument can be removed. LOG(NOTICE, "Using DPDK version %s", rte_version()); @@ -142,7 +142,7 @@ index af56e8e5..3f5e9719 100644 } // ensure that DPDK was able to detect a compatible and available NIC -@@ -204,10 +225,29 @@ DpdkDriver::DpdkDriver(Context* context, int port) +@@ -205,10 +226,29 @@ DpdkDriver::DpdkDriver(Context* context, int port) } } @@ -174,7 +174,7 @@ index af56e8e5..3f5e9719 100644 // set the MTU that the NIC port should support ret = rte_eth_dev_set_mtu(portId, MAX_PAYLOAD_SIZE); -@@ -443,7 +483,7 @@ DpdkDriver::sendPacket(const Address* addr, +@@ -444,7 +484,7 @@ DpdkDriver::sendPacket(const Address* addr, // Fill out the PCP field and the Ethernet frame type of the encapsulated // frame (DEI and VLAN ID are not relevant and trivially set to 0). struct vlan_hdr* vlanHdr = reinterpret_cast(p); @@ -237,10 +237,10 @@ index f68d4e72..0b89f4db 100644 ProgramOptions::value(&options.portTimeout)-> default_value(-1), // Overriding to the initial value. diff --git a/src/OptionParser.h b/src/OptionParser.h -index e241c726..df1a8591 100644 +index 55e1108f..b715995b 100644 --- a/src/OptionParser.h +++ b/src/OptionParser.h -@@ -53,6 +53,9 @@ class CommandLineOptions { +@@ -52,6 +52,9 @@ class CommandLineOptions { , clusterName() , configDir() , dpdkPort(0) @@ -250,7 +250,7 @@ index e241c726..df1a8591 100644 { } -@@ -134,6 +137,33 @@ class CommandLineOptions { +@@ -133,6 +136,33 @@ class CommandLineOptions { return dpdkPort; } @@ -284,7 +284,7 @@ index e241c726..df1a8591 100644 string coordinatorLocator; ///< See getCoordinatorLocator(). string localLocator; ///< See getLocalLocator(). string externalStorageLocator; ///< See getExternalStorageLocator(). -@@ -143,6 +173,9 @@ class CommandLineOptions { +@@ -142,6 +172,9 @@ class CommandLineOptions { string clusterName; ///< See getClusterName(). string configDir; ///< See getConfigDir(). int dpdkPort; ///< See getDpdkPort(). diff --git a/patches/fix-dpdk-mbuf-release.patch b/patches/fix-dpdk-mbuf-release.patch index b66da2c..08726ce 100644 --- a/patches/fix-dpdk-mbuf-release.patch +++ b/patches/fix-dpdk-mbuf-release.patch @@ -10,10 +10,10 @@ behvior when freeing DPDK buffers. 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DpdkDriver.cc b/src/DpdkDriver.cc -index 39fec055..af56e8e5 100644 +index ef557d33..bd24aacc 100644 --- a/src/DpdkDriver.cc +++ b/src/DpdkDriver.cc -@@ -71,7 +71,7 @@ namespace { +@@ -72,7 +72,7 @@ namespace { // Short-hand to obtain the starting address of a DPDK rte_mbuf based on its // payload address. #define payload_to_mbuf(payload) reinterpret_cast( \ @@ -22,7 +22,7 @@ index 39fec055..af56e8e5 100644 constexpr uint16_t DpdkDriver::PRIORITY_TO_PCP[8]; -@@ -351,8 +351,8 @@ DpdkDriver::receivePackets(uint32_t maxPackets, +@@ -352,8 +352,8 @@ DpdkDriver::receivePackets(uint32_t maxPackets, for (uint32_t i = 0; i < totalPkts; i++) { struct rte_mbuf* m = mPkts[i]; char* data = rte_pktmbuf_mtod(m, char*); diff --git a/patches/series b/patches/series index 526cd38..dcba1ea 100644 --- a/patches/series +++ b/patches/series @@ -1,4 +1,4 @@ -# This series applies on GIT commit 3f4a0ef1f881184da116faa7a70e2a14681dc303 +# This series applies on GIT commit e7089bc1332e311ad3f2451ebc681cc20fe29ff4 disable-log-before-main.patch fix-dpdk-mbuf-release.patch dpdk-config.patch @@ -6,3 +6,4 @@ multiop-includes.patch remove-java.patch make-rc-unit-test.patch flaky-test_fix.patch +async-transaction.patch