Skip to content

Commit

Permalink
Configurable status when dependency times out (#642)
Browse files Browse the repository at this point in the history
Introducing the new config of `tony.application.dependency.[X].timeout.after.[GROUP].ignored = true` to solve the #641. When specifying the above conf, it will make this task type untracked.
  • Loading branch information
zuston authored Feb 6, 2022
1 parent a014d68 commit 416019c
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@ For more information about TonY, check out the following:
3. My tensorflow's partial workers hang when chief finished. Or evaluator hang when chief and workers finished.
Please see the [PR#521](https://github.com/tony-framework/TonY/pull/621) on Tensorflow configuration to solve it.
Please see the [PR#521](https://github.com/tony-framework/TonY/pull/621) and [PR#641](https://github.com/tony-framework/TonY/issues/641) on Tensorflow configuration to solve it.
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ public static String getContainerDockerMountKey() {
public static final String GROUP_REGEX = TONY_APPLICATION_PREFIX + "group\\.([A-Za-z]+)$";
public static final String GROUP_DEPEND_TIMEOUT_REGEX =
TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)$";
public static final String GROUP_DEPEND_TIMEOUT_IGNORE_REGEX =
TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)$.ignored";

public static String getGroupKey(String groupName) {
return String.format(TONY_APPLICATION_PREFIX + "group.%s", groupName);
Expand All @@ -357,4 +359,8 @@ public static String getGroupKey(String groupName) {
public static String getGroupDependentKey(String grp, String dependentGrp) {
return String.format(TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s", grp, dependentGrp);
}

public static String getGroupDependentIgnoredKey(String roleType, String dependentGrp) {
return String.format(TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s.ignored", roleType, dependentGrp);
}
}
10 changes: 10 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/TonySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -37,6 +38,8 @@

import static com.linkedin.tony.Constants.CHIEF_JOB_NAME;
import static com.linkedin.tony.Constants.WORKER_JOB_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.UNTRACKED_JOBTYPES;
import static com.linkedin.tony.util.Utils.getUntrackedJobTypes;


/**
Expand Down Expand Up @@ -670,4 +673,11 @@ public int getNumRegisteredTasks() {
public Set<String> getRegisteredTasks() {
return registeredTasks;
}

public void makeTaskTypeUntracked(String taskType) {
String[] defaultUntrackedTypes = getUntrackedJobTypes(tonyConf);
List<String> untrackedList = Arrays.stream(defaultUntrackedTypes).collect(Collectors.toList());
untrackedList.add(taskType);
tonyConf.set(UNTRACKED_JOBTYPES, StringUtils.join(untrackedList, ","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.linkedin.tony.util.Utils;

import static com.linkedin.tony.Constants.SIDECAR_TB_ROLE_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.getGroupDependentIgnoredKey;

public abstract class MLGenericRuntime extends AbstractFrameworkRuntime {
private static final long REGISTRATION_STATUS_INTERVAL_MS = 15 * 1000;
Expand Down Expand Up @@ -141,9 +142,16 @@ public boolean isHealthy(Configuration tonyConf) {
* chief/workers are finished, the mechanism of dependency group timeout will make job failed.
*
* Dependency group timeout configuration as follows:
*
* ```
* tony.application.group.A = worker,chief
* tony.application.dependency.evaluator.timeout.after.A = 3600
* ```
*
* And in some of the cases, we don't want to fail the whole job even though a dependency times out.
* For example, if chief succeeded and there is a worker hanging for 1 hour,
* users can configure the job to still pass. So it introduces the new config of
* `tony.application.dependency.[X].timeout.after.[GROUP].ignored = true`, and more details could be
* found in https://github.com/tony-framework/TonY/issues/641.
*
*/
String errorMsg = groupDependencyTimeout(tonyConf);
Expand Down Expand Up @@ -231,10 +239,20 @@ protected String groupDependencyTimeout(Configuration tonyConf) {
}

if (System.currentTimeMillis() - latestEndTimeInAllDependentTasks > timeout) {
return String.format("Jobtype: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));

String ignoredTaskTypeKey = getGroupDependentIgnoredKey(runningTaskType, dependentGroupName);
boolean ignoreTimeout = tonyConf.getBoolean(ignoredTaskTypeKey, false);
if (!ignoreTimeout) {
return String.format("Task type: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));
}

log.info(
String.format("Task type: %s is marked as untracked.", runningJobTypes)
);
session.makeTaskTypeUntracked(runningTaskType);
}
}

Expand Down
76 changes: 76 additions & 0 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,82 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
client.removeListener(handler);
}

/**
* When enable the conf of "tony.application.dependency.worker.timeout.after.A.ignored=true",
* it should make the job succeed.
*/
@Test
public void testTaskWithDependencyTimeoutButIgnoredShouldPass() throws Exception {
client.init(new String[]{
"--src_dir", "tony-core/src/test/resources/scripts",
"--hdfs_classpath", libPath,
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
"--python_venv", "tony-core/src/test/resources/test.zip",
"--executes", "python exit_0.py",
"--conf", "tony.chief.instances=1",
"--conf", "tony.worker.instances=2",
"--conf", "tony.worker.command=python forever_not_exit.py",
"--conf", "tony.application.framework=tensorflow",
"--conf", "tony.application.group.A=chief",
"--conf", "tony.application.dependency.worker.timeout.after.A=10",
"--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
});
int exitCode = client.start();
Assert.assertEquals(exitCode, 0);
}

/**
* Test task-dependency-timeout with the task role of PS (the default untracked job type)
*/
@Test
public void testTaskWithDependencyTimeoutButIgnoredAndWithPSShouldPass() throws Exception {
client.init(new String[]{
"--src_dir", "tony-core/src/test/resources/scripts",
"--hdfs_classpath", libPath,
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
"--python_venv", "tony-core/src/test/resources/test.zip",
"--executes", "python exit_0.py",
"--conf", "tony.chief.instances=1",
"--conf", "tony.worker.instances=2",
"--conf", "tony.worker.command=python forever_not_exit.py",
"--conf", "tony.ps.instances=1",
"--conf", "tony.ps.command=python forever_not_exit.py",
"--conf", "tony.application.framework=tensorflow",
"--conf", "tony.application.group.A=chief",
"--conf", "tony.application.dependency.worker.timeout.after.A=10",
"--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
});
int exitCode = client.start();
Assert.assertEquals(exitCode, 0);
}

/**
* Test task(Worker) dependency-timeout, but the role of worker exit with -1,
* and then this job should fail.
*/
@Test
public void testTaskWithDependencyTimeAndIgnoredButFailedShouldPass() throws Exception {
client.init(new String[]{
"--src_dir", "tony-core/src/test/resources/scripts",
"--hdfs_classpath", libPath,
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
"--python_venv", "tony-core/src/test/resources/test.zip",
"--executes", "python exit_0.py",
"--conf", "tony.chief.instances=1",
"--conf", "tony.worker.instances=2",
"--conf", "tony.worker.command=python sleep_10_and_exit_1.py",
"--conf", "tony.evaluator.instances=1",
"--conf", "tony.evaluator.command=python sleep_30.py",
"--conf", "tony.application.framework=tensorflow",
"--conf", "tony.application.group.A=chief",
"--conf", "tony.application.dependency.worker.timeout.after.A=5",
"--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
"--conf", "tony.application.stop-on-failure-jobtypes=worker"
});
int exitCode = client.start();
Assert.assertEquals(exitCode, -1);
}

@Test
public void testLostConnectionWithAMJobShouldFail() throws Exception {
client.init(new String[]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public void testGroupDependencyShouldPass() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: evaluator runs exceeded timeout because it's dependent group: A "
+ "(task set: [worker,chief]) has been finished."
"Task type: evaluator runs exceeded timeout because it's dependent group: "
+ "A (task set: [worker,chief]) has been finished."
);
}

Expand All @@ -142,7 +142,8 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: otherWorker runs exceeded timeout because it's dependent group: A (task set: [chief]) has been finished."
"Task type: otherWorker runs exceeded timeout because it's dependent group: "
+ "A (task set: [chief]) has been finished."
);
}

Expand All @@ -164,7 +165,8 @@ public void testGroupDependencyWithMultipleGroup() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: evaluator runs exceeded timeout because it's dependent group: B (task set: [chief,worker]) has been finished."
"Task type: evaluator runs exceeded timeout because it's dependent group: "
+ "B (task set: [chief,worker]) has been finished."
);
}

Expand Down Expand Up @@ -235,6 +237,32 @@ public void testPartialTaskScheduledShouldPass() {
);
}

/**
* Test case for partial tasks with ignored timeout, and it will be marked as untracked
* when dependency times out.
*/
@Test
public void testTaskTimeoutWithIgnoredShouldPass() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "chief");
conf.set("tony.application.dependency.otherWorker.timeout.after.A", "3600");
conf.set("tony.application.dependency.otherWorker.timeout.after.A.ignored", "true");

TonySession session = buildMockSession(conf);

TonySession.TonyTask chiefTask = session.getTask("chief", "0");
chiefTask.setEndTime(System.currentTimeMillis() - 1000 * 60 * 120);

MLGenericRuntime.AM am = (MLGenericRuntime.AM) runtime.getAMAdapter();
am.setTonySession(session);
Assert.assertNull(
am.groupDependencyTimeout(conf)
);

Assert.assertEquals(session.getTotalTasks() - session.getTotalTrackedTasks(), 3);
}

private TonySession buildPartialTaskScheduledSession(Configuration conf) {
TonySession session = new TonySession.Builder().setTonyConf(conf).build();

Expand Down
8 changes: 8 additions & 0 deletions tony-core/src/test/resources/scripts/sleep_10_and_exit_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright 2022 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
# See LICENSE in the project root for license information.
#
import time

time.sleep(10)
exit(1)

0 comments on commit 416019c

Please sign in to comment.