Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Making StateStoreCache's fetchXXX similar to CuratorStateStore (#478)
Browse files Browse the repository at this point in the history
* StateStoreCache unpacks TaskInfo on fetch call similar to CuratorStateStore

* A simpler fix
  • Loading branch information
mohitsoni authored and nickbp committed Jan 25, 2017
1 parent 738ef08 commit cc93cc7
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.mesosphere.sdk.specification.DefaultConfigFileSpec;
import com.mesosphere.sdk.specification.GoalState;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos.*;
Expand Down Expand Up @@ -199,7 +200,7 @@ public static TaskInfo.Builder setIndex(TaskInfo.Builder taskBuilder, int index)
/**
* Returns the pod instance index of the provided task, or throws {@link TaskException} if no index data was found.
*
* @throws TaskException if the index data wasn't found
* @throws TaskException if the index data wasn't found
* @throws NumberFormatException if parsing the index as an integer failed
*/
public static int getIndex(TaskInfo taskInfo) throws TaskException {
Expand Down Expand Up @@ -320,7 +321,8 @@ public static Optional<HealthCheck> getReadinessCheck(TaskInfo taskInfo) throws
* failures to parse readiness checks are interpreted as readiness check failures. If some value other
* than "true" is present in the readiness check label of the TaskStatus, the readiness check has
* failed.
* @param taskInfo A TaskInfo which may or may not have a readiness check defined.
*
* @param taskInfo A TaskInfo which may or may not have a readiness check defined.
* @param taskStatus A TaskStatus which may or may not contain a readiness check label.
* @return the result of a readiness check for the indicated TaskInfo and TaskStatus.
*/
Expand Down Expand Up @@ -512,6 +514,24 @@ public static TaskInfo packTaskInfo(TaskInfo taskInfo) {
}
}

/**
* This method is similar to {@link #unpackTaskInfo(TaskInfo)}, just applied over a {@link Collection} of
* {@link TaskInfo}s.
*
* @param packedTaskInfos Collection of TaskInfos to be unpacked.
* @return
*/
public static Collection<TaskInfo> unpackTaskInfos(Collection<TaskInfo> packedTaskInfos)
throws InvalidProtocolBufferException {
Collection<TaskInfo> unpackedTaskInfos = new ArrayList<>();
if (!CollectionUtils.isEmpty(packedTaskInfos)) {
for (TaskInfo packedTaskInfo : packedTaskInfos) {
unpackedTaskInfos.add(unpackTaskInfo(packedTaskInfo));
}
}
return unpackedTaskInfos;
}

/**
* This method reverses the work done in {@link #packTaskInfo(TaskInfo)} such that the original
* TaskInfo is regenerated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public Optional<Protos.TaskStatus> fetchStatus(String taskName) throws StateStor
"Failed to retrieve TaskStatus for TaskName: %s", taskName));
}
} catch (KeeperException.NoNodeException e) {
logger.warn("No TaskInfo found for the requested name: " + taskName + " at: " + path);
logger.warn("No TaskStatus found for the requested name: " + taskName + " at: " + path);
return Optional.empty();
} catch (Exception e) {
throw new StateStoreException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mesosphere.sdk.state;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
Expand All @@ -11,6 +12,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
Expand All @@ -34,7 +36,13 @@ public void record(Operation operation, Offer offer) throws Exception {
private void recordTasks(List<Protos.TaskInfo> taskInfos) throws StateStoreException {
logger.info(String.format("Recording %d updated TaskInfos/TaskStatuses:", taskInfos.size()));
List<Protos.TaskStatus> taskStatuses = new ArrayList<>();
for (Protos.TaskInfo taskInfo : taskInfos) {
Collection<Protos.TaskInfo> unpackedTaskInfos;
try {
unpackedTaskInfos = CommonTaskUtils.unpackTaskInfos(taskInfos);
} catch (InvalidProtocolBufferException e) {
throw new StateStoreException(e);
}
for (Protos.TaskInfo taskInfo : unpackedTaskInfos) {
if (!taskInfo.getTaskId().getValue().equals("")) {
Protos.TaskStatus.Builder taskStatusBuilder = Protos.TaskStatus.newBuilder()
.setTaskId(taskInfo.getTaskId())
Expand All @@ -51,7 +59,7 @@ private void recordTasks(List<Protos.TaskInfo> taskInfos) throws StateStoreExcep
}
}

stateStore.storeTasks(taskInfos);
stateStore.storeTasks(unpackedTaskInfos);
for (Protos.TaskStatus taskStatus : taskStatuses) {
recordTaskStatus(taskStatus);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
package com.mesosphere.sdk.state;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import com.mesosphere.sdk.curator.CuratorStateStore;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskStatus;
import com.mesosphere.sdk.curator.CuratorStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Thread-safe caching layer for an underlying {@link StateStore}.
*
* <p>
* Writes are automatically forwarded to the underlying instance, while reads prioritize the local
* instance. In order to maintain consistency, there should only be one StateStoreCache object per
* process. In practice this works because there should only be one scheduler task/process
* accessing the state data at any given time.
*
* <p>
* Implementation note: All write operations always invoke the underlying storage before updating
* the local cache. This avoids creating an inconsistent cache state if writing to the underlying
* persistent store fails.
Expand Down Expand Up @@ -63,7 +62,7 @@ public static StateStore getInstance(StateStore store) {
// Disallow subsequent calls to getInstance() with different instances of StateStore.
throw new IllegalStateException(String.format(
"StateStoreCache may only be used against a single instance of StateStore. " +
"got[%s] expected[%s]", store, instance.store));
"got[%s] expected[%s]", store, instance.store));
}
return instance;
} finally {
Expand Down Expand Up @@ -98,7 +97,7 @@ public static void resetInstanceForTests() {
if (task == null) {
throw new StateStoreException(String.format(
"The following TaskInfo is not present: %s. TaskInfo must be present in " +
"order to store a TaskStatus. All Tasks: %s", status.getTaskId(), idToTask));
"order to store a TaskStatus. All Tasks: %s", status.getTaskId(), idToTask));
}
nameToStatus.put(task.getName(), status);
}
Expand Down Expand Up @@ -168,7 +167,7 @@ public void storeStatus(TaskStatus status) throws StateStoreException {
if (taskName == null) {
throw new StateStoreException(String.format(
"The following TaskInfo is not present in the StateStore: %s. " +
"TaskInfo must be present in order to store a TaskStatus.", status.getTaskId()));
"TaskInfo must be present in order to store a TaskStatus.", status.getTaskId()));
}
nameToStatus.put(taskName, status);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.mesosphere.sdk.state;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.mesosphere.sdk.curator.CuratorStateStore;
import com.mesosphere.sdk.offer.CommonTaskUtils;
import com.mesosphere.sdk.testutils.CuratorTestUtils;
import com.mesosphere.sdk.testutils.TaskTestUtils;
import org.apache.curator.test.TestingServer;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
import com.mesosphere.sdk.curator.CuratorStateStore;
import com.mesosphere.sdk.offer.CommonTaskUtils;
import com.mesosphere.sdk.testutils.CuratorTestUtils;
import com.mesosphere.sdk.testutils.TaskTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -21,15 +18,10 @@
import org.mockito.MockitoAnnotations;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* Tests for {@link StateStoreCache}
Expand Down

0 comments on commit cc93cc7

Please sign in to comment.