diff --git a/engine/src/main/java/com/blobcity/db/data/RowCountStore.java b/engine/src/main/java/com/blobcity/db/data/RowCountStore.java
index e15c6d7..dcd39aa 100644
--- a/engine/src/main/java/com/blobcity/db/data/RowCountStore.java
+++ b/engine/src/main/java/com/blobcity/db/data/RowCountStore.java
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.expression.Operation;
import org.springframework.stereotype.Component;
/**
@@ -51,21 +52,15 @@ public class RowCountStore {
public long getRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);
- /* Cache count value if not already cached */
- if (!map.containsKey(mapKey)) {
- loadCount(app, table);
- }
+ map.computeIfAbsent(mapKey, k -> {
+ try {
+ return new AtomicCounter(getCount(app, table));
+ } catch(OperationException ex) {
+ return new AtomicCounter(0);
+ }
+ });
- AtomicCounter counter = map.get(mapKey);
- try {
- counter.getSemaphore().acquireReadLock();
- return counter.getValue();
- } catch (InterruptedException ex) {
- LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
- throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
- } finally {
- counter.getSemaphore().releaseReadLock();
- }
+ return map.get(mapKey).getValue();
}
/**
@@ -79,19 +74,19 @@ public long getRowCount(final String app, final String table) throws OperationEx
public void incrementRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);
- /* Cache count value if not already cached */
- if (!map.containsKey(mapKey)) {
- loadCount(app, table);
- }
+ map.computeIfAbsent(mapKey, k -> {
+ try {
+ return new AtomicCounter(getCount(app, table));
+ } catch(OperationException ex) {
+ return new AtomicCounter(0);
+ }
+ });
AtomicCounter counter = map.get(mapKey);
+ counter.getSemaphore().acquireWriteLock();
try {
- counter.getSemaphore().acquireWriteLock();
final long newValue = counter.incrementAndGet();
rowCountManager.writeCount(app, table, newValue);
- } catch (InterruptedException ex) {
- LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
- throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
} finally {
counter.getSemaphore().releaseWriteLock();
}
@@ -108,19 +103,19 @@ public void incrementRowCount(final String app, final String table) throws Opera
public void decrementRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);
- /* Cache count value if not already cached */
- if (!map.containsKey(mapKey)) {
- loadCount(app, table);
- }
+ map.computeIfAbsent(mapKey, k -> {
+ try {
+ return new AtomicCounter(getCount(app, table));
+ } catch(OperationException ex) {
+ return new AtomicCounter(0);
+ }
+ });
AtomicCounter counter = map.get(mapKey);
+ counter.getSemaphore().acquireWriteLock();
try {
- counter.getSemaphore().acquireWriteLock();
final long newValue = counter.decrementAndGet();
rowCountManager.writeCount(app, table, newValue);
- } catch (InterruptedException ex) {
- LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
- throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
} finally {
counter.getSemaphore().releaseWriteLock();
}
@@ -153,4 +148,8 @@ private void loadCount(final String app, final String table) throws OperationExc
final String mapKey = getKey(app, table);
map.put(mapKey, counter);
}
+
+ private long getCount(final String app, final String table) throws OperationException {
+ return rowCountManager.readCount(app, table);
+ }
}
diff --git a/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java b/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java
index 27e7403..139fcd4 100644
--- a/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java
+++ b/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java
@@ -32,6 +32,7 @@ public final class ReadWriteSemaphore {
private final Semaphore semaphore;
private final int readPermits;
private LockType lockType;
+ private long lastOperatedAt = System.currentTimeMillis();
/**
* Construct used to consider the semaphore with the permitted read concurrency or in other words with the number of
@@ -58,7 +59,7 @@ public ReadWriteSemaphore(final int readPermits) {
* NONE
state is marked by the availability of all permits and a read or write lock can be immediately
* acquired on the semaphore.
- * This function is a blocking call until a read lock is acquired or an {@link InterruptedExeception} is thrown
+ * This function is a blocking call until a read lock is acquired + * * - * @throws InterruptedException if an interrupt event occurs while waiting for a single acquire on the semaphore. */ - public void acquireReadLock() throws InterruptedException { - semaphore.acquire(); + public void acquireReadLock() { + lastOperatedAt = System.currentTimeMillis(); + semaphore.acquireUninterruptibly(); lockType = LockType.READ; } @@ -113,8 +115,9 @@ public void acquireReadLock() throws InterruptedException { * Lock blocks the item from being available for writing by anyone other than the lock holder. Others may still read * the current value of the item */ - public void acquireWriteLock() throws InterruptedException { - semaphore.acquire(readPermits); + public void acquireWriteLock() { + lastOperatedAt = System.currentTimeMillis(); + semaphore.acquireUninterruptibly(readPermits); lockType = LockType.WRITE; } @@ -125,6 +128,7 @@ public void acquireWriteLock() throws InterruptedException { * maximum read permits. */ public void releaseReadLock() { + lastOperatedAt = System.currentTimeMillis(); if (lockType == LockType.READ && semaphore.availablePermits() < readPermits) { semaphore.release(); } @@ -135,8 +139,18 @@ public void releaseReadLock() { *WRITE
and if the currently available permits are not zero.
*/
public void releaseWriteLock() {
+ lastOperatedAt = System.currentTimeMillis();
if (lockType == LockType.WRITE && semaphore.availablePermits() == 0) {
semaphore.release(readPermits);
}
}
+
+ /**
+ * Gets the time at which this Semaphore was last operated at. Only lock and release operations affect the
+ * lastOperatedAt time reported by this function
+ * @return the lastOperatedAt time in milli-seconds from epoch
+ */
+ public long getLastOperatedAt() {
+ return this.lastOperatedAt;
+ }
}
diff --git a/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java b/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java
index 8762728..ff04d4c 100644
--- a/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java
+++ b/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java
@@ -16,21 +16,22 @@
package com.blobcity.db.locks;
-import com.blobcity.db.exceptions.ErrorCode;
import com.blobcity.db.exceptions.OperationException;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
*
* @author sanketsarang
*/
+
@Component
public class TransactionLocking {
/* Keyed on {appId}-{table}-{pk} */
- private final Map@@ -87,15 +94,20 @@ public String select(final String app, final String table, final String key) thr } } - Path path = Paths.get(PathUtil.dataFile(app, table, key)); + transactionLocking.acquireLock(app, table, key, LockType.READ); try { - result = new String(Files.readAllBytes(path), "UTF-8"); - if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, result); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + try { + result = new String(Files.readAllBytes(path), "UTF-8"); + if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, result); + } + return result; + } catch (IOException e) { + throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); } - return result; - } catch (IOException e) { - throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); + } finally { + transactionLocking.releaseLock(app, table, key, LockType.READ); } /* Old and stable implementation. Was not working for special characters */ @@ -285,18 +297,23 @@ public boolean exists(final String app, final String table, final String key) th * @param key The row mapped to the key to delete */ public void remove(final String app, final String table, String key) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); - if (!Files.exists(path)) { - throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); - } + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - if (Files.deleteIfExists(path) && LicenseRules.DATA_CACHING) { - dataCache.invalidate(app, table, key); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + if (!Files.exists(path)) { + throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); } - } catch (IOException ex) { + try { + if (Files.deleteIfExists(path) && LicenseRules.DATA_CACHING) { + dataCache.invalidate(app, table, key); + } + } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not delete record in table: " + table); + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not delete record in table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } } @@ -310,32 +327,42 @@ public void remove(final String app, final String table, String key) throws Oper * @throws com.blobcity.db.exceptions.OperationException */ public void save(final String app, final String table, final String key, final String jsonString) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - Files.write(path, jsonString.getBytes("UTF-8")); - if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, jsonString); - } - } catch (IOException ex) { + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + try { + Files.write(path, jsonString.getBytes("UTF-8")); + if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, jsonString); + } + } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit save operation to file system for table: " + table); + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit save operation to file system for table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } } public void insert(final String app, final String table, final String key, final String jsonString) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); - if (Files.exists(path)) { - throw new OperationException(ErrorCode.PRIMARY_KEY_CONFLICT, "A record with the given primary key: " + key + " already isPresent in table: " + table); - } + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - Files.write(path, jsonString.getBytes("UTF-8")); - if (LicenseRules.DATA_CACHING && LicenseRules.CACHE_INSERTS && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, jsonString); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + if (Files.exists(path)) { + throw new OperationException(ErrorCode.PRIMARY_KEY_CONFLICT, "A record with the given primary key: " + key + " already isPresent in table: " + table); } - } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit insert operation to file system for table: " + table); + try { + Files.write(path, jsonString.getBytes("UTF-8")); + if (LicenseRules.DATA_CACHING && LicenseRules.CACHE_INSERTS && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, jsonString); + } + } catch (IOException ex) { + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit insert operation to file system for table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } }