From 33f848ee18ac1e2032d10041cfd4c819d0ef5e55 Mon Sep 17 00:00:00 2001 From: Sanket Sarang Date: Fri, 31 May 2019 14:36:44 +0530 Subject: [PATCH] Transaction lock optimisations --- .../com/blobcity/db/data/RowCountStore.java | 59 ++++++------ .../blobcity/db/locks/ReadWriteSemaphore.java | 28 ++++-- .../blobcity/db/locks/TransactionLocking.java | 68 ++++++++----- .../db/requests/RequestHandlingBean.java | 6 ++ .../blobcity/db/storage/BSqlFileManager.java | 95 ++++++++++++------- 5 files changed, 163 insertions(+), 93 deletions(-) diff --git a/engine/src/main/java/com/blobcity/db/data/RowCountStore.java b/engine/src/main/java/com/blobcity/db/data/RowCountStore.java index e15c6d7..dcd39aa 100644 --- a/engine/src/main/java/com/blobcity/db/data/RowCountStore.java +++ b/engine/src/main/java/com/blobcity/db/data/RowCountStore.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.expression.Operation; import org.springframework.stereotype.Component; /** @@ -51,21 +52,15 @@ public class RowCountStore { public long getRowCount(final String app, final String table) throws OperationException { final String mapKey = getKey(app, table); - /* Cache count value if not already cached */ - if (!map.containsKey(mapKey)) { - loadCount(app, table); - } + map.computeIfAbsent(mapKey, k -> { + try { + return new AtomicCounter(getCount(app, table)); + } catch(OperationException ex) { + return new AtomicCounter(0); + } + }); - AtomicCounter counter = map.get(mapKey); - try { - counter.getSemaphore().acquireReadLock(); - return counter.getValue(); - } catch (InterruptedException ex) { - LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex); - throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR); - } finally { - counter.getSemaphore().releaseReadLock(); - } + return map.get(mapKey).getValue(); } /** @@ -79,19 +74,19 @@ public long getRowCount(final String app, final String table) throws OperationEx public void incrementRowCount(final String app, final String table) throws OperationException { final String mapKey = getKey(app, table); - /* Cache count value if not already cached */ - if (!map.containsKey(mapKey)) { - loadCount(app, table); - } + map.computeIfAbsent(mapKey, k -> { + try { + return new AtomicCounter(getCount(app, table)); + } catch(OperationException ex) { + return new AtomicCounter(0); + } + }); AtomicCounter counter = map.get(mapKey); + counter.getSemaphore().acquireWriteLock(); try { - counter.getSemaphore().acquireWriteLock(); final long newValue = counter.incrementAndGet(); rowCountManager.writeCount(app, table, newValue); - } catch (InterruptedException ex) { - LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex); - throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR); } finally { counter.getSemaphore().releaseWriteLock(); } @@ -108,19 +103,19 @@ public void incrementRowCount(final String app, final String table) throws Opera public void decrementRowCount(final String app, final String table) throws OperationException { final String mapKey = getKey(app, table); - /* Cache count value if not already cached */ - if (!map.containsKey(mapKey)) { - loadCount(app, table); - } + map.computeIfAbsent(mapKey, k -> { + try { + return new AtomicCounter(getCount(app, table)); + } catch(OperationException ex) { + return new AtomicCounter(0); + } + }); AtomicCounter counter = map.get(mapKey); + counter.getSemaphore().acquireWriteLock(); try { - counter.getSemaphore().acquireWriteLock(); final long newValue = counter.decrementAndGet(); rowCountManager.writeCount(app, table, newValue); - } catch (InterruptedException ex) { - LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex); - throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR); } finally { counter.getSemaphore().releaseWriteLock(); } @@ -153,4 +148,8 @@ private void loadCount(final String app, final String table) throws OperationExc final String mapKey = getKey(app, table); map.put(mapKey, counter); } + + private long getCount(final String app, final String table) throws OperationException { + return rowCountManager.readCount(app, table); + } } diff --git a/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java b/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java index 27e7403..139fcd4 100644 --- a/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java +++ b/engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java @@ -32,6 +32,7 @@ public final class ReadWriteSemaphore { private final Semaphore semaphore; private final int readPermits; private LockType lockType; + private long lastOperatedAt = System.currentTimeMillis(); /** * Construct used to consider the semaphore with the permitted read concurrency or in other words with the number of @@ -58,7 +59,7 @@ public ReadWriteSemaphore(final int readPermits) { * NONE state is marked by the availability of all permits and a read or write lock can be immediately * acquired on the semaphore.

* - * @return the current {@link LockTyoe} status of the semaphore + * @return the current {@link LockType} status of the semaphore */ public LockType getLockType() { if (semaphore.availablePermits() == readPermits) { @@ -100,12 +101,13 @@ public int availablePermits() { * {@link LockType} property of this class may result in having an inconsistent value.

* *

- * This function is a blocking call until a read lock is acquired or an {@link InterruptedExeception} is thrown

+ * This function is a blocking call until a read lock is acquired + *

* - * @throws InterruptedException if an interrupt event occurs while waiting for a single acquire on the semaphore. */ - public void acquireReadLock() throws InterruptedException { - semaphore.acquire(); + public void acquireReadLock() { + lastOperatedAt = System.currentTimeMillis(); + semaphore.acquireUninterruptibly(); lockType = LockType.READ; } @@ -113,8 +115,9 @@ public void acquireReadLock() throws InterruptedException { * Lock blocks the item from being available for writing by anyone other than the lock holder. Others may still read * the current value of the item */ - public void acquireWriteLock() throws InterruptedException { - semaphore.acquire(readPermits); + public void acquireWriteLock() { + lastOperatedAt = System.currentTimeMillis(); + semaphore.acquireUninterruptibly(readPermits); lockType = LockType.WRITE; } @@ -125,6 +128,7 @@ public void acquireWriteLock() throws InterruptedException { * maximum read permits. */ public void releaseReadLock() { + lastOperatedAt = System.currentTimeMillis(); if (lockType == LockType.READ && semaphore.availablePermits() < readPermits) { semaphore.release(); } @@ -135,8 +139,18 @@ public void releaseReadLock() { * WRITE and if the currently available permits are not zero. */ public void releaseWriteLock() { + lastOperatedAt = System.currentTimeMillis(); if (lockType == LockType.WRITE && semaphore.availablePermits() == 0) { semaphore.release(readPermits); } } + + /** + * Gets the time at which this Semaphore was last operated at. Only lock and release operations affect the + * lastOperatedAt time reported by this function + * @return the lastOperatedAt time in milli-seconds from epoch + */ + public long getLastOperatedAt() { + return this.lastOperatedAt; + } } diff --git a/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java b/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java index 8762728..ff04d4c 100644 --- a/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java +++ b/engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java @@ -16,21 +16,22 @@ package com.blobcity.db.locks; -import com.blobcity.db.exceptions.ErrorCode; import com.blobcity.db.exceptions.OperationException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * * @author sanketsarang */ + @Component public class TransactionLocking { /* Keyed on {appId}-{table}-{pk} */ - private final Map map = new HashMap<>(); + private final Map map = new ConcurrentHashMap<>(); public boolean isLocked(String app, String table, String pk) { return map.containsKey(generateKey(app, table, pk)); @@ -38,35 +39,49 @@ public boolean isLocked(String app, String table, String pk) { public LockType getLockType(String app, String table, String pk) { final String key = generateKey(app, table, pk); - if (map.containsKey(key)) { - return map.get(key).getLockType(); - } - return LockType.NONE; + ReadWriteSemaphore rwSemaphore = map.get(key); + return rwSemaphore == null ? LockType.NONE : rwSemaphore.getLockType(); + } -// @Lock(javax.ejb.LockType.WRITE) - public String acquireLock(String app, String table, String pk, LockType lockType) throws OperationException, InterruptedException { + public void acquireLock(String app, String table, String pk, LockType lockType) { final String key = generateKey(app, table, pk); - ReadWriteSemaphore readWriteSemaphore; switch (lockType) { case READ: - readWriteSemaphore = new ReadWriteSemaphore(1); - map.put(key, readWriteSemaphore); - readWriteSemaphore.acquireReadLock(); - return key; + map.compute(key, (k, value)-> { + if(value == null) { + return new ReadWriteSemaphore(1); + } + return value; + }); + map.get(key).acquireReadLock(); case WRITE: - readWriteSemaphore = new ReadWriteSemaphore(1); - map.put(key, readWriteSemaphore); - readWriteSemaphore.acquireWriteLock(); - return key; - default: - throw new OperationException(ErrorCode.LOCKING_ERROR, "Lock type can only be one of READ, WRITE"); + map.compute(key, (k, value)-> { + if(value == null) { + return new ReadWriteSemaphore(1); + } + return value; + }); + map.get(key).acquireWriteLock(); } } - public String releaseLock(String app, String table, String pk, LockType lockType) throws OperationException { - throw new OperationException(ErrorCode.OPERATION_NOT_SUPPORTED, "Method releaseLock(...) inside TransactionLocking not yet implemeneted."); + public void releaseLock(String app, String table, String pk, LockType lockType) { + final String key = generateKey(app, table, pk); + ReadWriteSemaphore rwSemaphore = map.get(key); + if(rwSemaphore == null) { + return; + } + + switch(lockType) { + case READ: + rwSemaphore.releaseReadLock(); + break; + case WRITE: + rwSemaphore.releaseWriteLock(); + break; + } } private String generateKey(String app, String table, String pk) { @@ -77,4 +92,13 @@ private String generateKey(String app, String table, String pk) { sb.append(pk); return sb.toString(); } + + @Scheduled(cron = "0 * * * * *") + private void cleanUp() { + final long removeBefore = System.currentTimeMillis() - 30000; //30 seconds + map.entrySet().removeIf(key -> { + ReadWriteSemaphore rws = map.get(key); + return rws.getLockType() == LockType.NONE && rws.getLastOperatedAt() < removeBefore; + }); + } } diff --git a/engine/src/main/java/com/blobcity/db/requests/RequestHandlingBean.java b/engine/src/main/java/com/blobcity/db/requests/RequestHandlingBean.java index a328632..a7dabfd 100644 --- a/engine/src/main/java/com/blobcity/db/requests/RequestHandlingBean.java +++ b/engine/src/main/java/com/blobcity/db/requests/RequestHandlingBean.java @@ -282,9 +282,15 @@ private Query processRequest(final Query query) { Future futureResponse = MasterExecutorService.getInstance().submit(masterExecutable); Query response = futureResponse.get(); + if(response == null) { + logger.warn("Request (" + query.getRequestId() + ") failed with unknown cause"); + masterExecutable.rollback(); + return new Query().ackFailure(); + } logger.info("Response for requestId: {}, response: {}", query.getRequestId(), response.toJsonString()); return response; } catch (InterruptedException | ExecutionException e) { + logger.warn("Request (" + query.getRequestId() + ") failed on an internal exception"); masterExecutable.rollback(); return new Query().ackFailure(); } finally { diff --git a/engine/src/main/java/com/blobcity/db/storage/BSqlFileManager.java b/engine/src/main/java/com/blobcity/db/storage/BSqlFileManager.java index bd05353..bcd4bf3 100644 --- a/engine/src/main/java/com/blobcity/db/storage/BSqlFileManager.java +++ b/engine/src/main/java/com/blobcity/db/storage/BSqlFileManager.java @@ -22,6 +22,8 @@ import com.blobcity.db.exceptions.ErrorCode; import com.blobcity.db.exceptions.OperationException; import com.blobcity.db.license.LicenseRules; +import com.blobcity.db.locks.LockType; +import com.blobcity.db.locks.TransactionLocking; import com.blobcity.db.sql.util.PathUtil; import com.blobcity.db.util.FileNameEncoding; import java.io.BufferedReader; @@ -41,6 +43,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import com.blobcity.util.lambda.Counter; @@ -48,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; /** @@ -66,6 +71,8 @@ public class BSqlFileManager { private DataCache dataCache; @Autowired private CacheRules cacheRules; + @Autowired + private TransactionLocking transactionLocking; /** *

@@ -87,15 +94,20 @@ public String select(final String app, final String table, final String key) thr } } - Path path = Paths.get(PathUtil.dataFile(app, table, key)); + transactionLocking.acquireLock(app, table, key, LockType.READ); try { - result = new String(Files.readAllBytes(path), "UTF-8"); - if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, result); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + try { + result = new String(Files.readAllBytes(path), "UTF-8"); + if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, result); + } + return result; + } catch (IOException e) { + throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); } - return result; - } catch (IOException e) { - throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); + } finally { + transactionLocking.releaseLock(app, table, key, LockType.READ); } /* Old and stable implementation. Was not working for special characters */ @@ -285,18 +297,23 @@ public boolean exists(final String app, final String table, final String key) th * @param key The row mapped to the key to delete */ public void remove(final String app, final String table, String key) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); - if (!Files.exists(path)) { - throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); - } + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - if (Files.deleteIfExists(path) && LicenseRules.DATA_CACHING) { - dataCache.invalidate(app, table, key); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + if (!Files.exists(path)) { + throw new OperationException(ErrorCode.PRIMARY_KEY_INEXISTENT, "A record with the given primary key: " + key + " could not be found in table: " + table); } - } catch (IOException ex) { + try { + if (Files.deleteIfExists(path) && LicenseRules.DATA_CACHING) { + dataCache.invalidate(app, table, key); + } + } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not delete record in table: " + table); + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not delete record in table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } } @@ -310,32 +327,42 @@ public void remove(final String app, final String table, String key) throws Oper * @throws com.blobcity.db.exceptions.OperationException */ public void save(final String app, final String table, final String key, final String jsonString) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - Files.write(path, jsonString.getBytes("UTF-8")); - if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, jsonString); - } - } catch (IOException ex) { + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + try { + Files.write(path, jsonString.getBytes("UTF-8")); + if (LicenseRules.DATA_CACHING && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, jsonString); + } + } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit save operation to file system for table: " + table); + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit save operation to file system for table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } } public void insert(final String app, final String table, final String key, final String jsonString) throws OperationException { - Path path = Paths.get(PathUtil.dataFile(app, table, key)); - if (Files.exists(path)) { - throw new OperationException(ErrorCode.PRIMARY_KEY_CONFLICT, "A record with the given primary key: " + key + " already isPresent in table: " + table); - } + transactionLocking.acquireLock(app, table, key, LockType.WRITE); try { - Files.write(path, jsonString.getBytes("UTF-8")); - if (LicenseRules.DATA_CACHING && LicenseRules.CACHE_INSERTS && cacheRules.shouldCache(app, table)) { - dataCache.cache(app, table, key, jsonString); + Path path = Paths.get(PathUtil.dataFile(app, table, key)); + if (Files.exists(path)) { + throw new OperationException(ErrorCode.PRIMARY_KEY_CONFLICT, "A record with the given primary key: " + key + " already isPresent in table: " + table); } - } catch (IOException ex) { - //TODO: Notify admin - throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit insert operation to file system for table: " + table); + try { + Files.write(path, jsonString.getBytes("UTF-8")); + if (LicenseRules.DATA_CACHING && LicenseRules.CACHE_INSERTS && cacheRules.shouldCache(app, table)) { + dataCache.cache(app, table, key, jsonString); + } + } catch (IOException ex) { + //TODO: Notify admin + throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "An internal operation error occured. Could not commit insert operation to file system for table: " + table); + } + } finally { + transactionLocking.releaseLock(app, table, key, LockType.WRITE); } }