Skip to content

Commit

Permalink
Transaction lock optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketsarang committed May 31, 2019
1 parent 7c7a76a commit 33f848e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 93 deletions.
59 changes: 29 additions & 30 deletions engine/src/main/java/com/blobcity/db/data/RowCountStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.Operation;
import org.springframework.stereotype.Component;

/**
Expand Down Expand Up @@ -51,21 +52,15 @@ public class RowCountStore {
public long getRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);

/* Cache count value if not already cached */
if (!map.containsKey(mapKey)) {
loadCount(app, table);
}
map.computeIfAbsent(mapKey, k -> {
try {
return new AtomicCounter(getCount(app, table));
} catch(OperationException ex) {
return new AtomicCounter(0);
}
});

AtomicCounter counter = map.get(mapKey);
try {
counter.getSemaphore().acquireReadLock();
return counter.getValue();
} catch (InterruptedException ex) {
LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
} finally {
counter.getSemaphore().releaseReadLock();
}
return map.get(mapKey).getValue();
}

/**
Expand All @@ -79,19 +74,19 @@ public long getRowCount(final String app, final String table) throws OperationEx
public void incrementRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);

/* Cache count value if not already cached */
if (!map.containsKey(mapKey)) {
loadCount(app, table);
}
map.computeIfAbsent(mapKey, k -> {
try {
return new AtomicCounter(getCount(app, table));
} catch(OperationException ex) {
return new AtomicCounter(0);
}
});

AtomicCounter counter = map.get(mapKey);
counter.getSemaphore().acquireWriteLock();
try {
counter.getSemaphore().acquireWriteLock();
final long newValue = counter.incrementAndGet();
rowCountManager.writeCount(app, table, newValue);
} catch (InterruptedException ex) {
LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
} finally {
counter.getSemaphore().releaseWriteLock();
}
Expand All @@ -108,19 +103,19 @@ public void incrementRowCount(final String app, final String table) throws Opera
public void decrementRowCount(final String app, final String table) throws OperationException {
final String mapKey = getKey(app, table);

/* Cache count value if not already cached */
if (!map.containsKey(mapKey)) {
loadCount(app, table);
}
map.computeIfAbsent(mapKey, k -> {
try {
return new AtomicCounter(getCount(app, table));
} catch(OperationException ex) {
return new AtomicCounter(0);
}
});

AtomicCounter counter = map.get(mapKey);
counter.getSemaphore().acquireWriteLock();
try {
counter.getSemaphore().acquireWriteLock();
final long newValue = counter.decrementAndGet();
rowCountManager.writeCount(app, table, newValue);
} catch (InterruptedException ex) {
LoggerFactory.getLogger(RowCountStore.class.getName()).error(null, ex);
throw new OperationException(ErrorCode.COLLECTION_ROW_COUNT_ERROR);
} finally {
counter.getSemaphore().releaseWriteLock();
}
Expand Down Expand Up @@ -153,4 +148,8 @@ private void loadCount(final String app, final String table) throws OperationExc
final String mapKey = getKey(app, table);
map.put(mapKey, counter);
}

private long getCount(final String app, final String table) throws OperationException {
return rowCountManager.readCount(app, table);
}
}
28 changes: 21 additions & 7 deletions engine/src/main/java/com/blobcity/db/locks/ReadWriteSemaphore.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class ReadWriteSemaphore {
private final Semaphore semaphore;
private final int readPermits;
private LockType lockType;
private long lastOperatedAt = System.currentTimeMillis();

/**
* Construct used to consider the semaphore with the permitted read concurrency or in other words with the number of
Expand All @@ -58,7 +59,7 @@ public ReadWriteSemaphore(final int readPermits) {
* <code>NONE</code> state is marked by the availability of all permits and a read or write lock can be immediately
* acquired on the semaphore.</p>
*
* @return the current {@link LockTyoe} status of the semaphore
* @return the current {@link LockType} status of the semaphore
*/
public LockType getLockType() {
if (semaphore.availablePermits() == readPermits) {
Expand Down Expand Up @@ -100,21 +101,23 @@ public int availablePermits() {
* {@link LockType} property of this class may result in having an inconsistent value.</p>
*
* <p>
* This function is a blocking call until a read lock is acquired or an {@link InterruptedExeception} is thrown</p>
* This function is a blocking call until a read lock is acquired
* </p>
*
* @throws InterruptedException if an interrupt event occurs while waiting for a single acquire on the semaphore.
*/
public void acquireReadLock() throws InterruptedException {
semaphore.acquire();
public void acquireReadLock() {
lastOperatedAt = System.currentTimeMillis();
semaphore.acquireUninterruptibly();
lockType = LockType.READ;
}

/**
* Lock blocks the item from being available for writing by anyone other than the lock holder. Others may still read
* the current value of the item
*/
public void acquireWriteLock() throws InterruptedException {
semaphore.acquire(readPermits);
public void acquireWriteLock() {
lastOperatedAt = System.currentTimeMillis();
semaphore.acquireUninterruptibly(readPermits);
lockType = LockType.WRITE;
}

Expand All @@ -125,6 +128,7 @@ public void acquireWriteLock() throws InterruptedException {
* maximum read permits.
*/
public void releaseReadLock() {
lastOperatedAt = System.currentTimeMillis();
if (lockType == LockType.READ && semaphore.availablePermits() < readPermits) {
semaphore.release();
}
Expand All @@ -135,8 +139,18 @@ public void releaseReadLock() {
* <code>WRITE</code> and if the currently available permits are not zero.
*/
public void releaseWriteLock() {
lastOperatedAt = System.currentTimeMillis();
if (lockType == LockType.WRITE && semaphore.availablePermits() == 0) {
semaphore.release(readPermits);
}
}

/**
* Gets the time at which this Semaphore was last operated at. Only lock and release operations affect the
* lastOperatedAt time reported by this function
* @return the lastOperatedAt time in milli-seconds from epoch
*/
public long getLastOperatedAt() {
return this.lastOperatedAt;
}
}
68 changes: 46 additions & 22 deletions engine/src/main/java/com/blobcity/db/locks/TransactionLocking.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,72 @@

package com.blobcity.db.locks;

import com.blobcity.db.exceptions.ErrorCode;
import com.blobcity.db.exceptions.OperationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
*
* @author sanketsarang
*/

@Component
public class TransactionLocking {

/* Keyed on {appId}-{table}-{pk} */
private final Map<String, ReadWriteSemaphore> map = new HashMap<>();
private final Map<String, ReadWriteSemaphore> map = new ConcurrentHashMap<>();

public boolean isLocked(String app, String table, String pk) {
return map.containsKey(generateKey(app, table, pk));
}

public LockType getLockType(String app, String table, String pk) {
final String key = generateKey(app, table, pk);
if (map.containsKey(key)) {
return map.get(key).getLockType();
}
return LockType.NONE;
ReadWriteSemaphore rwSemaphore = map.get(key);
return rwSemaphore == null ? LockType.NONE : rwSemaphore.getLockType();

}

// @Lock(javax.ejb.LockType.WRITE)
public String acquireLock(String app, String table, String pk, LockType lockType) throws OperationException, InterruptedException {
public void acquireLock(String app, String table, String pk, LockType lockType) {
final String key = generateKey(app, table, pk);
ReadWriteSemaphore readWriteSemaphore;

switch (lockType) {
case READ:
readWriteSemaphore = new ReadWriteSemaphore(1);
map.put(key, readWriteSemaphore);
readWriteSemaphore.acquireReadLock();
return key;
map.compute(key, (k, value)-> {
if(value == null) {
return new ReadWriteSemaphore(1);
}
return value;
});
map.get(key).acquireReadLock();
case WRITE:
readWriteSemaphore = new ReadWriteSemaphore(1);
map.put(key, readWriteSemaphore);
readWriteSemaphore.acquireWriteLock();
return key;
default:
throw new OperationException(ErrorCode.LOCKING_ERROR, "Lock type can only be one of READ, WRITE");
map.compute(key, (k, value)-> {
if(value == null) {
return new ReadWriteSemaphore(1);
}
return value;
});
map.get(key).acquireWriteLock();
}
}

public String releaseLock(String app, String table, String pk, LockType lockType) throws OperationException {
throw new OperationException(ErrorCode.OPERATION_NOT_SUPPORTED, "Method releaseLock(...) inside TransactionLocking not yet implemeneted.");
public void releaseLock(String app, String table, String pk, LockType lockType) {
final String key = generateKey(app, table, pk);
ReadWriteSemaphore rwSemaphore = map.get(key);
if(rwSemaphore == null) {
return;
}

switch(lockType) {
case READ:
rwSemaphore.releaseReadLock();
break;
case WRITE:
rwSemaphore.releaseWriteLock();
break;
}
}

private String generateKey(String app, String table, String pk) {
Expand All @@ -77,4 +92,13 @@ private String generateKey(String app, String table, String pk) {
sb.append(pk);
return sb.toString();
}

@Scheduled(cron = "0 * * * * *")
private void cleanUp() {
final long removeBefore = System.currentTimeMillis() - 30000; //30 seconds
map.entrySet().removeIf(key -> {
ReadWriteSemaphore rws = map.get(key);
return rws.getLockType() == LockType.NONE && rws.getLastOperatedAt() < removeBefore;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,15 @@ private Query processRequest(final Query query) {
Future<Query> futureResponse = MasterExecutorService.getInstance().submit(masterExecutable);

Query response = futureResponse.get();
if(response == null) {
logger.warn("Request (" + query.getRequestId() + ") failed with unknown cause");
masterExecutable.rollback();
return new Query().ackFailure();
}
logger.info("Response for requestId: {}, response: {}", query.getRequestId(), response.toJsonString());
return response;
} catch (InterruptedException | ExecutionException e) {
logger.warn("Request (" + query.getRequestId() + ") failed on an internal exception");
masterExecutable.rollback();
return new Query().ackFailure();
} finally {
Expand Down
Loading

0 comments on commit 33f848e

Please sign in to comment.