diff --git a/executor/build.gradle b/executor/build.gradle index 3f7c67e5..271d1e8c 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -28,6 +28,7 @@ dependencies { compile "org.elasticsearch:elasticsearch:${elasticsearchVersion}" compile "log4j:log4j:1.2.16" compile "com.beust:jcommander:1.48" + compile 'com.jayway.awaitility:awaitility:1.6.3' testCompile 'com.mashape.unirest:unirest-java:1.4.5' testCompile 'com.jayway.awaitility:awaitility:1.6.3' diff --git a/executor/src/main/java/org/apache/mesos/elasticsearch/executor/Main.java b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/Main.java index ed6f613a..2e7a792c 100644 --- a/executor/src/main/java/org/apache/mesos/elasticsearch/executor/Main.java +++ b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/Main.java @@ -4,6 +4,7 @@ import org.apache.mesos.Protos; import org.apache.mesos.elasticsearch.executor.elasticsearch.ElasticsearchLauncher; import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher; +import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil; import org.apache.mesos.elasticsearch.executor.mesos.ElasticsearchExecutor; import org.apache.mesos.elasticsearch.executor.mesos.TaskStatus; import org.elasticsearch.common.settings.Settings; @@ -19,7 +20,7 @@ private Main() { public static void main(String[] args) throws Exception { Launcher launcher = new ElasticsearchLauncher(Settings.builder()); - MesosExecutorDriver driver = new MesosExecutorDriver(new ElasticsearchExecutor(launcher, new TaskStatus())); + MesosExecutorDriver driver = new MesosExecutorDriver(new ElasticsearchExecutor(launcher, new TaskStatus(), new NodeUtil())); System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1); } } diff --git a/executor/src/main/java/org/apache/mesos/elasticsearch/executor/elasticsearch/NodeUtil.java b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/elasticsearch/NodeUtil.java new file mode 100644 index 00000000..7a60dc7c --- /dev/null +++ b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/elasticsearch/NodeUtil.java @@ -0,0 +1,23 @@ +package org.apache.mesos.elasticsearch.executor.elasticsearch; + +import org.apache.log4j.Logger; +import org.elasticsearch.node.Node; + +import java.util.concurrent.ExecutionException; + +/** + */ +public class NodeUtil { + Logger LOG = Logger.getLogger(NodeUtil.class); + + public String getNodeStatus(Node node) { + try { + String status = node.client().admin().cluster().prepareHealth().execute().get().getStatus().toString().toLowerCase(); + LOG.debug("ES status: " + status); + return status; + } catch (InterruptedException | ExecutionException e) { + LOG.warn("ES not started. Retrying...", e); + return ""; + } + } +} diff --git a/executor/src/main/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutor.java b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutor.java index 2fdaa37e..2147b60d 100644 --- a/executor/src/main/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutor.java +++ b/executor/src/main/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutor.java @@ -1,12 +1,15 @@ package org.apache.mesos.elasticsearch.executor.mesos; import com.google.protobuf.InvalidProtocolBufferException; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionTimeoutException; import org.apache.log4j.Logger; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; import org.apache.mesos.Protos; import org.apache.mesos.elasticsearch.executor.Configuration; import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher; +import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil; import org.apache.mesos.elasticsearch.executor.model.HostsModel; import org.apache.mesos.elasticsearch.executor.model.PortsModel; import org.apache.mesos.elasticsearch.executor.model.RunTimeSettings; @@ -16,21 +19,26 @@ import java.security.InvalidParameterException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Executor for Elasticsearch. */ @SuppressWarnings("PMD.TooManyMethods") public class ElasticsearchExecutor implements Executor { + public static final long ES_TIMEOUT = 120L; + public static final String ES_STATUS_GREEN = "green"; private final Launcher launcher; public static final Logger LOGGER = Logger.getLogger(ElasticsearchExecutor.class.getCanonicalName()); private final TaskStatus taskStatus; + private final NodeUtil nodeUtil; private Configuration configuration; private Node node; - public ElasticsearchExecutor(Launcher launcher, TaskStatus taskStatus) { + public ElasticsearchExecutor(Launcher launcher, TaskStatus taskStatus, NodeUtil nodeUtil) { this.launcher = launcher; this.taskStatus = taskStatus; + this.nodeUtil = nodeUtil; } @Override @@ -88,8 +96,19 @@ public void launchTask(final ExecutorDriver driver, final Protos.TaskInfo task) // Launch Node node = launcher.launch(); - // Send status update, running - driver.sendStatusUpdate(taskStatus.running()); + try { + Awaitility.await() + .atMost(ES_TIMEOUT, TimeUnit.SECONDS) + .pollInterval(1L, TimeUnit.SECONDS) + .until(() -> nodeUtil.getNodeStatus(node).equals(ES_STATUS_GREEN)); + + // Send status update, running + driver.sendStatusUpdate(taskStatus.running()); + } catch (ConditionTimeoutException e) { + LOGGER.error("ES node was not green within " + ES_TIMEOUT + "s."); + driver.sendStatusUpdate(taskStatus.failed()); + killTask(driver, taskID); + } } catch (InvalidParameterException e) { driver.sendStatusUpdate(taskStatus.failed()); LOGGER.error(e); diff --git a/executor/src/main/resources/elasticsearch.yml b/executor/src/main/resources/elasticsearch.yml index e3df87cf..6aa98b1b 100644 --- a/executor/src/main/resources/elasticsearch.yml +++ b/executor/src/main/resources/elasticsearch.yml @@ -317,6 +317,10 @@ discovery.type: zen # # discovery.zen.ping.timeout: 3s +discovery.zen.fd.ping_timeout: 30s +discovery.zen.fd.ping_interval: 1s +discovery.zen.fd.ping_retries: 30 + # For more information, see # diff --git a/executor/src/test/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutorTest.java b/executor/src/test/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutorTest.java index e6c3c9cd..012747a3 100644 --- a/executor/src/test/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutorTest.java +++ b/executor/src/test/java/org/apache/mesos/elasticsearch/executor/mesos/ElasticsearchExecutorTest.java @@ -4,6 +4,7 @@ import org.apache.mesos.Protos; import org.apache.mesos.elasticsearch.common.Discovery; import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher; +import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; import org.junit.Before; @@ -13,6 +14,8 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.util.concurrent.ExecutionException; + import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -35,10 +38,14 @@ public class ElasticsearchExecutorTest { @Mock private ExecutorDriver driver; + @Mock + private NodeUtil nodeUtil; + @Before - public void setupLauncher() { - Node node = mock(Node.class); + public void setupLauncher() throws ExecutionException, InterruptedException { + Node node = mock(Node.class, RETURNS_DEEP_STUBS); when(launcher.launch()).thenReturn(node); + when(nodeUtil.getNodeStatus(eq(node))).thenReturn(ElasticsearchExecutor.ES_STATUS_GREEN); } @Test(expected = NullPointerException.class) diff --git a/scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategy.java b/scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategy.java index d040ed37..243c23a2 100644 --- a/scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategy.java +++ b/scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategy.java @@ -22,6 +22,7 @@ public class OfferStrategy { private List acceptanceRules = asList( new OfferRule("Host already running task", this::isHostAlreadyRunningTask), new OfferRule("Hostname is unresolveable", offer -> !isHostnameResolveable(offer.getHostname())), + new OfferRule("First ES node is not responding", offer -> !isAtLeastOneESNodeRunning()), new OfferRule("Cluster size already fulfilled", offer -> clusterState.getTaskList().size() >= configuration.getElasticsearchNodes()), new OfferRule("Offer did not have 2 ports", offer -> !containsTwoPorts(offer.getResourcesList())), new OfferRule("The offer does not contain the user specified ports", offer -> !containsUserSpecifiedPorts(offer.getResourcesList())), @@ -36,6 +37,16 @@ private boolean isHostnameResolveable(String hostname) { return !address.isUnresolved(); } + private boolean isAtLeastOneESNodeRunning() { + // If this is the first, do not check + List taskList = clusterState.getTaskList(); + if (taskList.size() == 0) { + return true; + } else { + return clusterState.getStatus(taskList.get(0).getTaskId()).getStatus().getState().equals(Protos.TaskState.TASK_RUNNING); + } + } + public OfferStrategy(Configuration configuration, ClusterState clusterState) { this.clusterState = clusterState; this.configuration = configuration; diff --git a/scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategyTest.java b/scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategyTest.java index 9fcf2862..6f288414 100644 --- a/scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategyTest.java +++ b/scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategyTest.java @@ -3,6 +3,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.mesos.Protos; import org.apache.mesos.elasticsearch.scheduler.state.ClusterState; +import org.apache.mesos.elasticsearch.scheduler.state.ESTaskStatus; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -16,6 +17,8 @@ import static java.util.Collections.singletonList; import static org.apache.mesos.elasticsearch.scheduler.Resources.*; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** @@ -37,8 +40,11 @@ public class OfferStrategyTest { @Before public void setUp() throws Exception { when(configuration.getFrameworkRole()).thenReturn("testRole"); + ESTaskStatus esTaskStatus = mock(ESTaskStatus.class); + when(esTaskStatus.getStatus()).thenReturn(taskStatus()); + when(clusterState.getStatus(any(Protos.TaskID.class))).thenReturn(esTaskStatus); } - + @Test public void willDeclineIfHostIsAlreadyRunningTask() throws Exception { when(clusterState.getTaskList()).thenReturn(singletonList(createTask("host1"))); @@ -224,4 +230,11 @@ private Protos.Offer.Builder baseOfferBuilder(String slaveId) { .setHostname("localhost") .setSlaveId(Protos.SlaveID.newBuilder().setValue(slaveId).build()); } + + private Protos.TaskStatus taskStatus() { + return Protos.TaskStatus.newBuilder() + .setTaskId(Protos.TaskID.newBuilder().setValue("TestId").build()) + .setState(Protos.TaskState.TASK_RUNNING) + .build(); + } } \ No newline at end of file