Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more thread pools #465

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ allprojects {
retry {
if (System.getenv().containsKey("CI")) {
maxRetries = 1
maxFailures = 3
maxFailures = 4
failOnPassedAfterRetry = false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
Expand Down Expand Up @@ -185,9 +187,23 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(settings),
TimeValue.timeValueMinutes(5),
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ private CommonValue() {}
*/
/** Flow Framework plugin thread pool name prefix */
public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework.";
/** The provision workflow thread pool name */
/** The general workflow thread pool name for most calls */
public static final String WORKFLOW_THREAD_POOL = "opensearch_workflow";
/** The workflow thread pool name for provisioning */
public static final String PROVISION_WORKFLOW_THREAD_POOL = "opensearch_provision_workflow";
/** The workflow thread pool name for deprovisioning */
public static final String DEPROVISION_WORKFLOW_THREAD_POOL = "opensearch_deprovision_workflow";

/*
* Field names common to multiple classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

Expand Down Expand Up @@ -102,7 +102,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
context.restore();

// Retrieve resources from workflow state and deprovision
threadPool.executor(WORKFLOW_THREAD_POOL)
threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL)
.execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener));
}, exception -> {
String message = "Failed to get workflow state for workflow " + workflowId;
Expand Down Expand Up @@ -143,6 +143,7 @@ private void executeDeprovisionSequence(
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Collections.emptyList(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
flowFrameworkSettings.getRequestTimeout()
)
);
Expand Down Expand Up @@ -196,6 +197,7 @@ private void executeDeprovisionSequence(
pn.input(),
pn.predecessors(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
pn.nodeTimeout()
);
}).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Transport Action to provision a workflow from a stored use case template
Expand Down Expand Up @@ -180,7 +180,7 @@
*/
private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowSequence, ActionListener<WorkflowResponse> listener) {
try {
threadPool.executor(WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); });
threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); });

Check warning on line 183 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L183

Added line #L183 was not covered by tests
} catch (Exception exception) {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.ScheduledCancellable;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Representation of a process node in a workflow graph.
Expand All @@ -37,6 +33,7 @@ public class ProcessNode {
private final WorkflowData input;
private final List<ProcessNode> predecessors;
private final ThreadPool threadPool;
private final String threadPoolName;
private final TimeValue nodeTimeout;

private final PlainActionFuture<WorkflowData> future = PlainActionFuture.newFuture();
Expand All @@ -50,6 +47,7 @@ public class ProcessNode {
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
* @param threadPool The OpenSearch thread pool
* @param threadPoolName The thread pool to use
* @param nodeTimeout The timeout value for executing on this node
*/
public ProcessNode(
Expand All @@ -59,6 +57,7 @@ public ProcessNode(
WorkflowData input,
List<ProcessNode> predecessors,
ThreadPool threadPool,
String threadPoolName,
TimeValue nodeTimeout
) {
this.id = id;
Expand All @@ -67,6 +66,7 @@ public ProcessNode(
this.input = input;
this.predecessors = predecessors;
this.threadPool = threadPool;
this.threadPoolName = threadPoolName;
this.nodeTimeout = nodeTimeout;
}

Expand Down Expand Up @@ -152,34 +152,23 @@ public PlainActionFuture<WorkflowData> execute() {
WorkflowData wd = node.future().actionGet();
inputMap.put(wd.getNodeId(), wd);
}
logger.info("Starting {}.", this.id);

ScheduledCancellable delayExec = null;
if (this.nodeTimeout.compareTo(TimeValue.ZERO) > 0) {
delayExec = threadPool.schedule(() -> {
if (!future.isDone()) {
future.onFailure(new TimeoutException("Execute timed out for " + this.id));
}
}, this.nodeTimeout, ThreadPool.Names.SAME);
}
// record start time for this step.
logger.info("Starting {}.", this.id);
PlainActionFuture<WorkflowData> stepFuture = this.workflowStep.execute(
this.id,
this.input,
inputMap,
this.previousNodeInputs
);
// If completed exceptionally, this is a no-op
future.onResponse(stepFuture.get());
future.onResponse(stepFuture.actionGet(this.nodeTimeout));
// record end time passing workflow steps
if (delayExec != null) {
delayExec.cancel();
}
logger.info("Finished {}.", this.id);
} catch (Exception e) {
this.future.onFailure(e);
}
}, threadPool.executor(WORKFLOW_THREAD_POOL));
}, threadPool.executor(this.threadPoolName));
return this.future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.model.WorkflowNode.NODE_TIMEOUT_DEFAULT_VALUE;
import static org.opensearch.flowframework.model.WorkflowNode.NODE_TIMEOUT_FIELD;
Expand Down Expand Up @@ -122,6 +123,7 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId)
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/mappings/workflow-steps.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "60s"
},
"delete_connector": {
"inputs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testPlugin() throws IOException {
);
assertEquals(9, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(9, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(3, ffp.getExecutorBuilders(settings).size());
assertEquals(5, ffp.getSettings().size());
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/test/java/org/opensearch/flowframework/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ public static Response makeRequest(
if (entity != null) {
request.setEntity(entity);
}
try {
return client.performRequest(request);
} catch (IOException e) {
// In restricted resource cluster, initialization of REST clients on other nodes takes time
// Wait 10 seconds and try again
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {}
}
return client.performRequest(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.model.WorkflowState;
import org.junit.Before;
import org.junit.ComparisonFailure;

import java.util.Collections;
Expand All @@ -30,7 +31,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
Expand All @@ -39,6 +42,18 @@

public class FlowFrameworkRestApiIT extends FlowFrameworkRestTestCase {

private static AtomicBoolean waitToStart = new AtomicBoolean(true);

@Before
public void waitToStart() throws Exception {
// ML Commons cron job runs every 10 seconds and takes 20+ seconds to initialize .plugins-ml-config index
// Delay on the first attempt for 25 seconds to allow this initialization and prevent flaky tests
if (waitToStart.getAndSet(false)) {
CountDownLatch latch = new CountDownLatch(1);
latch.await(25, TimeUnit.SECONDS);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
}
}

public void testSearchWorkflows() throws Exception {

// Create a Workflow that has a credential 12345
Expand Down Expand Up @@ -257,7 +272,7 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
// wait and ensure state is completed/done
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
30,
120,
TimeUnit.SECONDS
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

import org.mockito.ArgumentCaptor;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -57,11 +57,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase
private static ThreadPool threadPool = new TestThreadPool(
DeprovisionWorkflowTransportActionTests.class.getName(),
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
);
private Client client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
Expand Down Expand Up @@ -85,9 +86,16 @@ public void setUp() throws Exception {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
TimeValue.timeValueMinutes(5),
Math.max(1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
)
);
this.deployModel = new DeployModelStep(
Expand Down
Loading
Loading