Skip to content

Commit

Permalink
Merge pull request #31 from nchabalier/enhance-ioplugins
Browse files Browse the repository at this point in the history
Enhance ioplugins
  • Loading branch information
yannrichet-irsn authored Dec 19, 2023
2 parents 4192e71 + 9417ae4 commit 6181f6a
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 115 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: test

on:
workflow_dispatch:
push:
branches: [ master ]
tags: [ 'v*.*' ]
Expand Down
7 changes: 6 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# v1.16 - ??/??/2023

## Fixes
## Improvements
* add a field to define output numeric format to display in IOPlugins
* add a field to define default columns to display in a table in IOPlugins

## Fixes
* fix old results that cannot be reloaded randomly
* fix creation of concurent RMahtExpression when there are multi thread on the same Plugin (bug with Rserve session never killed)
* support missing output/input in Python wrapper
* update Rsession to 3.1.8 to support R>=4.3

Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/funz/Project.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
*/
public class Project {


@Override
public String toString() {
return getName();
Expand Down Expand Up @@ -72,6 +73,16 @@ public String toString() {
public boolean deletePreviousArchive = DEFAULT_deletePreviousArchive;
private AbstractShell shell;

/**
* Name of the file we wan't to check to get cache results.
* Keep it "null" or empty if we wan't to check all inputDir files m5.
*/
private String cacheFileNameToCheck = null;

public void setCacheFileNameToCheck(String cacheFileNameToCheck) {
this.cacheFileNameToCheck = cacheFileNameToCheck;
}

/**
* @return the shell
*/
Expand Down Expand Up @@ -136,6 +147,10 @@ public long getBlacklistTimeout() {
return blacklistTimeout;
}

public String getCacheFileNameToCheck() {
return this.cacheFileNameToCheck;
}

/* too much dangerous : InputFile are not updated nor checked, so when calling Project.cleanParameters, var will disappear...
public void replaceVariable(Variable x) {
Variable x_old = getVariableByName(x.getName());
Expand Down
113 changes: 82 additions & 31 deletions src/main/java/org/funz/api/BatchRun_v1.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.funz.parameter.OutputFunctionExpression;
import org.funz.run.Client;
import org.funz.run.Computer;
import org.funz.util.Digest;
import org.funz.util.Disk;
import org.funz.util.Format;
import org.funz.util.ZipTool;
Expand All @@ -26,6 +27,8 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.Thread.sleep;
import static org.funz.Protocol.ARCHIVE_FILTER;
Expand Down Expand Up @@ -540,7 +543,18 @@ boolean runCase(final Case c) {
for (Cache _c : getCache()) {
addInfoHistory(c, "Searching in cache: " + getCache());
File dir = files[0].getParentFile().getParentFile();
File matching_output = _c.getMatchingOutputDirinCache(new File(dir + File.separator + Constants.INPUT_DIR), prj.getCode());
File inputDir = new File(dir + File.separator + Constants.INPUT_DIR);
String cacheFileNameToCheck = prj.getCacheFileNameToCheck();
byte[] checkSum = null;
if(cacheFileNameToCheck!=null && !cacheFileNameToCheck.isEmpty()) {
File modelVars = new File(inputDir, cacheFileNameToCheck);
if(modelVars!=null && modelVars.exists()) {
checkSum = Digest.getSum(modelVars);
}
}


File matching_output = _c.getMatchingOutputDirinCache(inputDir, prj.getCode(), checkSum);
if (matching_output != null) {
addInfoHistory(c, "Found in " + matching_output.getAbsolutePath());

Expand Down Expand Up @@ -1236,6 +1250,14 @@ public boolean test(Case t) {
return theseCases;
}

private static boolean hasDuplicate(List<Case> caseList) {
Set<String> set = new HashSet<>();
// Set#add returns false if the set does not change, which
// indicates that a duplicate element has been added.
for (Case aCase: caseList) if (!set.add(aCase.getName())) return true;
return false;
}

public List<Case> getFinishedCases() {
List<Case> theseCases = new LinkedList<>();
for (Case t : getSelectedCases()) {
Expand All @@ -1262,6 +1284,7 @@ private boolean hasPendingCases() {

public List<Case> torun;


public boolean runBatch() throws Exception {
//LogUtils.tic("runBatch");

Expand Down Expand Up @@ -1334,14 +1357,18 @@ public boolean runBatch() throws Exception {
List<CloneCase> cloneCases = new ArrayList<CloneCase>();
//LogUtils.toc("getPendingCases");

List<Case> finishedCaseList = getFinishedCases();
List<Case> pendingCaseList = getPendingCases();
boolean hasDuplicatedCases = hasDuplicate(getPendingCases());
int numToRun = torun.size();
for (int i = 0; i < numToRun; i++) {
final Case c = torun.get(i);
c.setObserver(observer);
boolean already_launched = false;
boolean cloned = false;
for (int j = 0; j < getFinishedCases().size(); j++) { //for old cases
final Case cprev = getFinishedCases().get(j);

for (int j = 0; j < finishedCaseList.size(); j++) { //for old cases
final Case cprev = finishedCaseList.get(j);
//LogUtils.tic("synchronized (cprev) ");
synchronized (cprev) {
if (c.getName().equals(cprev.getName())) {
Expand All @@ -1354,22 +1381,27 @@ public boolean runBatch() throws Exception {
}
//LogUtils.toc("synchronized (cprev) ");
}
if (!already_launched && !cloned) {
for (int j = 0; j < i; j++) { // when same case is just asked before.
final Case cprev = getPendingCases().get(j);
//LogUtils.tic("synchronized (cprev) ");
synchronized (cprev) {
if (c.getName().equals(cprev.getName())) {
already_launched = true;
out("Case " + c.getName() + " already launched...", 1);
cloneCases.add(new CloneCase(c, cprev));
cloned = true;
break;

if(hasDuplicatedCases) {
// If there is not duplicated cases, ignore the block bellow which is very inefficient
if (!already_launched && !cloned) {
for (int j = 0; j < i; j++) { // when same case is just asked before.
final Case cprev = pendingCaseList.get(j);
//LogUtils.tic("synchronized (cprev) ");
synchronized (cprev) {
if (c.getName().equals(cprev.getName())) {
already_launched = true;
out("Case " + c.getName() + " already launched...", 1);
cloneCases.add(new CloneCase(c, cprev));
cloned = true;
break;
}
}
}
//LogUtils.toc("synchronized (cprev) ");
}
}


if (!already_launched && !cloned) {
//LogUtils.tic("new RunCase");
RunCase rc = new RunCase(c);
Expand Down Expand Up @@ -1404,15 +1436,19 @@ public void run() {
//LogUtils.toc("beforeRunCases");

setState(BATCH_RUNNING);
// TODO NC: configure nb threads
//ExecutorService executorService = Executors.newFixedThreadPool(8);
// let's start only some cases (to limit concurrent RunCase threads)
for (int i = 0; i < prj.getMaxCalcs(); i++) {
if (i < runCases.size()) {
out("Will start case " + runCases.get(i).c.getName(), 3);
//LogUtils.tic("runCases.get(i).start()");
//executorService.submit(runCases.get(i));
runCases.get(i).start();
//LogUtils.toc("runCases.get(i).start()");
}
}
//executorService.shutdown();

int f = 0;
String state_value = "";
Expand All @@ -1428,29 +1464,35 @@ public void run() {
int[][] states = new int[getSelectedCases().size()][Case.STATE_STRINGS.length];
while (f < numToRun && !askToStop) {
//out_noln(" ? ", 5);
int f_old = f;

//LogTicToc.tic("filled");
f = filled(torun);
//LogTicToc.toc("filled");
// let's start only some cases (to limit concurrent RunCase threads)
for (int i = 0; i < /*f - f_old*/ Math.min(prj.getMaxCalcs(), numToRun - f); i++) {
for (int j = 0; j < runCases.size(); j++) {
//LogTicToc.tic("runCases.get(j)");
RunCase rc = runCases.get(j);
//LogTicToc.toc("runCases.get(j)");
if (!rc.isAlive()) {
if (rc.c != null && rc.c.getState() == Case.STATE_INTACT) {
out("Starting case " + rc.c.getName(), 3);
//LogTicToc.tic("rc.start()");
rc.start();
//LogTicToc.toc("rc.start()");
//System.err.println("+");
break;
}//else System.err.println(rc.c.getStatusInformation());
}
int nbCaseAlive = 0;
for (int j = 0; j < runCases.size(); j++) {
//LogTicToc.tic("runCases.get(j)");
RunCase rc = runCases.get(j);
//LogTicToc.toc("runCases.get(j)");
if(rc.isAlive()) {
nbCaseAlive++;
} else {
if (rc.c != null && rc.c.getState() == Case.STATE_INTACT) {
out("Starting case " + rc.c.getName(), 3);
//LogTicToc.tic("rc.start()");
rc.start();
//LogTicToc.toc("rc.start()");
//System.err.println("+");
break;
}//else System.err.println(rc.c.getStatusInformation());
}
// Only run "prj.getMaxCalcs()" number of RunCases in parallel
if(nbCaseAlive >= prj.getMaxCalcs()) {
break;
}
}


//out("Finished " + f + "/" + getCases().size() + " cases.", 2);
if (Funz.getVerbosity() > 3) {
//out("Cases status:", 3);
Expand Down Expand Up @@ -1524,7 +1566,15 @@ public void run() {
setState(BATCH_EXCEPTION);
return false;
} finally {
// Don't finalize formula interpreter here because Shell_v1 needs it
// try {
// // Finalize RServe or other formula interpreter
// //prj.getPlugin().getFormulaInterpreter().finalize();
// } catch (Throwable throwable) {
// throw new Exception("Failed to close formula interpreter \n" + throwable.getMessage());
// }
try {
setState(MERGING_RESULTS);
merged_results.putAll(merge(getSelectedCases()));//torun));
} catch (Exception e) {
err("Failed to merge results: " + torun, 0);
Expand Down Expand Up @@ -1561,6 +1611,7 @@ public File getArchiveDirectory() {
public static final String BATCH_STARTING = "Starting...";
public static final String BATCH_ERROR = "Batch failed";
public static final String BATCH_EXCEPTION = "Batch exception";
private static final String MERGING_RESULTS = "Merging results";

public String getState() {
return state;
Expand Down
64 changes: 49 additions & 15 deletions src/main/java/org/funz/ioplugin/BasicIOPlugin.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
package org.funz.ioplugin;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.funz.Constants;
import org.funz.log.Log;
import org.funz.log.LogCollector.SeverityLevel;
import org.funz.parameter.OutputFunctionExpression;
import org.funz.parameter.SyntaxRules;
import org.funz.script.MathExpression;
import org.funz.script.ParseExpression;
import org.funz.script.RMathExpression;
import org.funz.util.Data;
import org.math.io.parser.ArrayString;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Simplified IOPlugin implementation based on properties file. Following
Expand Down Expand Up @@ -195,6 +189,9 @@ public void setInputFiles(File[] inputfiles) {
}
}

initializeDefaultDisplayedOutput();
initializeOutputFormat();

//_output.put("output_lines", new String[0]);
//_output.put("out", "");
//_output.put("err", "");
Expand Down Expand Up @@ -398,6 +395,43 @@ public boolean accept(File f) {
}
// ant compile dist; cd dist; zip -r ../../plugin-R/funz-client.zip *; cd ..


@Override
public void initializeDefaultDisplayedOutput() {
if (_properties.containsKey("defaultdisplayedoutputlist") && _properties.getProperty("defaultdisplayedoutputlist").length() > 0) {
Log.logMessage("BasicIOPlugin " + getID(), SeverityLevel.INFO, false, " defaultdisplayedoutputlist=" + _properties.getProperty("defaultdisplayedoutputlist"));

List<String> outputlist = new LinkedList<String>();
Matcher m = Pattern.compile("([^\"]\\S*|\".+?\")\\s*").matcher(_properties.getProperty("defaultdisplayedoutputlist"));
while (m.find()) outputlist.add(m.group(1));
this._defaultDisplayedOutput=outputlist;
}
}

@Override
public void initializeOutputFormat() {
if (_properties.containsKey("outputformat") && _properties.getProperty("outputformat").length() > 0) {
Log.logMessage("BasicIOPlugin " + getID(), SeverityLevel.INFO, false, " outputformat=" + _properties.getProperty("outputformat"));

List<String> outputFormatList = new LinkedList<String>();
Matcher m = Pattern.compile("([^\"]\\S*|\".+?\")\\s*").matcher(_properties.getProperty("outputformat"));
while (m.find()) outputFormatList.add(m.group(1));
for(String outputFormat : outputFormatList) {
boolean readSucess = false;
if(outputFormat.contains(":")) {
String[] outputAndFormat = outputFormat.split(":");
if(outputAndFormat.length==2) {
this._outputFormat.put(outputAndFormat[0], outputAndFormat[1]);
readSucess = true;
}
}
if(!readSucess) {
System.err.println("The following outputFormat is unreadable (please respect syntax 'output:format'): " + outputFormat);
}
}
}
}

@Override
public void setCommentLine(String c) {
commentLine = c;
Expand Down
Loading

0 comments on commit 6181f6a

Please sign in to comment.