Skip to content

Commit

Permalink
fix deadlock #13 (point 2)
Browse files (browse the repository at this point in the history)
  • Loading branch information
idelvall committed Nov 16, 2016
1 parent 7c35629 commit 8f70a66
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions src/main/java/org/brutusin/wava/core/Scheduler.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.brutusin.wava.input.SubmitInput;
import org.brutusin.wava.utils.ANSICode;
import org.brutusin.wava.utils.NonRootUserException;
import org.brutusin.wava.utils.Utils;
import org.brutusin.wava.utils.RetCode;

public class Scheduler {

Expand Down Expand Up @@ -283,7 +283,24 @@ public void submit(PeerChannel<SubmitInput> submitChannel) throws IOException, I

if (Config.getInstance().getSchedulerCfg().getMaxJobRSSBytes() > 0 && submitChannel.getRequest().getMaxRSS() > Config.getInstance().getSchedulerCfg().getMaxJobRSSBytes() || maxManagedRss < submitChannel.getRequest().getMaxRSS()) {
submitChannel.sendEvent(Event.exceed_global, Config.getInstance().getSchedulerCfg().getMaxJobRSSBytes());
submitChannel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
submitChannel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
submitChannel.close();
return;
}
long treeRSS = submitChannel.getRequest().getMaxRSS();
Integer parentId = submitChannel.getRequest().getParentId();
while (parentId != null) {
JobInfo ji = jobMap.get(parentId);
if (ji == null) {
break;
}
treeRSS += ji.getSubmitChannel().getRequest().getMaxRSS();
parentId = ji.getSubmitChannel().getRequest().getParentId();
}

if (treeRSS > maxManagedRss) {
submitChannel.sendEvent(Event.exceed_tree, treeRSS);
submitChannel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
submitChannel.close();
return;
}
Expand Down Expand Up @@ -545,17 +562,17 @@ public void cancel(PeerChannel<CancelInput> cancelChannel) throws IOException, I
JobSet.State state = jobSet.getState(id);
if (state == null) {
cancelChannel.log(ANSICode.RED, "job not found");
cancelChannel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
cancelChannel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
} else if (state == JobSet.State.queued) {
JobInfo ji = jobMap.get(id);
if (ji != null) {
if (!cancelChannel.getUser().equals("root") && !cancelChannel.getUser().equals(ji.getSubmitChannel().getUser())) {
cancelChannel.log(ANSICode.RED, "user '" + cancelChannel.getUser() + "' is not allowed to cancel a job from user '" + ji.getSubmitChannel().getUser() + "'");
cancelChannel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
cancelChannel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
return;
}
ji.getSubmitChannel().sendEvent(Event.cancelled, cancelChannel.getUser());
ji.getSubmitChannel().sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
ji.getSubmitChannel().sendEvent(Event.retcode, RetCode.ERROR.getCode());
ji.getSubmitChannel().close();
cancelChannel.log(ANSICode.GREEN, "enqueued job sucessfully cancelled");
cancelChannel.sendEvent(Event.retcode, 0);
Expand All @@ -571,7 +588,7 @@ public void cancel(PeerChannel<CancelInput> cancelChannel) throws IOException, I
if (pi != null) {
if (!cancelChannel.getUser().equals("root") && !cancelChannel.getUser().equals(pi.getJobInfo().getSubmitChannel().getUser())) {
cancelChannel.log(ANSICode.RED, "user '" + cancelChannel.getUser() + "' is not allowed to cancel a job from user '" + pi.getJobInfo().getSubmitChannel().getUser() + "'");
cancelChannel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
cancelChannel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
return;
}
pi.getJobInfo().getSubmitChannel().sendEvent(Event.cancelled, cancelChannel.getUser());
Expand Down Expand Up @@ -605,7 +622,7 @@ public void updateGroup(PeerChannel<GroupInput> channel) throws IOException {
} else {
channel.log(ANSICode.RED, "Group '" + channel.getRequest().getGroupName() + "' can only be updated by users 'root' and '" + gi.getUser() + "'");
}
channel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
channel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
return;
}
if (gi.getJobs().isEmpty()) {
Expand All @@ -615,7 +632,7 @@ public void updateGroup(PeerChannel<GroupInput> channel) throws IOException {
return;
} else {
channel.log(ANSICode.RED, "Group '" + channel.getRequest().getGroupName() + "' cannot be deleted, since it contains " + gi.getJobs().size() + " active jobs");
channel.sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
channel.sendEvent(Event.retcode, RetCode.ERROR.getCode());
return;
}
}
Expand Down Expand Up @@ -677,7 +694,7 @@ public void run() {
updateNiceness(pId);
} catch (Exception ex) {
ji.getSubmitChannel().sendEvent(Event.error, JsonCodec.getInstance().transform(Miscellaneous.getStrackTrace(ex)));
ji.getSubmitChannel().sendEvent(Event.retcode, Utils.WAVA_ERROR_RETCODE);
ji.getSubmitChannel().sendEvent(Event.retcode, RetCode.ERROR.getCode());
return;
}
Thread stoutReaderThread = Miscellaneous.pipeAsynchronously(process.getInputStream(), (ErrorHandler) null, true, ji.getSubmitChannel().getStdoutOs());
Expand Down

0 comments on commit 8f70a66

Please sign in to comment.