Skip to content

Commit

Permalink
Merge pull request #17 from saxonparker/async_transaction
Browse files Browse the repository at this point in the history
Add async commit to Transaction, and allow reject rules to be provided
  • Loading branch information
Saxon Parker committed Sep 16, 2019
2 parents dbffa0b + 3c59260 commit c57214f
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 16 deletions.
257 changes: 257 additions & 0 deletions patches/async-transaction.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
Adds methods to perform transactions asynchronously.

From: Saxon Parker <saxon@bestateless.com>


---
GNUmakefile | 4 +++
src/Transaction.cc | 48 ++++++++++++++++++++++++++++++--
src/Transaction.h | 21 ++++++++++++--
src/TransactionTest.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 142 insertions(+), 4 deletions(-)

diff --git a/GNUmakefile b/GNUmakefile
index 50d491f6..201131cc 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -431,7 +431,10 @@ INSTALL_INCLUDES := \
src/BoostIntrusive.h \
src/Buffer.h \
src/ClientException.h \
+ src/ClientTransactionManager.h \
src/CodeLocation.h \
+ src/Common.h \
+ src/Context.h \
src/CoordinatorClient.h \
src/CoordinatorRpcWrapper.h \
src/Crc32C.h \
@@ -467,6 +470,7 @@ INSTALL_INCLUDES := \
src/SpinLock.h \
src/Status.h \
src/TestLog.h \
+ src/Transaction.h \
src/Transport.h \
src/Tub.h \
src/WireFormat.h \
diff --git a/src/Transaction.cc b/src/Transaction.cc
index c12867e3..037339be 100644
--- a/src/Transaction.cc
+++ b/src/Transaction.cc
@@ -104,6 +104,44 @@ Transaction::commitAndSync()
return commit();
}

+void
+Transaction::commitAsync()
+{
+ if (!commitStarted) {
+ commitStarted = true;
+ ramcloud->transactionManager->startTransactionTask(taskPtr);
+ }
+}
+
+bool
+Transaction::commitReady()
+{
+ return taskPtr->allDecisionsSent();
+}
+
+bool
+Transaction::syncReady()
+{
+ return taskPtr->isReady();
+}
+
+void
+Transaction::poll()
+{
+ ramcloud->transactionManager->poll();
+}
+
+bool
+Transaction::result()
+{
+ if (expect_false(taskPtr->getDecision() ==
+ WireFormat::TxDecision::UNDECIDED)) {
+ ClientException::throwException(HERE, STATUS_INTERNAL_ERROR);
+ }
+
+ return (taskPtr->getDecision() == WireFormat::TxDecision::COMMIT);
+}
+
/**
* Read the current contents of an object as part of this transaction.
*
@@ -144,7 +182,7 @@ Transaction::read(uint64_t tableId, const void* key, uint16_t keyLength,
* Size in bytes of the key.
*/
void
-Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength)
+Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules)
{
if (expect_false(commitStarted)) {
throw TxOpAfterCommit(HERE);
@@ -165,6 +203,9 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength)
}

entry->type = ClientTransactionTask::CacheEntry::REMOVE;
+ if (rejectRules != NULL) {
+ entry->rejectRules = *rejectRules;
+ }
}

/**
@@ -187,7 +228,7 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength)
*/
void
Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength,
- const void* buf, uint32_t length)
+ const void* buf, uint32_t length, const RejectRules* rejectRules)
{
if (expect_false(commitStarted)) {
throw TxOpAfterCommit(HERE);
@@ -212,6 +253,9 @@ Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength,
}

entry->type = ClientTransactionTask::CacheEntry::WRITE;
+ if (rejectRules != NULL) {
+ entry->rejectRules = *rejectRules;
+ }
}

/**
diff --git a/src/Transaction.h b/src/Transaction.h
index 4283fcf6..786bda79 100644
--- a/src/Transaction.h
+++ b/src/Transaction.h
@@ -59,13 +59,30 @@ class Transaction {
void sync();
bool commitAndSync();

+ /**
+ * This set of methods is used to issue a commit asynchronously.
+ * Sample code:
+ * Transaction t;
+ * ... add operations ...
+ * t.commitAsync();
+ * while (!t.commitReady())
+ * t.poll();
+ * if (!t.result())
+ * std::cerr << "transaction failed" << std::endl;
+ */
+ void commitAsync();
+ bool commitReady();
+ bool syncReady();
+ bool result();
+ void poll();
+
void read(uint64_t tableId, const void* key, uint16_t keyLength,
Buffer* value, bool* objectExists = NULL);

- void remove(uint64_t tableId, const void* key, uint16_t keyLength);
+ void remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules = NULL);

void write(uint64_t tableId, const void* key, uint16_t keyLength,
- const void* buf, uint32_t length);
+ const void* buf, uint32_t length, const RejectRules* rejectRules = NULL);

/**
* Encapsulates the state of a Transaction::read operation,
diff --git a/src/TransactionTest.cc b/src/TransactionTest.cc
index 724b7f23..ee36eabf 100644
--- a/src/TransactionTest.cc
+++ b/src/TransactionTest.cc
@@ -118,6 +118,35 @@ TEST_F(TransactionTest, commit_basic) {
EXPECT_TRUE(transaction->commitStarted);
}

+TEST_F(TransactionTest, commit_async_basic) {
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6);
+
+ Buffer value;
+ transaction->read(tableId1, "0", 1, &value);
+ transaction->write(tableId1, "0", 1, "hello", 5);
+
+ EXPECT_FALSE(transaction->commitStarted);
+ EXPECT_EQ(ClientTransactionTask::INIT,
+ transaction->taskPtr.get()->state);
+
+ transaction->commitAsync();
+ while (!transaction->commitReady())
+ transaction->poll();
+
+ EXPECT_TRUE(transaction->result());
+
+ EXPECT_EQ(ClientTransactionTask::DONE,
+ transaction->taskPtr.get()->state);
+ EXPECT_TRUE(transaction->commitStarted);
+
+ // Check that commit does not wait for decision rpcs to return.
+ transaction->taskPtr.get()->state = ClientTransactionTask::DECISION;
+ EXPECT_TRUE(transaction->commit());
+ EXPECT_EQ(ClientTransactionTask::DECISION,
+ transaction->taskPtr.get()->state);
+ EXPECT_TRUE(transaction->commitStarted);
+}
+
TEST_F(TransactionTest, commit_abort) {
ramcloud->write(tableId1, "0", 1, "abcdef", 6);

@@ -343,6 +372,25 @@ TEST_F(TransactionTest, remove) {
EXPECT_EQ(entry, task->findCacheEntry(key));
}

+TEST_F(TransactionTest, remove_with_reject) {
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6);
+ ramcloud->write(tableId1, "0", 1, "ghijkl", 6);
+
+ RejectRules rr;
+ rr.versionNeGiven = 1;
+ transaction->remove(tableId1, "0", 1, &rr);
+
+ EXPECT_FALSE(transaction->commitStarted);
+ EXPECT_EQ(ClientTransactionTask::INIT,
+ transaction->taskPtr.get()->state);
+
+ // The commit should fail because of the reject rules
+ EXPECT_FALSE(transaction->commit());
+ EXPECT_EQ(ClientTransactionTask::DONE,
+ transaction->taskPtr.get()->state);
+ EXPECT_TRUE(transaction->commitStarted);
+}
+
TEST_F(TransactionTest, remove_afterCommit) {
transaction->commitStarted = true;
EXPECT_THROW(transaction->remove(1, "test", 4),
@@ -382,6 +430,31 @@ TEST_F(TransactionTest, write) {
EXPECT_EQ(entry, task->findCacheEntry(key));
}

+TEST_F(TransactionTest, write_with_reject) {
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6);
+ ramcloud->write(tableId1, "0", 1, "ghijkl", 6);
+
+ RejectRules rr;
+ rr.versionNeGiven = 1;
+ transaction->write(tableId1, "0", 1, "mnopqrs", 6, &rr);
+
+ EXPECT_FALSE(transaction->commitStarted);
+ EXPECT_EQ(ClientTransactionTask::INIT,
+ transaction->taskPtr.get()->state);
+
+ // The commit should fail because of the reject rules
+ EXPECT_FALSE(transaction->commit());
+ EXPECT_EQ(ClientTransactionTask::DONE,
+ transaction->taskPtr.get()->state);
+ EXPECT_TRUE(transaction->commitStarted);
+
+ Buffer value;
+ ramcloud->read(tableId1, "0", 1, &value);
+ EXPECT_EQ("ghijkl", string(reinterpret_cast<const char*>(
+ value.getRange(0, value.size())),
+ value.size()));
+}
+
TEST_F(TransactionTest, write_afterCommit) {
transaction->commitStarted = true;
EXPECT_THROW(transaction->write(1, "test", 4, "hello", 5),
24 changes: 12 additions & 12 deletions patches/dpdk-config.patch
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ to run sockets that are not socket zero.
5 files changed, 133 insertions(+), 34 deletions(-)

diff --git a/src/DpdkDriver.cc b/src/DpdkDriver.cc
index af56e8e5..3f5e9719 100644
index bd24aacc..1a63bd2f 100644
--- a/src/DpdkDriver.cc
+++ b/src/DpdkDriver.cc
@@ -89,6 +89,7 @@ DpdkDriver::DpdkDriver()
@@ -90,6 +90,7 @@ DpdkDriver::DpdkDriver()
, loopbackRing(NULL)
, hasHardwareFilter(true)
, bandwidthMbps(10000)
+ , vlanTag(0)
, fileLogger(NOTICE, "DPDK: ")
{
localMac.construct("01:23:45:67:89:ab");
@@ -113,7 +114,11 @@ DpdkDriver::DpdkDriver()
@@ -114,7 +115,11 @@ DpdkDriver::DpdkDriver()
* Selects which physical port to use for communication.
*/

Expand All @@ -49,15 +49,15 @@ index af56e8e5..3f5e9719 100644
: Driver(context)
, packetBufsUtilized(0)
, locatorString()
@@ -123,6 +128,7 @@ DpdkDriver::DpdkDriver(Context* context, int port)
@@ -124,6 +129,7 @@ DpdkDriver::DpdkDriver(Context* context, int port)
, loopbackRing(NULL)
, hasHardwareFilter(true) // Cleared later if not applicable
, bandwidthMbps(10000) // Default bandwidth = 10 gbs
+ , vlanTag(tag)
, fileLogger(NOTICE, "DPDK: ")
{
struct ether_addr mac;
@@ -130,6 +136,12 @@ DpdkDriver::DpdkDriver(Context* context, int port)
@@ -131,6 +137,12 @@ DpdkDriver::DpdkDriver(Context* context, int port)
struct rte_eth_conf portConf;
int ret;

Expand All @@ -70,7 +70,7 @@ index af56e8e5..3f5e9719 100644
portId = downCast<uint8_t>(port);

// Initialize the DPDK environment with some default parameters.
@@ -138,34 +150,43 @@ DpdkDriver::DpdkDriver(Context* context, int port)
@@ -139,34 +151,43 @@ DpdkDriver::DpdkDriver(Context* context, int port)
// This is a bug in DPDK as of 9/2016; if the bug gets fixed, then
// the --file-prefix argument can be removed.
LOG(NOTICE, "Using DPDK version %s", rte_version());
Expand Down Expand Up @@ -142,7 +142,7 @@ index af56e8e5..3f5e9719 100644
}

// ensure that DPDK was able to detect a compatible and available NIC
@@ -204,10 +225,29 @@ DpdkDriver::DpdkDriver(Context* context, int port)
@@ -205,10 +226,29 @@ DpdkDriver::DpdkDriver(Context* context, int port)
}
}

Expand Down Expand Up @@ -174,7 +174,7 @@ index af56e8e5..3f5e9719 100644

// set the MTU that the NIC port should support
ret = rte_eth_dev_set_mtu(portId, MAX_PAYLOAD_SIZE);
@@ -443,7 +483,7 @@ DpdkDriver::sendPacket(const Address* addr,
@@ -444,7 +484,7 @@ DpdkDriver::sendPacket(const Address* addr,
// Fill out the PCP field and the Ethernet frame type of the encapsulated
// frame (DEI and VLAN ID are not relevant and trivially set to 0).
struct vlan_hdr* vlanHdr = reinterpret_cast<struct vlan_hdr*>(p);
Expand Down Expand Up @@ -237,10 +237,10 @@ index f68d4e72..0b89f4db 100644
ProgramOptions::value<int32_t>(&options.portTimeout)->
default_value(-1), // Overriding to the initial value.
diff --git a/src/OptionParser.h b/src/OptionParser.h
index e241c726..df1a8591 100644
index 55e1108f..b715995b 100644
--- a/src/OptionParser.h
+++ b/src/OptionParser.h
@@ -53,6 +53,9 @@ class CommandLineOptions {
@@ -52,6 +52,9 @@ class CommandLineOptions {
, clusterName()
, configDir()
, dpdkPort(0)
Expand All @@ -250,7 +250,7 @@ index e241c726..df1a8591 100644
{
}

@@ -134,6 +137,33 @@ class CommandLineOptions {
@@ -133,6 +136,33 @@ class CommandLineOptions {
return dpdkPort;
}

Expand Down Expand Up @@ -284,7 +284,7 @@ index e241c726..df1a8591 100644
string coordinatorLocator; ///< See getCoordinatorLocator().
string localLocator; ///< See getLocalLocator().
string externalStorageLocator; ///< See getExternalStorageLocator().
@@ -143,6 +173,9 @@ class CommandLineOptions {
@@ -142,6 +172,9 @@ class CommandLineOptions {
string clusterName; ///< See getClusterName().
string configDir; ///< See getConfigDir().
int dpdkPort; ///< See getDpdkPort().
Expand Down
6 changes: 3 additions & 3 deletions patches/fix-dpdk-mbuf-release.patch
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ behvior when freeing DPDK buffers.
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/DpdkDriver.cc b/src/DpdkDriver.cc
index 39fec055..af56e8e5 100644
index ef557d33..bd24aacc 100644
--- a/src/DpdkDriver.cc
+++ b/src/DpdkDriver.cc
@@ -71,7 +71,7 @@ namespace {
@@ -72,7 +72,7 @@ namespace {
// Short-hand to obtain the starting address of a DPDK rte_mbuf based on its
// payload address.
#define payload_to_mbuf(payload) reinterpret_cast<struct rte_mbuf*>( \
Expand All @@ -22,7 +22,7 @@ index 39fec055..af56e8e5 100644

constexpr uint16_t DpdkDriver::PRIORITY_TO_PCP[8];

@@ -351,8 +351,8 @@ DpdkDriver::receivePackets(uint32_t maxPackets,
@@ -352,8 +352,8 @@ DpdkDriver::receivePackets(uint32_t maxPackets,
for (uint32_t i = 0; i < totalPkts; i++) {
struct rte_mbuf* m = mPkts[i];
char* data = rte_pktmbuf_mtod(m, char*);
Expand Down
3 changes: 2 additions & 1 deletion patches/series
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# This series applies on GIT commit 3f4a0ef1f881184da116faa7a70e2a14681dc303
# This series applies on GIT commit e7089bc1332e311ad3f2451ebc681cc20fe29ff4
disable-log-before-main.patch
fix-dpdk-mbuf-release.patch
dpdk-config.patch
multiop-includes.patch
remove-java.patch
make-rc-unit-test.patch
flaky-test_fix.patch
async-transaction.patch

0 comments on commit c57214f

Please sign in to comment.