diff --git a/README.md b/README.md index a53be1c..f8cf1ff 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,13 @@ -Steps to run the membership service - +----------------------------------------------------------------------------------------------------------------- +Steps to run the BoltDB KeyValue Store Server - +----------------------------------------------------------------------------------------------------------------- -1) Copy the boltdb-0.0.1-SNAPSHOT.jar, gson-1.7.1.jar, log4j-1.2.17.jar, dgroupmembership and boltdb.prop - files into a folder. +1) Copy the boltdb-0.0.1-SNAPSHOT.jar, gson-1.7.1.jar, log4j-1.2.17.jar, commons-collections-3.0.jar, boltdb-server, server.policy + BoltDBServer_Stub.class and boltdb.prop files into a folder. 2) cd into the folder to which you copied the above files. -3) Open the boltdb.prop file in a text editor and set the properties +3) Open the server.policy file and change the path in the policy file to the path of the current folder( where + the jar resides) +4) Open the boltdb.prop file in a text editor and set the properties * "groupmembership.contact" - This is the hostname/ipaddress of the contact machine. * "groupmembership.tfail" - The time out for marking a process as failed * "groupmembership.heartbeat.freq" - The frequency at which the local heartbeat should be @@ -12,18 +16,50 @@ Steps to run the membership service - membership list should be refreshed (mark failures and remove marked entries). * "groupmembership.gossip.freq" - The gossip frequency. * "groupmembership.lossrate" - The message loss rate to be simulated. + * "groupmembership.rfactor" - The replication factor -4) Run the command - "./dgroupmembership -contact true -id machine.1 +5) Run the command - "./boltdb-server -contact true -id machine.1 * "-contact" option is for specifying whether or not this daemon is the contact machine daemon.(In this case, it is "true". For the other daemons, specify "-contact" as "false") * "-id" is a unique id for the daemon and this unique id will be used to name the log file on this machine. -5) While running the service, if you wish to voluntarily leave the system, type "leave" and hit enter. If you - want to crash the process, press Ctrl+C. +6) On the boltdb-server> prompt - + * If you wish to voluntarily leave the system, type "leave" and hit enter. + * If you wish to print out the membership list type "shownodes" and hit enter + * If you wish to print out the contents of the key value store type "showKV" and hit enter. + * If you wish to print out 10 most recent reads and writes, type "show" and hit enter +----------------------------------------------------------------------------------------------------------------- + + +----------------------------------------------------------------------------------------------------------------- +Steps to run the BoltDB KeyValue Store Client - +----------------------------------------------------------------------------------------------------------------- + +1) Copy the boltdb-0.0.1-SNAPSHOT.jar, gson-1.7.1.jar, log4j-1.2.17.jar, commons-collections-3.0.jar, boltdb-server, server.policy + BoltDBServer_Stub.class and boltdb.prop files into a folder. +2) cd into the folder to which you copied the above files. +3) Open the client.policy file and change the path in the policy file to the path of the current folder( where + the jar resides) +4) Open the boltdb.prop file in a text editor and set the properties + * "boltdb.kvstore.server" - This is the hostname/ipaddress of the server to which the client talks. + * "boltdb.kvstore.clevel" - This is the consistency level. Possible values - ONE, QUORUM, ALL +5) Run the command - "./boltdb-client +6) On the boltdb-client> prompt - + * "insert " - Inserts the key and value onto the Distributed Key Value Store with the given consistency level + * "update " - Updates the key and value onto the Distributed Key Value Store with the given consistency level + * "lookup " - Looks Up the key in the Distributed Key Value Store with the given consistency level + * "delete " - Deletes the key from the Distributed Key Value Store with the given consistency level +----------------------------------------------------------------------------------------------------------------- + + + +----------------------------------------------------------------------------------------------------------------- Steps to run the Log Querier service - +----------------------------------------------------------------------------------------------------------------- Start the LogQuerier server - + 1) Copy the boltdb-0.0.1-SNAPSHOT.jar, dgrep and boltdb.prop files into a folder. (If you have already copied these files in the previous step - while running the GroupMembership daemon, you should be fine) 2) cd into the folder to which you copied the above files. @@ -34,6 +70,7 @@ Start the LogQuerier server - Query logs using the LogQuerier client - + 1) Copy the boltdb-0.0.1-SNAPSHOT.jar, dgrep and boltdb.prop files into a folder (If you have already copied these files in the previous step - while running the GroupMembership daemon, you should be fine) 2) cd into the folder to which you copied the above files. @@ -44,3 +81,4 @@ Query logs using the LogQuerier client - * To look for joins, type the command "./dgrep -key JOINED" * To look for crashes, type the command "./dgrep -key CRASHED" * To look for leavs, type the command "./dgrep -key LEFT" +----------------------------------------------------------------------------------------------------------------- diff --git a/boltdb/.classpath b/boltdb/.classpath index b39967e..2d87044 100644 --- a/boltdb/.classpath +++ b/boltdb/.classpath @@ -6,25 +6,25 @@ - + - + + + - + - - + - diff --git a/boltdb/boltdb.prop b/boltdb/boltdb.prop new file mode 100644 index 0000000..60a8072 --- /dev/null +++ b/boltdb/boltdb.prop @@ -0,0 +1,8 @@ +machines.address=192.17.11.8:6789,192.17.11.7:6789,192.17.11.18:6789,192.17.11.19:6789 +groupmembership.lossrate=0 +groupmembership.contact=siebl-0218-07.ews.illinois.edu +groupmembership.tfail=3 +groupmembership.heartbeat.freq=600 +groupmembership.refreshMembershipList.freq=1000 +groupmembership.gossip.freq=1000 +boltdb.server=172.16.149.48 diff --git a/boltdb/pom.xml b/boltdb/pom.xml index ec72118..e662bdb 100644 --- a/boltdb/pom.xml +++ b/boltdb/pom.xml @@ -13,7 +13,24 @@ UTF-8 - + + + + org.codehaus.mojo + rmic-maven-plugin + 1.2.1 + + + rmic-process-classes + + rmic + + + + + + + junit @@ -45,6 +62,11 @@ log4j log4j 1.2.17 + + + commons-collections + commons-collections + 3.0 diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBClient.java b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBClient.java new file mode 100644 index 0000000..dd212d8 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBClient.java @@ -0,0 +1,223 @@ +package edu.uiuc.boltdb; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.Properties; +import java.util.StringTokenizer; + +/** + * This class represents the client component of the distributed key value store. On start up it creates + * a boltdb-client> shell where the user can type in the insert, update, lookup and delete queries. + * The BoltDBClient class creates a reference to a remote BoltDBServer object in the member variable boltDBServer. + * All the operations (insert, lookup, update and delete) are performed on this remote object reference. + * + * @author Adarsh + * + */ + +public class BoltDBClient { + + private BoltDBProtocol boltDBServer = null; + + public BoltDBClient(String rmiString) throws MalformedURLException, RemoteException, NotBoundException { + if(System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + boltDBServer = (BoltDBProtocol) Naming.lookup("rmi://" + rmiString + "/KVStore"); + } + + /** + * @param args + * @throws IOException + * @throws NotBoundException + */ + public static void main(String[] args) { + try { + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream("./boltdb.prop"); + prop.load(fis); + fis.close(); + String rmiString = prop.getProperty("boltdb.kvstore.server"); + BoltDBClient boltDBClient = new BoltDBClient(rmiString); + boltDBClient.runClientShell(); + } catch(Exception e) { + e.printStackTrace(); + } + } + + // Method to simulate a boltdb-client shell + private void runClientShell() throws IOException { + // Simulate a unix shell + String commandString = ""; + BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); + + // Break out with Ctrl+C + while (true) { + // Read user's input + System.out.print("boltdb-client>"); + commandString = console.readLine(); + + // If the user entered a return, just loop again + if (commandString.equals("")) + continue; + handleCommand(commandString); + } + } + + private BoltDBProtocol.CONSISTENCY_LEVEL getConsistency(String clevel) { + if(clevel.equals("ONE")) return BoltDBProtocol.CONSISTENCY_LEVEL.ONE; + else if (clevel.equals("QUORUM")) return BoltDBProtocol.CONSISTENCY_LEVEL.QUORUM; + else if (clevel.equals("ALL")) return BoltDBProtocol.CONSISTENCY_LEVEL.ALL; + + return null; + } + // This method handles the commands entered by the user in the boltdb-client shell + private void handleCommand(String commandString) { + try { + StringTokenizer stk = new StringTokenizer(commandString); + if(stk.countTokens() < 3) { + printUsage(0); + return; + } + + // The Command Type + String commandType = stk.nextToken(); + + if(commandType.equals("insert")) { + String clevel = stk.nextToken(); + if (!clevel.equals("ONE") && !clevel.equals("QUORUM") && !clevel.equals("ALL")) { + printUsage(1); + return; + } + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel = getConsistency(clevel); + + String keyStr = stk.nextToken(); + long key = parseKey(keyStr); + if(key == -1) return; + String value = ""; + if(stk.hasMoreTokens()) { + while(stk.hasMoreTokens()) + value += stk.nextToken() + " "; + } + else { + printUsage(1); + return; + } + value = value.trim(); + // Perform Insert Operation on the remote server object + if(!boltDBServer.insert(key, new ValueTimeStamp(value, 0), true,consistencyLevel)) + System.out.println("Key already present"); + } else if(commandType.equals("update")) { + String clevel = stk.nextToken(); + if (!clevel.equals("ONE") && !clevel.equals("QUORUM") && !clevel.equals("ALL")) { + printUsage(2); + return; + } + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel = getConsistency(clevel); + + String keyStr = stk.nextToken(); + long key = parseKey(keyStr); + if(key == -1) return; + String value = ""; + if(stk.hasMoreTokens()) { + while(stk.hasMoreTokens()) + value += stk.nextToken() + " "; + } + else { + printUsage(2); + return; + } + value = value.trim(); + // Perform Update Operation on the remote server object + if(!boltDBServer.update(key, new ValueTimeStamp(value, 0), true, consistencyLevel)) + System.out.println("Key not present"); + } else if(commandType.equals("lookup")) { + String clevel = stk.nextToken(); + if (!clevel.equals("ONE") && !clevel.equals("QUORUM") && !clevel.equals("ALL")) { + printUsage(3); + return; + } + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel = getConsistency(clevel); + + String keyStr = stk.nextToken(); + long key = parseKey(keyStr); + if(key == -1) return; + // Perform LookUp Operation on the remote server object + ValueTimeStamp value = boltDBServer.lookup(key, true, consistencyLevel); + if(value == null) { + System.out.println("Key not present"); + return; + } + System.out.println("Look Up Result : " + value.value); + } else if(commandType.equals("delete")) { + String clevel = stk.nextToken(); + if (!clevel.equals("ONE") && !clevel.equals("QUORUM") && !clevel.equals("ALL")) { + printUsage(4); + return; + } + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel = getConsistency(clevel); + String keyStr = stk.nextToken(); + long key = parseKey(keyStr); + if(key == -1) return; + // Perform Delete Operation on the remote server object + if(boltDBServer.delete(key, true, consistencyLevel)) + System.out.println("Key Value Pair Deleted"); + else + System.out.println("Delete Failed"); + } else { + printUsage(0); + return; + } + } catch(RemoteException re) { + if(re.getCause().getCause() != null) + System.out.println(re.getCause().getCause().getMessage()); + else + System.out.println(re.getCause().getMessage()); + } + } + + // Method to parse the key entered by the user to long, and to validate the key + private long parseKey(String keyStr) { + long key; + try { + key = Long.parseLong(keyStr); + if(key < 0L || key > 1000000L) { + System.out.println("Invalid Key Range. Key should be in the Range 0 - 1000000"); + return -1L; + } + return key; + } catch(NumberFormatException nfe) { + System.out.println("Invalid Key : Please enter an Integer value for key"); + return -1L; + } + } + + // Method to print the usage of BoltDBClient + private void printUsage(int type) { + System.out.println("Invalid Command"); + System.out.println("Usage : "); + switch(type) { + case 1: System.out.println("insert "); + break; + case 2: System.out.println("update "); + break; + case 3: System.out.println("lookukp "); + break; + case 4: System.out.println("delete "); + break; + default:System.out.println("insert "); + System.out.println("update "); + System.out.println("lookup "); + System.out.println("delete "); + break; + } + System.out.println(" is an integer value in the range 0 - 1000000"); + System.out.println(); + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBProtocol.java b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBProtocol.java new file mode 100644 index 0000000..5d72676 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBProtocol.java @@ -0,0 +1,58 @@ +package edu.uiuc.boltdb; + +import java.rmi.Remote; +import java.rmi.RemoteException; + +/** + * This interface represents the protocol between client and key-value store server. + * @author Ashwin + * + */ +public interface BoltDBProtocol extends Remote { + + /** + * The insert api is used to insert a key and a value into the store. + *'canBeForwarded' is a flag to indicate whether the request can be forwarded to + *other servers to perform the operation. + *Returns false if key is already present in the store. + * @param key + * @param value + * @param canBeForwarded + * @throws RemoteException + */ + public Boolean insert(long key, ValueTimeStamp value, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException; + /** + * The lookup api is used to lookup the value associated with a key. + * Throws an exception is key is not present in the store. + * @param key + * @param canBeForwarded + * @return + * @throws RemoteException + */ + public ValueTimeStamp lookup(long key, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException; + + /** + * The update api updates the value of the provided key with the new value. + * Throws an exception if the key is not present. + * @param key + * @param value + * @param canBeForwarded + * @throws RemoteException + */ + public Boolean update(long key, ValueTimeStamp value, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException; + + /** + * The delete api removes the key-value entry from the store. + * Throws an exception is the key is not present in the store. + * @param key + * @param canBeForwarded + * @throws RemoteException + */ + public Boolean delete(long key, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException; + + public void lookupAndInsertInto(String hostname, long startKeyRange, long endKeyRange) throws RemoteException; + + public enum CONSISTENCY_LEVEL { + ONE, QUORUM, ALL + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBServer.java b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBServer.java new file mode 100644 index 0000000..168a0b0 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/BoltDBServer.java @@ -0,0 +1,341 @@ +package edu.uiuc.boltdb; + +import java.io.IOException; +import java.rmi.Naming; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.server.UnicastRemoteObject; +import java.security.NoSuchAlgorithmException; +import java.util.Date; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.log4j.Logger; + +import edu.uiuc.boltdb.groupmembership.GroupMembership; +import edu.uiuc.boltdb.groupmembership.beans.Operation; +import edu.uiuc.boltdb.methods.DeleteThread; +import edu.uiuc.boltdb.methods.InsertThread; +import edu.uiuc.boltdb.methods.LookupThread; +import edu.uiuc.boltdb.methods.UpdateThread; + +/** + * This class represents the server component of the distributed key value store. It implements the + * BoltDBProtocol interface. On startup, it starts the GroupMembership service. It maintains a + * ConcurrentHashMap KVStore that stores the key-value pairs that are destined for this host. Since it implements + * the BoltDBProtocol, it provides the Remote methods to insert, lookup, update and delete keys. Upon request + * from a client, the request is forwarded to all the replicas of the key(Active replication) by spawning + * k threads where k is the replication factor. + * Each request takes in consistency level as parameter and accordingly waits for one or more threads to respond before + * sending back the result to the client. + */ + +public class BoltDBServer extends UnicastRemoteObject implements BoltDBProtocol { + + private static final long serialVersionUID = 5195393553928167809L; + private static org.apache.log4j.Logger log = Logger.getRootLogger(); + + //Buffer to hold most recent reads and writes + public static CircularFifoBuffer readBuffer = new CircularFifoBuffer(10); + public static CircularFifoBuffer writeBuffer = new CircularFifoBuffer(10); + + protected BoltDBServer() throws RemoteException { + super(); + } + + /** + * @param args + */ + //The in-memory key value store ! + public static ConcurrentMap KVStore = new ConcurrentHashMap(); + + public static void main(String[] args) throws IOException { + + // Start the GroupMembership service + Runnable groupMembership = new GroupMembership(args); + Thread groupMembershipThread = new Thread(groupMembership); + groupMembershipThread.start(); + + //Create RMI registry + LocateRegistry.createRegistry(1099); + Naming.rebind ("KVStore", new BoltDBServer()); + } + + /** + * Gets the hostname of all the replicas of the key + * @param key + * @return + * @throws NoSuchAlgorithmException + */ + private String[] getTargetHosts(long key) throws NoSuchAlgorithmException { + String targetHost = null; + String[] targetHosts = new String[GroupMembership.replicationFactor]; + targetHost = GroupMembership.getSuccessorNode(GroupMembership.computeHash((new Long(key).toString()))); + if(targetHost != null) { + targetHosts[0] = targetHost; + } else { + log.error("Target host is null"); + return null; + } + + for(int i = 1; i < GroupMembership.replicationFactor; i++) { + targetHosts[i] = GroupMembership.getSuccessorNode(GroupMembership.membershipList.get(targetHosts[i-1]).hashValue); + } + return targetHosts; + } + + private int convertConsistencyLevelToInt(CONSISTENCY_LEVEL consistencyLevel) { + switch(consistencyLevel) { + case ALL: + return GroupMembership.replicationFactor; + case ONE: + return 1; + case QUORUM: + return GroupMembership.replicationFactor/2 + 1; + default: + break; + + } + return -1; + } + + /** + * Collects results from few replicas depending on the consistency level + * @param completionService + * @param consistencyLevel + * @return + * @throws InterruptedException + * @throws ExecutionException + */ + private boolean waitForReplicaReplies(ExecutorCompletionService completionService, CONSISTENCY_LEVEL consistencyLevel) throws InterruptedException, ExecutionException { + int replicaRepliesToWaitFor = convertConsistencyLevelToInt(consistencyLevel); + + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + final Future future = completionService.take(); + if(future.get() == true) { + replicaRepliesToWaitFor--; + if (replicaRepliesToWaitFor == 0) { + return true; + } + } + } + return false; + } + + private ValueTimeStamp waitForReplicaRepliesForLookup(ExecutorCompletionService completionService, CONSISTENCY_LEVEL consistencyLevel) throws InterruptedException, ExecutionException { + int replicaRepliesToWaitFor = convertConsistencyLevelToInt(consistencyLevel); + ValueTimeStamp result = null; + ValueTimeStamp temp; + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + final Future future = completionService.take(); + if((temp = future.get()) != null) { + if (result == null) result = temp; + else { + if(result.timeStamp < temp.timeStamp) + result = temp; + } + replicaRepliesToWaitFor--; + if (replicaRepliesToWaitFor == 0) { + return result; + } + } + } + return null; + } + /** + * The insert api is used to insert a key and a value into the store. + *'canBeForwarded' is a flag to indicate whether the request can be forwarded to + *other servers to perform the operation. + *Throws an exception if key is already present in the store. + * @param key + * @param value + * @param canBeForwarded + * @throws RemoteException + * @throws + */ + + public Boolean insert(long key, ValueTimeStamp value, boolean canBeForwarded,CONSISTENCY_LEVEL consistencyLevel) throws RemoteException { + if(!canBeForwarded) { + if(KVStore.containsKey(key)) + //throw new RemoteException("Key already present."); + return false; + KVStore.put(key, value); + if(consistencyLevel != null) + writeBuffer.add(new Operation("INSERT", consistencyLevel, new Date().toString(), key, value.value)); + return true; + } + + value.timeStamp = System.currentTimeMillis(); + try { + // Determine the target hosts using the getSuccessorNodeOf method + + String[] targetHosts = getTargetHosts(key); + + final ExecutorService pool = Executors.newFixedThreadPool(GroupMembership.replicationFactor); + final ExecutorCompletionService completionService = new ExecutorCompletionService(pool); + + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + completionService.submit(new InsertThread(targetHosts[i],key,value,consistencyLevel)); + } + boolean result = waitForReplicaReplies(completionService, consistencyLevel); + pool.shutdown(); + return result; + } catch (Exception e) { + log.error("ERROR" , e); + throw new RemoteException("Error occured at Server"); + } + } + + /** + * The lookup api is used to lookup the value associated with a key. + * Throws an exception is key is not present in the store. + * @param key + * @param canBeForwarded + * @return + * @throws RemoteException + */ + public ValueTimeStamp lookup(long key, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException { + if(!canBeForwarded) { + if(!KVStore.containsKey(key)) + throw new RemoteException("Key not present."); + ValueTimeStamp value = KVStore.get(key); + readBuffer.add(new Operation("READ", consistencyLevel, new Date().toString(), key, value.value)); + return value; + } + + try { + String[] targetHosts = getTargetHosts(key); + + final ExecutorService pool = Executors.newFixedThreadPool(GroupMembership.replicationFactor); + final ExecutorCompletionService completionService = new ExecutorCompletionService(pool); + + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + completionService.submit(new LookupThread(targetHosts[i],key,consistencyLevel)); + } + + ValueTimeStamp result = waitForReplicaRepliesForLookup(completionService, consistencyLevel); + pool.shutdown(); + return result; + } catch (Exception e) { + log.error("ERROR" , e); + throw new RemoteException("Error occured at Server"); + } + } + + /** + * The update api updates the value of the provided key with the new value. + * Throws an exception if the key is not present. + * @param key + * @param value + * @param canBeForwarded + * @throws RemoteException + */ + public Boolean update(long key, ValueTimeStamp value, boolean canBeForwarded, CONSISTENCY_LEVEL consistencyLevel) throws RemoteException { + if(!canBeForwarded) { + if(!KVStore.containsKey(key)) + throw new RemoteException("Key not present."); + if(KVStore.get(key).timeStamp < value.timeStamp) + KVStore.put(key, value); + if(consistencyLevel != null) + writeBuffer.add(new Operation("UPDATE", consistencyLevel, new Date().toString(), key, value.value)); + return true; + } + value.timeStamp = System.currentTimeMillis(); + try { + String[] targetHosts = getTargetHosts(key); + + final ExecutorService pool = Executors.newFixedThreadPool(GroupMembership.replicationFactor); + final ExecutorCompletionService completionService = new ExecutorCompletionService(pool); + + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + completionService.submit(new UpdateThread(targetHosts[i],key,value,consistencyLevel)); + } + + boolean result = waitForReplicaReplies(completionService, consistencyLevel); + pool.shutdown(); + return result; + } catch (Exception e) { + log.error("ERROR" , e); + throw new RemoteException("Error occured at Server"); + } + } + + /** + * The delete api removes the key-value entry from the store. + * Throws an exception is the key is not present in the store. + * @param key + * @param canBeForwarded + * @throws RemoteException + */ + public Boolean delete(long key, boolean canBeForwarded,CONSISTENCY_LEVEL consistencyLevel) throws RemoteException { + if(!canBeForwarded) { + if(!KVStore.containsKey(key)) + //throw new RemoteException("Key not present."); + return false; + KVStore.remove(key); + if(consistencyLevel != null) + writeBuffer.add(new Operation("DELETE", consistencyLevel, new Date().toString(), key)); + return true; + } + + try { + String[] targetHosts = getTargetHosts(key); + + final ExecutorService pool = Executors.newFixedThreadPool(GroupMembership.replicationFactor); + final ExecutorCompletionService completionService = new ExecutorCompletionService(pool); + + for(int i = 0; i < GroupMembership.replicationFactor; i++) { + completionService.submit(new DeleteThread(targetHosts[i],key,consistencyLevel)); + } + + boolean result = waitForReplicaReplies(completionService, consistencyLevel); + pool.shutdown(); + return result; + } catch (Exception e) { + log.error("ERROR" , e); + throw new RemoteException("Error occured at Server"); + } + } + + /** + * Private api used only by ndoes to remap keys. + */ + public void lookupAndInsertInto(String hostname, long startKeyRange, + long endKeyRange) throws RemoteException { + BoltDBProtocol targetServer = null; + try { + targetServer = (BoltDBProtocol) Naming.lookup("rmi://" + hostname + "/KVStore"); + } catch (Exception e) { + log.error(e.getMessage()); + } + for(Entry e : KVStore.entrySet()) { + try { + long hashOfKey = GroupMembership.computeHash(e.getKey().toString()); + if (startKeyRange < endKeyRange) { + if (hashOfKey >= startKeyRange && hashOfKey <= endKeyRange) { + log.info("["+new Date()+"]Inserting " + hashOfKey + " from " + + GroupMembership.pid + " to " + hostname); + targetServer.insert(e.getKey(), e.getValue(), false,null); + } + } else { + if (hashOfKey >= startKeyRange || hashOfKey <= endKeyRange) { + log.info("["+new Date()+"]Inserting " + hashOfKey + " from " + + GroupMembership.pid + " to " + hostname); + targetServer.insert(e.getKey(), e.getValue(), false,null); + + } + } + } catch (NoSuchAlgorithmException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/ValueTimeStamp.java b/boltdb/src/main/java/edu/uiuc/boltdb/ValueTimeStamp.java new file mode 100644 index 0000000..fdff545 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/ValueTimeStamp.java @@ -0,0 +1,24 @@ +package edu.uiuc.boltdb; + +import java.io.Serializable; + +public class ValueTimeStamp implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -8293568407924175634L; + public String value; + public long timeStamp; + + public ValueTimeStamp(String value, long timeStamp) { + super(); + this.value = value; + this.timeStamp = timeStamp; + } + + @Override + public String toString() { + return new String("[" + value + " " +timeStamp+ "]"); + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieDBIndexer.java b/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieDBIndexer.java new file mode 100644 index 0000000..7fbc497 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieDBIndexer.java @@ -0,0 +1,71 @@ +package edu.uiuc.boltdb.examples; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.math.BigInteger; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.Map.Entry; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.ValueTimeStamp; + +public class MovieDBIndexer { + + /** + * @param args + * @throws IOException + * @throws NoSuchAlgorithmException + * @throws NotBoundException + */ + public static void main(String[] args) throws IOException, NoSuchAlgorithmException, NotBoundException { + // TODO Auto-generated method stub + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream("./boltdb.prop"); + prop.load(fis); + fis.close(); + String rmiString = prop.getProperty("boltdb.kvstore.server"); + String rawFile = prop.getProperty("boltdb.kvstore.examples.movies.toIndex"); + + if(System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + BoltDBProtocol boltDBServer = (BoltDBProtocol) Naming.lookup("rmi://" + rmiString + "/KVStore"); + + HashMap hmap = new HashMap(); + BufferedReader br = new BufferedReader(new FileReader(rawFile)); + String line; + while((line = br.readLine()) != null) { + line = line.trim(); + StringTokenizer stk = new StringTokenizer(line); + while(stk.hasMoreTokens()) { + long key = computeHash(stk.nextToken()); + String curValue = hmap.get(key); + String value = ((curValue == null)?"":curValue) + "," + line; + hmap.put(key, value); + } + } + Iterator> itr = hmap.entrySet().iterator(); + while(itr.hasNext()) { + Entry entry = itr.next(); + boltDBServer.insert(entry.getKey(), new ValueTimeStamp(entry.getValue(),0), true,BoltDBProtocol.CONSISTENCY_LEVEL.ALL); + System.out.println("Successfully inserted " + entry.getKey() + " --> " + entry.getValue()); + } + br.close(); + } + public static long computeHash(String pid) throws NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("MD5"); + BigInteger bigInt = new BigInteger(1, md.digest(pid.getBytes())); + return Math.abs(bigInt.longValue()) % 1000001L; + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieSearchClient.java b/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieSearchClient.java new file mode 100644 index 0000000..c8f2bbd --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/examples/MovieSearchClient.java @@ -0,0 +1,111 @@ +package edu.uiuc.boltdb.examples; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.net.MalformedURLException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Properties; +import java.util.StringTokenizer; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.ValueTimeStamp; +import edu.uiuc.boltdb.groupmembership.GroupMembership; + +/** + * + * @author Adarsh + * + */ + +public class MovieSearchClient { + + private BoltDBProtocol boltDBServer = null; + + public MovieSearchClient(String rmiString) throws MalformedURLException, RemoteException, NotBoundException { + if(System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + boltDBServer = (BoltDBProtocol) Naming.lookup("rmi://" + rmiString + "/KVStore"); + } + + /** + * @param args + * @throws IOException + * @throws NotBoundException + */ + public static void main(String[] args) { + try { + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream("./boltdb.prop"); + prop.load(fis); + fis.close(); + String rmiString = prop.getProperty("boltdb.kvstore.server"); + MovieSearchClient movieSearchClient = new MovieSearchClient(rmiString); + movieSearchClient.runClient(); + } catch(Exception e) { + e.printStackTrace(); + } + } + + private void runClient() throws IOException, NoSuchAlgorithmException { + String keyWordString = ""; + BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); + + System.out.println(); + System.out.println("-----------------------------------------------"); + System.out.println("MOVIE SEARCH"); + System.out.println("-----------------------------------------------"); + + while (true) { + System.out.print("Enter the keyword to search: "); + keyWordString = console.readLine(); + + // If the user entered a return, just loop again + if (keyWordString.equals("")) { + System.out.println("You did not enter anything"); + System.out.println(); + continue; + } + searchMovie(keyWordString); + } + } + + private void searchMovie(String keyWordString) throws NoSuchAlgorithmException { + try { + StringTokenizer stk = new StringTokenizer(keyWordString); + String keyWord = stk.nextToken(); + long key = computeHash(keyWord); + + ValueTimeStamp result = boltDBServer.lookup(key, true, BoltDBProtocol.CONSISTENCY_LEVEL.ONE); + + stk = new StringTokenizer(result.value, ","); + + System.out.println(); + System.out.println("-----------------------------------------------"); + System.out.println("Found " + stk.countTokens() + " movie titles matchin " + " \"" + keyWord + "\""); + System.out.println("-----------------------------------------------"); + while(stk.hasMoreTokens()) + System.out.println(stk.nextToken()); + System.out.println("-----------------------------------------------"); + + } catch(RemoteException re) { + if(re.getCause().getCause() != null) + System.out.println(re.getCause().getCause().getMessage()); + else + System.out.println(re.getCause().getMessage()); + } + } + + public static long computeHash(String pid) throws NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("MD5"); + BigInteger bigInt = new BigInteger(1, md.digest(pid.getBytes())); + return Math.abs(bigInt.longValue()) % 1000001L; + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/examples/StaleClient.java b/boltdb/src/main/java/edu/uiuc/boltdb/examples/StaleClient.java new file mode 100644 index 0000000..9cba93e --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/examples/StaleClient.java @@ -0,0 +1,111 @@ +package edu.uiuc.boltdb.examples; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.ValueTimeStamp; +import edu.uiuc.boltdb.groupmembership.HeartbeatIncrementerThread; + +public class StaleClient { + + private static volatile long i = 0; + static int stalelookups = 0; + static long noOfwrites = 0; + static long noOfreads = 0; + /** + * @param args + * @throws IOException + * @throws NotBoundException + */ + public static void main(String[] args) throws IOException, NotBoundException { + // TODO Auto-generated method stub + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream("./boltdb.prop"); + prop.load(fis); + fis.close(); + String rmiString = prop.getProperty("boltdb.kvstore.server"); + if(System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + final BoltDBProtocol boltDBServer = (BoltDBProtocol) Naming.lookup("rmi://" + rmiString + "/KVStore"); + boltDBServer.insert(10, new ValueTimeStamp(String.valueOf(i), 0), true, BoltDBProtocol.CONSISTENCY_LEVEL.ONE); + final int readwait = Integer.parseInt(prop.getProperty("staleclient.readwait")); + final int writewait = Integer.parseInt(prop.getProperty("staleclient.writewait")); + + Runnable writeClient = new Runnable() { + + public void run() { + while(true) { + try { + boltDBServer.update(10, new ValueTimeStamp(String.valueOf(++i), 0), true, BoltDBProtocol.CONSISTENCY_LEVEL.ONE); + noOfwrites++; + Thread.sleep(writewait); + } catch (RemoteException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + + Runnable readClient = new Runnable() { + + public void run() { + while(true) { + try { + int result = Integer.parseInt(boltDBServer.lookup(10, true, BoltDBProtocol.CONSISTENCY_LEVEL.ONE).value); + noOfreads++; + if(result != i) { + stalelookups++; + } + Thread.sleep(readwait); + } catch (RemoteException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + + Runnable printStaleValue = new Runnable() { + + public void run() { + // TODO Auto-generated method stub + System.out.println("Stale values per sec:"+stalelookups +" Writes:"+noOfwrites+" Reads:"+noOfreads); + stalelookups = 0; + noOfreads = 0; + noOfwrites = 0; + } + }; + + ScheduledExecutorService scheduler = Executors + .newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(printStaleValue, 0, + Integer.parseInt(prop + .getProperty("staleclient.printfreq")), + TimeUnit.MILLISECONDS); + + Thread writeThread = new Thread(writeClient); + Thread readThread = new Thread(readClient); + writeThread.start(); + readThread.start(); + + + } + +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/GroupMembership.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/GroupMembership.java index ee5f2fc..4c6b8ca 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/GroupMembership.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/GroupMembership.java @@ -4,9 +4,21 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.math.BigInteger; import java.net.InetAddress; +import java.net.MalformedURLException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.ArrayList; import java.util.Date; +import java.util.Iterator; +import java.util.Map; import java.util.Properties; +import java.util.Map.Entry; +import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -17,48 +29,59 @@ import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import edu.uiuc.boltdb.*; import edu.uiuc.boltdb.groupmembership.beans.*; -/**This class is the entry point for Distributed Group Membership. - * It takes two arguments - - * 1) -contact : true if its the contact machine,false otherwise - * 2) -id : this is an argument to identify a machine and will be used to name its log file +/** + * This class is the entry point for Distributed Group Membership. It takes two + * arguments - 1) -contact : true if its the contact machine,false + * otherwise 2) -id : this is an argument to identify a machine and + * will be used to name its log file * - * Data structures : - * The membership list is maintained inside a ConcurrentHashMap. - * There are two advantages of doing this : - * 1. Thread Safety : Both read/write on the membership list is thread safe. - * 2. Fast : only a portion of the map is locked while writing into it and not the whole map. - * This means other READ threads can continue accessing the map while another writes to it. - * - * Parameters : - * This class needs 6 parameters to function and all of these come from a property file boltdb.prop. - * The parameters are contact hostname,gossip frequency, heartbeat increment frequency, tfail, refresh membership list frequency, lossrate(for 4th credit). + * Data structures : The membership list is maintained inside a + * ConcurrentHashMap. There are two advantages of doing + * this : 1. Thread Safety : Both read/write on the membership list is thread + * safe. 2. Fast : only a portion of the map is locked while writing into it and + * not the whole map. This means other READ threads can continue accessing the + * map while another writes to it. * - * The class initializes and starts the following threads : - * 1. ReceiveGossipThread - * 2. HeartbeatIncrementerThread - * 3. RefreshMembershipListThread - * 4. SendGossipThread + * Parameters : This class needs 6 parameters to function and all of these come + * from a property file boltdb.prop. The parameters are contact hostname,gossip + * frequency, heartbeat increment frequency, tfail, refresh membership list + * frequency, lossrate(for 4th credit). + * + * The class initializes and starts the following threads : 1. + * ReceiveGossipThread 2. HeartbeatIncrementerThread 3. + * RefreshMembershipListThread 4. SendGossipThread + * + * More detils about each of the thread can be found in the javadoc of its + * class. * - * More detils about each of the thread can be found in the javadoc of its class. * @author ashwin(ashanka2) - * + * */ -public class GroupMembership -{ +public class GroupMembership implements Runnable { private static org.apache.log4j.Logger log = Logger.getRootLogger(); - public static ConcurrentHashMap membershipList = new ConcurrentHashMap(); + public static volatile ConcurrentHashMap membershipList = new ConcurrentHashMap(); public static String pid = new String(); + public static long startTime; public static String pidDelimiter = "--"; public static long bandwidth = 0; - + public static int replicationFactor = 1; + private String[] args; + public static int tFail = 3; + + public GroupMembership(String args[]) { + this.args = args; + } + /** - * Initialize the logger. Set the name of the log file with the machineid passed as parameter. + * Initialize the logger. Set the name of the log file with the machineid + * passed as parameter. + * * @param serverId */ - public static void initializeLogger(String serverId) - { + public static void initializeLogger(String serverId) { FileAppender fa = new FileAppender(); fa.setName("FileLogger"); fa.setFile(serverId + ".log"); @@ -68,88 +91,412 @@ public static void initializeLogger(String serverId) fa.activateOptions(); log.addAppender(fa); } - - public static void main(String[] args) throws IOException, InterruptedException - { - //Command line parsing - if(args.length < 1 || !(args[0].equals("-contact"))) - { - System.out.println("Usage: groupmembership -contact [-id ]"); + + public void run() { + // Command line parsing + if (args.length < 1 || !(args[0].equals("-contact"))) { + System.out + .println("Usage: groupmembership -contact [-id ]"); System.exit(1); } - + boolean isContact = false; - if(args[1].equals("true")) + if (args[1].equals("true")) isContact = true; - - pid += InetAddress.getLocalHost().getHostName() + GroupMembership.pidDelimiter + (new Date().toString()); - if (args.length > 2 && args[2].equals("-id")) - { - pid += "-" + args[3]; - initializeLogger(args[3]); + try { + pid += InetAddress.getLocalHost().getHostName() + + GroupMembership.pidDelimiter + (new Date().toString()); + if (args.length > 2 && args[2].equals("-id")) { + pid += "-" + args[3]; + initializeLogger(args[3]); + } + + // Compute the hashvalue of yourself(server) + long hashValue = computeHash(pid); + + startTime = System.currentTimeMillis(); + + // Insert the current machine into the membership list with + // heartbeat=1. This single entry is going to be sent to the contact + // node for joining the cluster. + GroupMembership.membershipList.putIfAbsent( + GroupMembership.pid, + new MembershipBean( + InetAddress.getLocalHost().getHostName(), 1, System + .currentTimeMillis(), hashValue, false)); + + // Load all the parameters needed from the property file. + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream("./boltdb.prop"); + prop.load(fis); + fis.close(); + + // Set the replicaton factor from the properties file + replicationFactor = Integer.parseInt(prop + .getProperty("groupmembership.rfactor")); + + // Start the thread that listens to gossip messages. + Thread receiveGossip = new Thread(new ReceiveGossipThread()); + receiveGossip.start(); + + // JOINING : This is the JOIN part . So,if this node is not the + // contact machine,then try to connect to the contact machine + // and send your membership list which contains just one entry ie + // current machine's details. + if (!isContact) { + int maxTries = 10; + while (maxTries-- > 0) { + new SendMembershipListThread( + prop.getProperty("groupmembership.contact"), 8764) + .start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (GroupMembership.membershipList.size() > 1) + break; + } + } + + // Get the tFail from property file + tFail = Integer.parseInt(prop.getProperty("groupmembership.tfail")); + + // ScheduledExecutorService is used to schedule all the threads + // mentioned in the class javadoc with frequency mentioned in + // property file + ScheduledExecutorService scheduler = Executors + .newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(new HeartbeatIncrementerThread(), 0, + Integer.parseInt(prop + .getProperty("groupmembership.heartbeat.freq")), + TimeUnit.MILLISECONDS); + scheduler + .scheduleAtFixedRate( + new RefreshMembershipListThread(tFail), + 0, + Integer.parseInt(prop + .getProperty("groupmembership.refreshMembershipList.freq")), + TimeUnit.MILLISECONDS); + scheduler.scheduleAtFixedRate( + new SendGossipThread(Integer.parseInt(prop + .getProperty("groupmembership.lossrate"))), 0, + Integer.parseInt(prop + .getProperty("groupmembership.gossip.freq")), + TimeUnit.MILLISECONDS); + // scheduler.scheduleAtFixedRate(new LogBandwidthThread(), 0, 60000, + // TimeUnit.MILLISECONDS); + + // VOLUTARY LEAVE : This is the code for voluntary leave part. + // Basically we wait for user to input the string "leave". + // Once the user enters "leave",all the threads are stopped. The + // heartbeat of the current node is set to -1 and + // one last gossip happens. + // Please note that we don't send the last message to everyone in + // the list. + BufferedReader bufferRead = new BufferedReader( + new InputStreamReader(System.in)); + while (true) { + // Read user's input + System.out.print("boltdb-server>"); + String commandString = bufferRead.readLine(); + if (commandString.equals("")) + continue; + if (commandString.equals("leave")) { + receiveGossip.stop(); + scheduler.shutdownNow(); + scheduler.awaitTermination(100, TimeUnit.MILLISECONDS); + MembershipBean mBean = membershipList.get(pid); + mBean.hearbeatLastReceived = -1; + mBean.timeStamp = System.currentTimeMillis(); + membershipList.put(pid, mBean); + Thread gossipOneLastTime = new Thread(new SendGossipThread( + 0)); + gossipOneLastTime.start(); + break; + } + /* + * The show command prints the current membershipList entries + * and the current KVStore entries on the console. + */ + else if (commandString.equals("shownodes")) { + System.out + .println("-------------------------------------------------"); + System.out.println("Membership List : "); + System.out + .println("-------------------------------------------------"); + for (Map.Entry entry : membershipList + .entrySet()) { + System.out.println(entry.getValue().hostname + " " + + entry.getValue().hashValue); + } + System.out + .println("-------------------------------------------------"); + System.out.println(); + } else if (commandString.equals("showKV")) { + System.out + .println("-------------------------------------------------"); + System.out.println("Key Value Store : "); + System.out + .println("-------------------------------------------------"); + long myHash = GroupMembership.membershipList + .get(GroupMembership.pid).hashValue; + long myPredecessor = GroupMembership.membershipList + .get(GroupMembership.getPredecessorNode(myHash)).hashValue; + int primaryCount = 0, replicaCount = 0; + for (Map.Entry entry : BoltDBServer.KVStore + .entrySet()) { + long hashOfKey = computeHash((new Long(entry.getKey())) + .toString()); + System.out.print(entry.getKey() + " ---> " + + entry.getValue() + " | Hash of Key - " + + hashOfKey + " "); + if (myHash > myPredecessor) { + if (hashOfKey > myPredecessor + && hashOfKey <= myHash) { + System.out.println("Primary"); + primaryCount++; + } else { + System.out.println("Replica"); + replicaCount++; + } + } else { + if (hashOfKey > myPredecessor + || hashOfKey <= myHash) { + System.out.println("Primary"); + primaryCount++; + } else { + System.out.println("Replica"); + replicaCount++; + } + } + } + System.out + .println("-------------------------------------------------"); + System.out.println("Primary - " + primaryCount + + " Replicas - " + replicaCount + " Total - " + + (primaryCount + replicaCount)); + System.out.println(); + } else if (commandString.equals("ring")) { + System.out + .println("-------------------------------------------------"); + System.out.println("Node Ring : "); + int noOfNodes = membershipList.size(); + long node = computeHash(this.pid); + while (noOfNodes-- > 0) { + System.out.print(node + " --> "); + node = computeHash(getSuccessorNode(node)); + } + System.out.println("looparound"); + System.out + .println("-------------------------------------------------"); + System.out.println(); + } else if (commandString.equals("show")) { + System.out + .println("-------------------------------------------------"); + System.out.println("RECENT READS"); + Iterator itr = BoltDBServer.readBuffer.iterator(); + while (itr.hasNext()) { + System.out.println((Operation) itr.next()); + } + System.out + .println("-------------------------------------------------"); + System.out.println("RECENT WRITES"); + itr = BoltDBServer.writeBuffer.iterator(); + while (itr.hasNext()) { + System.out.println((Operation) itr.next()); + } + } + } + } catch (Exception e) { + e.printStackTrace(); } - - //Insert the current machine into the membership list with heartbeat=1. This single entry is going to be sent to the contact node for joining the cluster. - GroupMembership.membershipList.putIfAbsent(GroupMembership.pid, new MembershipBean(InetAddress.getLocalHost().getHostName(), 1, System.currentTimeMillis(), false)); - - //Load all the paramters needed from the property file. - Properties prop = new Properties(); - FileInputStream fis = new FileInputStream("./boltdb.prop"); - prop.load(fis); - fis.close(); - - //Start the thread that listens to gossip messages. - Thread receiveGossip = new Thread(new ReceiveGossipThread()); - receiveGossip.start(); - - //JOINING : This is the JOIN part . So,if this node is not the contact machine,then try to connect to the contact machine - //and send your membership list which contains just one entry ie current machine's details. - if (!isContact) - { - int maxTries = 10; - while(maxTries-- > 0) - { - new SendMembershipListThread(prop.getProperty("groupmembership.contact"), 8764).start(); + + } + + /** + * Computes the MD5 hash of the pid,transforms it into an integer and hashes + * it in the range 0-1 million. + * + * @param pid + * @return + * @throws NoSuchAlgorithmException + */ + public static long computeHash(String pid) throws NoSuchAlgorithmException { + if (pid.length() <= 6 && !pid.isEmpty()) + return Long.parseLong(pid); + + MessageDigest md = MessageDigest.getInstance("MD5"); + BigInteger bigInt = new BigInteger(1, md.digest(pid.getBytes())); + return Math.abs(bigInt.longValue()) % 1000001L; + } + + /** + * Returns the node which is the successor of a key. + * + * @param keyHash + * @return + */ + public static String getSuccessorNode(long aNode) { + Iterator> itr = GroupMembership.membershipList + .entrySet().iterator(); + // Set the minimum clockwise distance to be maximum possible value + long minClockwiseDistance = 1000000L; + String successorNode = new String(); + while (itr.hasNext()) { + Entry entry = itr.next(); + // Ignore if the entry is yourself(Server) + if (entry.getValue().hashValue == aNode) + continue; + long hashCurrent = entry.getValue().hashValue; + // compute the clockwise distance + long clockWiseDistance = aNode > hashCurrent ? 1000000l - (aNode - hashCurrent) + : hashCurrent - aNode; + // Update minimum clockwise distance if required + if (minClockwiseDistance > clockWiseDistance) { + minClockwiseDistance = clockWiseDistance; + successorNode = entry.getKey(); + } + } + return successorNode; + } + + /** + * Returns the node which is the predecessor of a key. + * + * @param keyHash + * @return + */ + public static String getPredecessorNode(long aNode) { + Iterator> itr = GroupMembership.membershipList + .entrySet().iterator(); + // Set the maximum clockwise distance to be minimum possible value + long maxClockwiseDistance = 0L; + String predecessorNode = new String(); + while (itr.hasNext()) { + Entry entry = itr.next(); + // Ignore if the entry is yourself(Server) + if (entry.getValue().hashValue == aNode) + continue; + long hashCurrent = entry.getValue().hashValue; + // compute the clockwise distance + long clockWiseDistance = aNode > hashCurrent ? 1000000L - (aNode - hashCurrent) + : hashCurrent - aNode; + // Update minimum clockwise distance if required + if (maxClockwiseDistance < clockWiseDistance) { + maxClockwiseDistance = clockWiseDistance; + predecessorNode = entry.getKey(); + } + } + return predecessorNode; + } + + /* + * This method is called by a node to check if its in the successor rereplication segment + */ + public static int inSuccReReplicationSeg(long thisNode, long failedNode) + throws NoSuchAlgorithmException { + int k = replicationFactor; + while (k-- > 0) { + if ((failedNode = computeHash(getSuccessorNode(failedNode))) == thisNode) + return (replicationFactor - k); + } + return -1; + } + + /* + * This method is called by a node to check if its in the predecessor rereplication segment + */ + + public static int inPredReReplicationSeg(long thisNode, long failedNode) + throws NoSuchAlgorithmException { + int k = replicationFactor; + while (k-- > 1) { + if ((failedNode = computeHash(getPredecessorNode(failedNode))) == thisNode) + return (replicationFactor - k); + } + return -1; + } + + /* + * This method is used to get the kth successor of a node + */ + + public static String getKthSuccessorNode(long aNode, int k) + throws NoSuchAlgorithmException { + String successorNode = new String(); + while (k-- > 0) { + successorNode = getSuccessorNode(aNode); + aNode = computeHash(successorNode); + } + return successorNode; + } + + /* + * This method is called to handle a crash + */ + + public synchronized static void handleCrash(long hashCrashedNode) + throws NoSuchAlgorithmException, MalformedURLException, + NotBoundException { + if (membershipList.size() <= 3) + return; + long myhash = membershipList.get(GroupMembership.pid).hashValue; + int successorPosition = inSuccReReplicationSeg(myhash, hashCrashedNode); + if (successorPosition != -1) { + long startKey, endKey; + String targetNode; + if (successorPosition == replicationFactor) { + startKey = membershipList + .get(getPredecessorNode(hashCrashedNode)).hashValue + 1; + endKey = hashCrashedNode; + targetNode = getSuccessorNode(hashCrashedNode); + } else { + targetNode = getKthPredecessorNode(hashCrashedNode, + replicationFactor - successorPosition); + startKey = membershipList.get(getPredecessorNode(membershipList + .get(targetNode).hashValue)).hashValue + 1; + endKey = membershipList.get(targetNode).hashValue; + } + + BoltDBProtocol targetRMIServer = null; + int i = 0; + while (i++ < replicationFactor - 1) { try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + targetRMIServer = (BoltDBProtocol) Naming.lookup("rmi://" + + membershipList.get(targetNode).hostname + + "/KVStore"); + targetRMIServer.lookupAndInsertInto( + membershipList.get(pid).hostname, startKey, endKey); + break; + } catch (RemoteException e1) { + System.out.println("Exception while connecting to " + + targetNode + " " + e1.getMessage()); + targetNode = getSuccessorNode(GroupMembership.membershipList + .get(targetNode).hashValue); + continue; } - if(GroupMembership.membershipList.size() > 1) break; } - } - - //Get the tFail from property file - int tFail = Integer.parseInt(prop.getProperty("groupmembership.tfail")); - - //ScheduledExecutorService is used to schedule all the threads mentioned in the class javadoc with frequency mentioned in property file - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - scheduler.scheduleAtFixedRate(new HeartbeatIncrementerThread(), 0, Integer.parseInt(prop.getProperty("groupmembership.heartbeat.freq")), TimeUnit.MILLISECONDS); - scheduler.scheduleAtFixedRate(new RefreshMembershipListThread(tFail), 0, Integer.parseInt(prop.getProperty("groupmembership.refreshMembershipList.freq")), TimeUnit.MILLISECONDS); - scheduler.scheduleAtFixedRate(new SendGossipThread(Integer.parseInt(prop.getProperty("groupmembership.lossrate"))), 0, Integer.parseInt(prop.getProperty("groupmembership.gossip.freq")), TimeUnit.MILLISECONDS); - //scheduler.scheduleAtFixedRate(new LogBandwidthThread(), 0, 60000, TimeUnit.MILLISECONDS); - - //VOLUTARY LEAVE : This is the code for voluntary leave part. Basically we wait for user to input the string "leave". - //Once the user enters "leave",all the threads are stopped. The heartbeat of the current node is set to -1 and - //one last gossip happens. - //Please note that we don't send the last message to everyone in the list. - BufferedReader bufferRead = new BufferedReader(new InputStreamReader(System.in)); - while(true) { - String s = bufferRead.readLine(); - if(s.equals("leave")) { - receiveGossip.stop(); - scheduler.shutdownNow(); - scheduler.awaitTermination(100, TimeUnit.MILLISECONDS); - MembershipBean mBean = membershipList.get(pid); - mBean.hearbeatLastReceived = -1; - mBean.timeStamp = System.currentTimeMillis(); - membershipList.put(pid, mBean); - Thread gossipOneLastTime = new Thread(new SendGossipThread(0)); - gossipOneLastTime.start(); - break; + + if (targetRMIServer == null) { + System.out.println("Problem replicating keys during crash"); + log.error("Problem replicating keys during crash"); + return; } } + } + /* + * This method gets the kth predecessor of a node + */ + public static String getKthPredecessorNode(long aNode, int k) + throws NoSuchAlgorithmException { + String predecessorNode = new String(); + while (k-- > 0) { + predecessorNode = getPredecessorNode(aNode); + aNode = computeHash(predecessorNode); + } + return predecessorNode; } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/HeartbeatIncrementerThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/HeartbeatIncrementerThread.java index 3dbf562..ebee07b 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/HeartbeatIncrementerThread.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/HeartbeatIncrementerThread.java @@ -6,33 +6,26 @@ /** * This class increments the hearbeat of the current node. - * + * */ -public class HeartbeatIncrementerThread implements Runnable -{ - //@Override - public void run() - { - try - { - MembershipBean entry = GroupMembership.membershipList.get(GroupMembership.pid); - if(entry == null) - { - GroupMembership.membershipList.putIfAbsent(GroupMembership.pid, new MembershipBean(InetAddress.getLocalHost().getHostAddress(), 1, System.currentTimeMillis(), false)); - } - else - { - //Dont update the heartbeat if its less than zero which means the node has voluntarily left. - if(entry.hearbeatLastReceived <= 0) return; - //Increment the heartbeat - entry.hearbeatLastReceived++; - //Update the timestamp - entry.timeStamp = System.currentTimeMillis(); - GroupMembership.membershipList.put(GroupMembership.pid, entry); - } - } - catch(Exception e) - { +public class HeartbeatIncrementerThread implements Runnable { + // @Override + public void run() { + try { + MembershipBean entry = GroupMembership.membershipList + .get(GroupMembership.pid); + + // Dont update the heartbeat if its less than zero which means the + // node has voluntarily left. + if (entry.hearbeatLastReceived <= 0) + return; + // Increment the heartbeat + entry.hearbeatLastReceived++; + // Update the timestamp + entry.timeStamp = System.currentTimeMillis(); + GroupMembership.membershipList.put(GroupMembership.pid, entry); + + } catch (Exception e) { System.out.println("EXCEPTION:In HeartbeatIncrementerThread"); } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/MergeThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/MergeThread.java index 705491a..6975508 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/MergeThread.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/MergeThread.java @@ -2,10 +2,19 @@ import java.io.IOException; import java.lang.reflect.Type; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.UnknownHostException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.security.NoSuchAlgorithmException; +import java.security.acl.Group; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import org.apache.log4j.Logger; @@ -13,6 +22,9 @@ import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBServer; +import edu.uiuc.boltdb.ValueTimeStamp; import edu.uiuc.boltdb.groupmembership.beans.MembershipBean; import edu.uiuc.boltdb.groupmembership.beans.UDPBean; @@ -28,11 +40,12 @@ public class MergeThread implements Runnable Map incomingMembershipList = null; String receivedJson = new String(); String sentHost; - - public MergeThread(String sentHost, String json) + Object lock; + public MergeThread(String sentHost, String json, Object lock) { this.sentHost = sentHost; this.receivedJson = json; + this.lock = lock; } public void run() @@ -46,8 +59,13 @@ public void run() System.out.println("Problem receiving gossip"); return; } - mergeIncomingMembershipList(); - //System.out.println("\nAFTER MERGE : "+GroupMembership.membershipList); + try { + synchronized (lock) { + mergeIncomingMembershipList(); + } + } catch (Exception e) { + e.printStackTrace(); + } } /** @@ -63,8 +81,13 @@ private void getGossipFromClient() throws IOException /** * Merge the incoming membership list into the current node's membership list + * @throws UnknownHostException + * @throws NotBoundException + * @throws RemoteException + * @throws MalformedURLException + * @throws NoSuchAlgorithmException */ - private void mergeIncomingMembershipList() + private void mergeIncomingMembershipList() throws UnknownHostException, MalformedURLException, RemoteException, NotBoundException, NoSuchAlgorithmException { Iterator> iterator = incomingMembershipList.entrySet().iterator(); //Iterate over each entry of incoming membershiplist @@ -89,8 +112,8 @@ private void mergeIncomingMembershipList() //VOLUNTARILY LEAVE : If the incoming entry has heartbeat less than zero,then log 'VOLUNTARILY LEFT' message. //Also update current node's membership list if(receivedMBean.hearbeatLastReceived <= 0 && currentMBean.hearbeatLastReceived > 0) { - System.out.println("VOLUNTARILY LEFT : " + receivedPid+ " at "+(new Date()).toString()); - log.info("VOLUNTARILY LEFT - - - " + receivedPid); + //System.out.println("VOLUNTARILY LEFT : " + receivedPid+ " at "+(new Date()).toString()); + log.info("["+new Date()+"]VOLUNTARILY LEFT - - - " + receivedPid); currentMBean.hearbeatLastReceived = -1; currentMBean.timeStamp = System.currentTimeMillis(); currentMBean.toBeDeleted = false; @@ -99,30 +122,181 @@ private void mergeIncomingMembershipList() } else if (receivedMBean.hearbeatLastReceived <= 0 || currentMBean.hearbeatLastReceived <= 0) continue; //If the incoming entry's heartbeat is greater than current node's membership list,then update the list. - if(receivedMBean.hearbeatLastReceived > currentMBean.hearbeatLastReceived) - { - currentMBean.hearbeatLastReceived = receivedMBean.hearbeatLastReceived; - currentMBean.timeStamp = System.currentTimeMillis(); - if(currentMBean.toBeDeleted) { - System.out.println("JOINED : " + receivedPid); - currentMBean.toBeDeleted = false; - } - GroupMembership.membershipList.put(receivedPid, currentMBean); - } - } - else + if(receivedMBean.hearbeatLastReceived > currentMBean.hearbeatLastReceived) + { + currentMBean.hearbeatLastReceived = receivedMBean.hearbeatLastReceived; + currentMBean.timeStamp = System.currentTimeMillis(); + if(currentMBean.toBeDeleted) { + //System.out.println("JOINED : " + receivedPid); + log.info("["+new Date()+"]JOINED : " + receivedPid); + currentMBean.toBeDeleted = false; + } + GroupMembership.membershipList.put(receivedPid, currentMBean); + } + } + else if(!GroupMembership.membershipList.containsKey(receivedPid) && receivedMBean.hearbeatLastReceived > 0) { //JOIN : If the incoming entry is not in our membership list then it means a new node has joined. - if(receivedMBean.hearbeatLastReceived <= 0) continue; - String receivedHost = receivedPid.split(GroupMembership.pidDelimiter)[0]; - MembershipBean mBean = new MembershipBean(receivedHost, receivedMBean.hearbeatLastReceived, System.currentTimeMillis(), false); - MembershipBean returnVal = GroupMembership.membershipList.putIfAbsent(receivedPid, mBean); - if (returnVal == null) - { - System.out.println("JOINED : " + receivedPid+" at "+(new Date()).toString()); - log.info("JOINED - - - " + receivedPid); + //if(receivedMBean.hearbeatLastReceived <= 0) continue; + String receivedHost = receivedPid + .split(GroupMembership.pidDelimiter)[0]; + MembershipBean mBean = new MembershipBean(receivedHost, + receivedMBean.hearbeatLastReceived, + System.currentTimeMillis(), receivedMBean.hashValue, + false); + + //System.out.println("JOINED : " + receivedPid+" at "+(new + // Date()).toString()); + log.info("["+new Date()+"]JOINED - - - " + receivedPid); + // Get the successor of newly joined node + if((System.currentTimeMillis() - GroupMembership.startTime) > (GroupMembership.tFail * 1000)) { + + boolean amISuccessor = amITheSuccesorOf(receivedMBean.hashValue); + if (amISuccessor) { + moveKeysSucc(receivedHost, mBean.hashValue); + } else if (GroupMembership.membershipList.size() >= 3) { + int amIInPredReReplicationSeg = GroupMembership + .inPredReReplicationSeg( + GroupMembership.membershipList + .get(GroupMembership.pid).hashValue, + mBean.hashValue); + if (amIInPredReReplicationSeg != -1) { + moveKeysPred(receivedHost, mBean.hashValue, + amIInPredReReplicationSeg); + } + } + } + GroupMembership.membershipList.putIfAbsent(receivedPid, mBean); + + } + } + } + + /** + * Check if I am the successor of the newly joined node. + * @param receivedPid + * @return + * @throws UnknownHostException + */ + private boolean amITheSuccesorOf(long hashOfNewlyJoinedNode) throws UnknownHostException { + if(GroupMembership.membershipList.get(GroupMembership.getSuccessorNode(hashOfNewlyJoinedNode)).hostname.equals(InetAddress.getLocalHost().getHostName())) return true; + return false; + } + + /** + * Move keys from yourself to the newly joined node just like in the chord paper + * @param targetHost + * @param hashOfNewJoinedNode + * @throws MalformedURLException + * @throws RemoteException + * @throws NotBoundException + * @throws NoSuchAlgorithmException + */ + private void moveKeysSucc(String targetHost, long hashOfNewJoinedNode) throws MalformedURLException, RemoteException, NotBoundException, NoSuchAlgorithmException { + //get the rmiserver handle from the rmi registry + BoltDBProtocol targetRMIServer = (BoltDBProtocol) Naming.lookup("rmi://" + targetHost + "/KVStore"); + Iterator> itr = BoltDBServer.KVStore.entrySet().iterator(); + long myHash = GroupMembership.membershipList.get(GroupMembership.pid).hashValue; + + // Get the rmiserver handle for successor's successor from the rmi registry + String succSuccessorHost = GroupMembership.membershipList.get(GroupMembership.getKthSuccessorNode(myHash, 2)).hostname; + BoltDBProtocol succSuccRMIServer = (BoltDBProtocol) Naming.lookup("rmi://" + succSuccessorHost + "/KVStore"); + long predecessorHash = myHash; + if(GroupMembership.membershipList.size() > 1) + predecessorHash = GroupMembership.membershipList.get(GroupMembership.getKthPredecessorNode(myHash, 1)).hashValue; + + while(itr.hasNext()) { + Entry entry = itr.next(); + long hashOfKey = GroupMembership.computeHash(entry.getKey().toString()); + if (predecessorHash == myHash + || amIPrimaryReplicaFor(hashOfKey, myHash, predecessorHash)) { + // If hash of current server is greater than hash of newly + // joined server + // then move all the keys greater than hash of current server + // and less than newly joined server. + if (myHash > hashOfNewJoinedNode) { + if (hashOfKey > myHash || hashOfKey <= hashOfNewJoinedNode) { + log.info("["+new Date()+"]Inserting key :" + entry.getKey() + + " value:" + entry.getValue() + " from Me to " + + targetHost); + targetRMIServer.insert(entry.getKey(), + entry.getValue(), false, null); + // BoltDBServer.KVStore.remove(entry.getKey()); + // Delete this key in successor's successor + + if (GroupMembership.membershipList.size() >= 3) { + log.info("["+new Date()+"]Deleting key :" + + entry.getKey() + " from " + + succSuccessorHost); + succSuccRMIServer.delete(entry.getKey(), false, null); + } + } + } + // If hash of current server is less than hash of newly joined + // server + // then move all the keys in between the two servers hashes. + else { + if (hashOfKey > myHash && hashOfKey <= hashOfNewJoinedNode) { + log.info("["+new Date()+"]Inserting key :" + entry.getKey() + + " value:" + entry.getValue() + " from Me to " + + targetHost); + targetRMIServer.insert(entry.getKey(), + entry.getValue(), false,null); + // BoltDBServer.KVStore.remove(entry.getKey()); + // Delete this key in successor's successor + if (GroupMembership.membershipList.size() >= 3) { + log.info("["+new Date()+"]Deleting key :" + + entry.getKey() + " from " + + succSuccessorHost); + succSuccRMIServer.delete(entry.getKey(), false, null); + } + } } } } } + + private boolean amIPrimaryReplicaFor(long key,long myHash,long predecessorHash) { + if (myHash > predecessorHash) { + if (key > predecessorHash && key <= myHash) return true; + } else { + if (key > predecessorHash || key <= myHash) return true; + } + return false; + } + private void moveKeysPred(String targetHost, long hashOfNewJoinedNode, int predecessorPosition) throws MalformedURLException, RemoteException, NotBoundException, NoSuchAlgorithmException { + //get the rmiserver handle from the rmi registry + BoltDBProtocol targetRMIServer = (BoltDBProtocol) Naming.lookup("rmi://" + targetHost + "/KVStore"); + long myHash = GroupMembership.membershipList.get(GroupMembership.pid).hashValue; + + // Get the rmiserver handle for (k-p)th successor from the rmi registry + String kpthSuccHost = GroupMembership.membershipList.get(GroupMembership.getKthSuccessorNode(hashOfNewJoinedNode, (GroupMembership.replicationFactor - predecessorPosition))).hostname; + BoltDBProtocol kpthSuccRMIServer = (BoltDBProtocol) Naming.lookup("rmi://" + kpthSuccHost + "/KVStore"); + + long myPredecessor = GroupMembership.membershipList.get(GroupMembership.getPredecessorNode(myHash)).hashValue; + Iterator> itr = BoltDBServer.KVStore.entrySet().iterator(); + while(itr.hasNext()) { + Entry entry = itr.next(); + long hashOfKey = GroupMembership.computeHash(entry.getKey().toString()); + if (myHash > myPredecessor) { + if ( hashOfKey > myPredecessor && hashOfKey <= myHash) { + log.info("["+new Date()+"]Inserting key :" + entry.getKey() + " from Me to " + targetHost); + targetRMIServer.insert(entry.getKey(), entry.getValue(),false, null); + + log.info("["+new Date()+"]Deleting key :" + entry.getKey() + " from " + kpthSuccHost); + kpthSuccRMIServer.delete(entry.getKey(), false, null); + } + } + else { + if ( hashOfKey > myPredecessor || hashOfKey <= myHash) { + log.info("["+new Date()+"]Inserting key :" + entry.getKey() + " from Me to " + targetHost); + targetRMIServer.insert(entry.getKey(), entry.getValue(),false, null); + + log.info("["+new Date()+"]Deleting key :" + entry.getKey() + " from " + kpthSuccHost); + kpthSuccRMIServer.delete(entry.getKey(), false, null); + + } + } + } + } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/ReceiveGossipThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/ReceiveGossipThread.java index d77d9f4..fc6888d 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/ReceiveGossipThread.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/ReceiveGossipThread.java @@ -12,10 +12,11 @@ public class ReceiveGossipThread implements Runnable { private DatagramSocket serverSocket; private int gossipPort = 8764; - + private Object lock; public ReceiveGossipThread() throws IOException { serverSocket = new DatagramSocket(gossipPort); + lock = new Object(); } public void run() @@ -38,7 +39,7 @@ public void run() //System.out.println("Packet received:"+(receive.getLength() + 8)); String receivedJson = new String(receive.getData()); String sentHost = receive.getAddress().getHostName(); - MergeThread mergeThread = new MergeThread(sentHost,receivedJson.trim()); + MergeThread mergeThread = new MergeThread(sentHost,receivedJson.trim(),lock); new Thread(mergeThread).start(); } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/RefreshMembershipListThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/RefreshMembershipListThread.java index c1cda11..a442aa1 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/RefreshMembershipListThread.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/RefreshMembershipListThread.java @@ -1,5 +1,8 @@ package edu.uiuc.boltdb.groupmembership; +import java.net.MalformedURLException; +import java.rmi.NotBoundException; +import java.security.NoSuchAlgorithmException; import java.util.Date; import java.util.Iterator; import java.util.Map; @@ -42,19 +45,26 @@ public void run() } //Remove entry which is marked toBeDeleted - if (membershipBean.toBeDeleted) + if (membershipBean.toBeDeleted && ((System.currentTimeMillis() - membershipBean.timeStamp) >= (2 * tFail * 1000))) { + try { + GroupMembership.handleCrash(membershipBean.hashValue); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } GroupMembership.membershipList.remove(entry.getKey()); } //If the membership list entry has timed-out then mark it toBeDeleted - else if (System.currentTimeMillis() - membershipBean.timeStamp >= tFail * 1000) + else if (System.currentTimeMillis() - membershipBean.timeStamp >= tFail * 1000 && !membershipBean.toBeDeleted) { membershipBean.toBeDeleted = true; if (membershipBean.hearbeatLastReceived > 0) { - System.out.println("CRASHED : " + entry.getKey() +" at " + new Date().toString()); - log.info("CRASHED - - - " + entry.getKey()); + //System.out.println("CRASHED : " + entry.getKey() +" at " + new Date().toString()); + log.info("["+new Date()+"]CRASHED - - - " + entry.getKey()); } } + } } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/SendMembershipListThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/SendMembershipListThread.java index a0f3e14..8f288cc 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/SendMembershipListThread.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/SendMembershipListThread.java @@ -49,7 +49,7 @@ public void run() Map.Entry entry = iterator.next(); if(entry.getValue().toBeDeleted) continue; - listToSend.put(entry.getKey(), new UDPBean(entry.getValue().hearbeatLastReceived)); + listToSend.put(entry.getKey(), new UDPBean(entry.getValue().hearbeatLastReceived,entry.getValue().hashValue)); } String json = gson.toJson(listToSend, typeOfHashMap); byte[] jsonBytes = json.getBytes(); diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/MembershipBean.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/MembershipBean.java index 3b03a67..c618037 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/MembershipBean.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/MembershipBean.java @@ -10,16 +10,17 @@ public class MembershipBean extends UDPBean { public boolean toBeDeleted; public MembershipBean(String ipaddress, long hearbeatLastReceived, long timeStamp, - boolean toBeDeleted) { - super(hearbeatLastReceived); + long hashValue, boolean toBeDeleted) { + super(hearbeatLastReceived, hashValue); this.hostname = ipaddress; this.timeStamp = timeStamp; + this.hashValue = hashValue; this.toBeDeleted = toBeDeleted; } @Override public String toString() { - return new String("[" + hostname + " " +hearbeatLastReceived+" "+timeStamp+" "+toBeDeleted+"]"); + return new String("[" + hostname + " " +hearbeatLastReceived+" "+timeStamp+" "+toBeDeleted+ " " + hashValue + "]"); } } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/Operation.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/Operation.java new file mode 100644 index 0000000..396c682 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/Operation.java @@ -0,0 +1,43 @@ +package edu.uiuc.boltdb.groupmembership.beans; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBProtocol.CONSISTENCY_LEVEL; + +public class Operation { + + String operation; + BoltDBProtocol.CONSISTENCY_LEVEL consistency; + String time; + long key; + String value; + + public Operation(String operation, BoltDBProtocol.CONSISTENCY_LEVEL consistency, String time, + long key, String value) { + super(); + this.operation = operation; + this.consistency = consistency; + this.time = time; + this.key = key; + this.value = value; + } + + + public Operation(String operation, CONSISTENCY_LEVEL consistency, + String time, long key) { + super(); + this.operation = operation; + this.consistency = consistency; + this.time = time; + this.key = key; + } + + + @Override + public String toString() { + if (operation.equals("DELETE")) { + return "[OPERATION: "+operation+" CONSISTENCY: "+consistency+" TIME: "+time+" KEY: "+key+"]"; + } + return "[OPERATION: "+operation+" CONSISTENCY: "+consistency+" TIME: "+time+" KEY: "+key+" VALUE: "+value+"]"; + } + +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/UDPBean.java b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/UDPBean.java index d380022..73550be 100644 --- a/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/UDPBean.java +++ b/boltdb/src/main/java/edu/uiuc/boltdb/groupmembership/beans/UDPBean.java @@ -7,10 +7,11 @@ public class UDPBean { public long hearbeatLastReceived; - - public UDPBean(long hearbeatLastReceived) { + public long hashValue; + public UDPBean(long hearbeatLastReceived, long hashValue) { super(); this.hearbeatLastReceived = hearbeatLastReceived; + this.hashValue = hashValue; } diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/methods/DeleteThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/methods/DeleteThread.java new file mode 100644 index 0000000..e3d7e05 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/methods/DeleteThread.java @@ -0,0 +1,48 @@ +package edu.uiuc.boltdb.methods; + +import java.net.InetAddress; +import java.rmi.Naming; +import java.util.Date; +import java.util.concurrent.Callable; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBServer; +import edu.uiuc.boltdb.BoltDBProtocol.CONSISTENCY_LEVEL; +import edu.uiuc.boltdb.groupmembership.GroupMembership; +import edu.uiuc.boltdb.groupmembership.beans.Operation; + +public class DeleteThread implements Callable { + private String targetHost; + private Long key; + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel; + + public DeleteThread(String targetHost, Long key, + CONSISTENCY_LEVEL consistencyLevel) { + super(); + this.targetHost = targetHost; + this.key = key; + this.consistencyLevel = consistencyLevel; + } + + public Boolean call() { + try { + if (GroupMembership.membershipList.get(targetHost).hostname.equals(InetAddress.getLocalHost().getHostName())) { + if(!BoltDBServer.KVStore.containsKey(key)) + return false; + //throw new RemoteException("Key not present."); + BoltDBServer.KVStore.remove(key); + if(consistencyLevel != null) + BoltDBServer.writeBuffer.add(new Operation("DELETE", consistencyLevel, new Date().toString(), key)); + return true; + } else { + BoltDBProtocol targetServer = (BoltDBProtocol) Naming + .lookup("rmi://" + GroupMembership.membershipList.get(targetHost).hostname + "/KVStore"); + return targetServer.delete(key, false, consistencyLevel); + } + } catch(Exception e) { + System.out.println("Exception in delete thread"); + return false; + } + } + +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/methods/InsertThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/methods/InsertThread.java new file mode 100644 index 0000000..b0dab21 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/methods/InsertThread.java @@ -0,0 +1,53 @@ +package edu.uiuc.boltdb.methods; + +import java.net.InetAddress; +import java.rmi.Naming; +import java.rmi.RemoteException; +import java.util.Date; +import java.util.concurrent.Callable; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBProtocol.CONSISTENCY_LEVEL; +import edu.uiuc.boltdb.BoltDBServer; +import edu.uiuc.boltdb.ValueTimeStamp; +import edu.uiuc.boltdb.groupmembership.GroupMembership; +import edu.uiuc.boltdb.groupmembership.beans.Operation; + +public class InsertThread implements Callable { + private String targetHost; + private Long key; + private ValueTimeStamp value; + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel; + + public InsertThread(String targetHost, Long key, ValueTimeStamp value, + CONSISTENCY_LEVEL consistencyLevel) { + super(); + this.targetHost = targetHost; + this.key = key; + this.value = value; + this.consistencyLevel = consistencyLevel; + } + + + public Boolean call() { + try { + if (GroupMembership.membershipList.get(targetHost).hostname.equals(InetAddress.getLocalHost().getHostName())) { + if (BoltDBServer.KVStore.containsKey(key)) + return false; + //throw new RemoteException("Key already present."); + BoltDBServer.KVStore.put(key, value); + if(consistencyLevel != null) + BoltDBServer.writeBuffer.add(new Operation("INSERT", consistencyLevel, new Date().toString(), key, value.value)); + return true; + } else { + BoltDBProtocol targetServer = (BoltDBProtocol) Naming + .lookup("rmi://" + GroupMembership.membershipList.get(targetHost).hostname + "/KVStore"); + return targetServer.insert(key, value, false, consistencyLevel); + } + } catch(Exception e) { + System.out.println("Exception in insert thread"); + return false; + } + } + +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/methods/LookupThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/methods/LookupThread.java new file mode 100644 index 0000000..f4b1beb --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/methods/LookupThread.java @@ -0,0 +1,51 @@ +package edu.uiuc.boltdb.methods; + +import java.net.InetAddress; +import java.rmi.Naming; +import java.rmi.RemoteException; +import java.util.Date; +import java.util.concurrent.Callable; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBServer; +import edu.uiuc.boltdb.BoltDBProtocol.CONSISTENCY_LEVEL; +import edu.uiuc.boltdb.ValueTimeStamp; +import edu.uiuc.boltdb.groupmembership.GroupMembership; +import edu.uiuc.boltdb.groupmembership.beans.Operation; + +public class LookupThread implements Callable { + private String targetHost; + private Long key; + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel; + + public LookupThread(String targetHost, Long key, + CONSISTENCY_LEVEL consistencyLevel) { + super(); + this.targetHost = targetHost; + this.key = key; + this.consistencyLevel = consistencyLevel; + } + + public ValueTimeStamp call() { + try { + if (GroupMembership.membershipList.get(targetHost).hostname + .equals(InetAddress.getLocalHost().getHostName())) { + if(!BoltDBServer.KVStore.containsKey(key)) + return null; + //throw new RemoteException("Key not present."); + ValueTimeStamp value = BoltDBServer.KVStore.get(key); + BoltDBServer.readBuffer.add(new Operation("READ", consistencyLevel, new Date().toString(), key, value.value)); + return value; + } else { + BoltDBProtocol targetServer = (BoltDBProtocol) Naming + .lookup("rmi://" + + GroupMembership.membershipList + .get(targetHost).hostname + "/KVStore"); + return targetServer.lookup(key, false, consistencyLevel); + } + } catch (Exception e) { + System.out.println("Exception in lookup thread"); + return null; + } + } +} diff --git a/boltdb/src/main/java/edu/uiuc/boltdb/methods/UpdateThread.java b/boltdb/src/main/java/edu/uiuc/boltdb/methods/UpdateThread.java new file mode 100644 index 0000000..63f3160 --- /dev/null +++ b/boltdb/src/main/java/edu/uiuc/boltdb/methods/UpdateThread.java @@ -0,0 +1,55 @@ +package edu.uiuc.boltdb.methods; + +import java.net.InetAddress; +import java.rmi.Naming; +import java.rmi.RemoteException; +import java.util.Date; +import java.util.concurrent.Callable; + +import edu.uiuc.boltdb.BoltDBProtocol; +import edu.uiuc.boltdb.BoltDBServer; +import edu.uiuc.boltdb.BoltDBProtocol.CONSISTENCY_LEVEL; +import edu.uiuc.boltdb.groupmembership.GroupMembership; +import edu.uiuc.boltdb.groupmembership.beans.Operation; +import edu.uiuc.boltdb.ValueTimeStamp; + +public class UpdateThread implements Callable { + + private String targetHost; + private Long key; + private ValueTimeStamp value; + BoltDBProtocol.CONSISTENCY_LEVEL consistencyLevel; + + public UpdateThread(String targetHost, Long key, ValueTimeStamp value, + CONSISTENCY_LEVEL consistencyLevel) { + super(); + this.targetHost = targetHost; + this.key = key; + this.value = value; + this.consistencyLevel = consistencyLevel; + } + + public Boolean call() { + try { + if (GroupMembership.membershipList.get(targetHost).hostname.equals(InetAddress.getLocalHost().getHostName())) { + if(!BoltDBServer.KVStore.containsKey(key)) + return false; + //throw new RemoteException("Key not present."); + + if(BoltDBServer.KVStore.get(key).timeStamp < value.timeStamp) + BoltDBServer.KVStore.put(key, value); + if(consistencyLevel != null) + BoltDBServer.writeBuffer.add(new Operation("UPDATE", consistencyLevel, new Date().toString(), key, value.value)); + return true; + } else { + BoltDBProtocol targetServer = (BoltDBProtocol) Naming + .lookup("rmi://" + GroupMembership.membershipList.get(targetHost).hostname + "/KVStore"); + return targetServer.update(key, value, false, consistencyLevel); + } + } catch(Exception e) { + System.out.println("Exception in update thread"); + return false; + } + } + +}