Skip to content

Commit d567cf1

Browse files
authored
Merge pull request #52 from opensha/2024_09-module_archive_io_abstraction
Updates to dev project for upstream Module archive I/O abstraction PR
2 parents 8ee3cf2 + 50e93e9 commit d567cf1

12 files changed

+581
-29
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package scratch.kevin;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.io.InputStream;
6+
import java.math.BigInteger;
7+
import java.security.MessageDigest;
8+
import java.security.NoSuchAlgorithmException;
9+
import java.util.ArrayDeque;
10+
import java.util.ArrayList;
11+
import java.util.HashMap;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.Future;
19+
import java.util.stream.Collectors;
20+
21+
import org.opensha.commons.util.ExceptionUtils;
22+
import org.opensha.commons.util.io.archive.ArchiveInput;
23+
import org.opensha.sha.earthquake.faultSysSolution.util.FaultSysTools;
24+
25+
public class ZipMD5Compare {
26+
27+
public static void main(String[] args) throws IOException {
28+
File zip1 = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/2024_02_02-nshm23_branches-WUS_FM_v3/results_simplified.zip");
29+
File zip2 = new File("/tmp/results_simplified.zip");
30+
31+
ExecutorService exec = Executors.newFixedThreadPool(FaultSysTools.defaultNumThreads());
32+
33+
System.out.println("Calulating MD5s for Zip1: "+zip1.getAbsolutePath());
34+
Map<String, String> md5s1 = loadCalcMD5s(zip1, exec);
35+
System.out.println("Calulating MD5s for Zip2: "+zip2.getAbsolutePath());
36+
Map<String, String> md5s2 = loadCalcMD5s(zip2, exec);
37+
38+
for (String entry : md5s1.keySet()) {
39+
String md51 = md5s1.get(entry);
40+
String md52 = md5s2.get(entry);
41+
if (md52 == null)
42+
System.err.println("Zip2 is missing:\t"+entry);
43+
else if (!md51.equals(md52))
44+
System.err.println("Zip2 differs for:\t"+entry+"\t("+md51+" != "+md52+")");
45+
}
46+
for (String entry : md5s2.keySet())
47+
if (!md5s1.containsKey(entry))
48+
System.err.println("Zip1 is missing:\t"+entry);
49+
50+
exec.shutdown();
51+
}
52+
53+
public static Map<String, String> loadCalcMD5s(File zipFile, ExecutorService exec) throws IOException {
54+
ArchiveInput input = ArchiveInput.getDefaultInput(zipFile);
55+
Map<String, String> ret = loadCalcMD5s(input, exec);
56+
input.close();
57+
return ret;
58+
}
59+
60+
public static Map<String, String> loadCalcMD5s(ArchiveInput input, ExecutorService exec) throws IOException {
61+
ArrayDeque<MessageDigest> mds = new ArrayDeque<>();
62+
63+
List<String> entries = input.entryStream().collect(Collectors.toList());
64+
List<Future<byte[]>> futures = new ArrayList<>(entries.size());
65+
66+
for (String entry : entries) {
67+
futures.add(exec.submit(new Callable<byte[]>() {
68+
69+
@Override
70+
public byte[] call() throws Exception {
71+
MessageDigest md;
72+
synchronized (mds) {
73+
md = mds.poll();
74+
}
75+
if (md == null) {
76+
try {
77+
md = MessageDigest.getInstance("MD5");
78+
} catch (NoSuchAlgorithmException e) {
79+
throw ExceptionUtils.asRuntimeException(e);
80+
}
81+
}
82+
83+
byte[] buffer = new byte[8192]; // Read in chunks of 8KB
84+
int bytesRead;
85+
86+
InputStream is;
87+
synchronized (input) {
88+
is = input.getInputStream(entry);
89+
}
90+
91+
// Read the input stream in chunks and update the digest
92+
while ((bytesRead = is.read(buffer)) != -1) {
93+
md.update(buffer, 0, bytesRead);
94+
}
95+
96+
byte[] ret = md.digest();
97+
md.reset();
98+
synchronized (mds) {
99+
mds.push(md);
100+
}
101+
return ret;
102+
}
103+
}));
104+
}
105+
106+
Map<String, String> ret = new HashMap<>(entries.size());
107+
try {
108+
for (int i=0; i<entries.size(); i++)
109+
ret.put(entries.get(i), hashBytesToString(futures.get(i).get()));
110+
} catch (InterruptedException | ExecutionException e) {
111+
throw ExceptionUtils.asRuntimeException(e);
112+
}
113+
return ret;
114+
}
115+
116+
private static String hashBytesToString(byte[] hash) {
117+
BigInteger bigInt = new BigInteger(1,hash);
118+
String hashtext = bigInt.toString(16);
119+
// Now we need to zero pad it if you actually want the full 32 chars.
120+
while(hashtext.length() < 32 ){
121+
hashtext = "0"+hashtext;
122+
}
123+
return hashtext;
124+
}
125+
126+
}

src/main/java/scratch/kevin/nshm23/BranchAveragedHazardScriptWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public static void main(String[] args) throws IOException {
3535
// String baseDirName = "2023_11_16-nshm23_branches-randB-randSeg-NSHM23_v2-CoulombRupSet-DsrUni-TotNuclRate-NoRed-ThreshAvgIterRelGR";
3636
// String baseDirName = "2024_05_07-nshm23_branches-WUS_FM_v3-AvgSupraB-AvgSeg";
3737
// String baseDirName = "2024_07_31-prvi25_subduction_branches";
38-
String baseDirName = "2024_08_16-prvi25_crustal_subduction_combined_branches";
39-
// String baseDirName = "2024_08_16-prvi25_crustal_branches-dmSample5x";
40-
// String baseDirName = "2024_08_16-prvi25_subduction_branches";
38+
// String baseDirName = "2024_09_04-prvi25_crustal_subduction_combined_branches";
39+
// String baseDirName = "2024_09_04-prvi25_crustal_branches-dmSample5x";
40+
String baseDirName = "2024_09_04-prvi25_subduction_branches";
4141

4242
// String suffix = "true_mean";
4343
// String solFileName = "true_mean_solution.zip";
@@ -55,8 +55,8 @@ public static void main(String[] args) throws IOException {
5555
// String suffix = "ba_only-LARGE-true_pt_src";
5656
// String solFileName = "results_PRVI_SUB_FM_LARGE_branch_averaged_gridded.zip";
5757

58-
String suffix = "ba_only";
59-
String solFileName = "combined_branch_averaged_solution.zip";
58+
// String suffix = "ba_only";
59+
// String solFileName = "combined_branch_averaged_solution.zip";
6060

6161
// String suffix = "ba_only";
6262
// String solFileName = "results_PRVI_CRUSTAL_FM_V1p1_branch_averaged_gridded.zip";
@@ -68,8 +68,8 @@ public static void main(String[] args) throws IOException {
6868
// String suffix = "ba_only-INTERFACE_only";
6969
// String solFileName = "results_PRVI_INTERFACE_ONLY_branch_averaged_gridded.zip";
7070

71-
// String suffix = "ba_only-both_fms";
72-
// String solFileName = "results_PRVI_SUB_FMs_combined_branch_averaged_gridded.zip";
71+
String suffix = "ba_only-both_fms";
72+
String solFileName = "results_PRVI_SUB_FMs_combined_branch_averaged_gridded.zip";
7373

7474
boolean noMFDs = false;
7575

src/main/java/scratch/kevin/nshm23/DevinSlipRateCSV.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ public class DevinSlipRateCSV {
3232
public static void main(String[] args) throws IOException {
3333
File solDir = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/"
3434
// + "2023_04_11-nshm23_branches-NSHM23_v2-CoulombRupSet-TotNuclRate-NoRed-ThreshAvgIterRelGR/");
35-
+ "2023_06_29-nshm23_branches-NSHM23_v2-CoulombRupSet-DsrTap-TotNuclRate-NoRed-ThreshAvgIterRelGR/");
35+
// + "2023_06_29-nshm23_branches-NSHM23_v2-CoulombRupSet-DsrTap-TotNuclRate-NoRed-ThreshAvgIterRelGR/");
36+
+ "2024_02_02-nshm23_branches-WUS_FM_v3/");
3637
// File solFile = new File(solDir, "results_NSHM23_v2_CoulombRupSet_branch_averaged_gridded.zip");
37-
File solFile = new File(solDir, "results_NSHM23_v2_CoulombRupSet_branch_averaged.zip");
38+
// File solFile = new File(solDir, "results_NSHM23_v2_CoulombRupSet_branch_averaged.zip");
39+
File solFile = new File(solDir, "results_WUS_FM_v3_branch_averaged.zip");
3840
FaultSystemSolution sol = FaultSystemSolution.load(solFile);
3941

4042
SolutionSlipRates solSlips = sol.requireModule(SolutionSlipRates.class);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package scratch.kevin.nshm23;
2+
3+
import java.io.BufferedReader;
4+
import java.io.BufferedWriter;
5+
import java.io.File;
6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
import java.io.InputStreamReader;
9+
import java.io.OutputStreamWriter;
10+
11+
import org.opensha.commons.util.io.archive.ArchiveInput;
12+
import org.opensha.commons.util.io.archive.ArchiveOutput;
13+
import org.opensha.sha.earthquake.faultSysSolution.modules.AbstractLogicTreeModule;
14+
15+
public class HazardZipLogicTreeSegModelRename {
16+
17+
public static void main(String[] args) throws IOException {
18+
File inputFile = new File(args[0]);
19+
File outputFile = new File(args[1]);
20+
21+
ArchiveInput.ApacheZipFileInput input = new ArchiveInput.ApacheZipFileInput(inputFile);
22+
ArchiveOutput.ApacheZipFileOutput output = new ArchiveOutput.ApacheZipFileOutput(outputFile);
23+
24+
for (String entry : input.getEntries()) {
25+
if (entry.equals(AbstractLogicTreeModule.LOGIC_TREE_FILE_NAME) || entry.equals(AbstractLogicTreeModule.LOGIC_TREE_MAPPINGS_FILE_NAME)) {
26+
System.out.println("Translating "+entry);
27+
InputStream is = input.getInputStream(entry);
28+
BufferedReader bRead = new BufferedReader(new InputStreamReader(is));
29+
30+
output.putNextEntry(entry);
31+
32+
BufferedWriter bWrite = new BufferedWriter(new OutputStreamWriter(output.getOutputStream()));
33+
34+
String line;
35+
while ((line = bRead.readLine()) != null) {
36+
bWrite.write(replace(line));
37+
bWrite.write('\n');
38+
}
39+
40+
bWrite.flush();
41+
output.closeEntry();
42+
bRead.close();
43+
} else {
44+
String modName = replace(entry);
45+
// System.out.println("Copying "+entry+" to "+modName);
46+
output.transferFrom(input, entry, modName);
47+
}
48+
}
49+
50+
output.close();
51+
input.close();
52+
}
53+
54+
private static String replace(String str) {
55+
return str.replaceAll("HighSeg", "High").replaceAll("MidSeg", "Middle").replaceAll("LowSeg", "Low");
56+
}
57+
58+
}

src/main/java/scratch/kevin/nshm23/LogicTreeMisfitPageGen.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensha.commons.util.DataUtils.MinMaxAveTracker;
4141
import org.opensha.commons.util.MarkdownUtils.TableBuilder;
4242
import org.opensha.commons.util.cpt.CPT;
43+
import org.opensha.commons.util.io.archive.ArchiveInput;
4344
import org.opensha.commons.util.modules.AverageableModule.AveragingAccumulator;
4445
import org.opensha.commons.util.modules.helpers.FileBackedModule;
4546
import org.opensha.sha.earthquake.faultSysSolution.hazard.LogicTreeCurveAverager;
@@ -1048,10 +1049,10 @@ public static Map<LogicTreeBranch<?>, InversionMisfitStats> loadBranchMisfits(Fi
10481049

10491050
public static Map<LogicTreeBranch<?>, InversionMisfitStats> loadBranchMisfits(File resultsFile, LogicTree<?> tree)
10501051
throws IOException {
1051-
ZipFile zip = new ZipFile(resultsFile);
1052+
ArchiveInput input = new ArchiveInput.ZipFileInput(resultsFile);
10521053

10531054
if (tree == null) {
1054-
BufferedInputStream logicTreeIS = FileBackedModule.getInputStream(zip, "solution_logic_tree/", "logic_tree.json");
1055+
BufferedInputStream logicTreeIS = FileBackedModule.getInputStream(input, "solution_logic_tree/", "logic_tree.json");
10551056
Gson gson = new GsonBuilder().registerTypeAdapter(LogicTree.class, new LogicTree.Adapter<>()).create();
10561057
InputStreamReader reader = new InputStreamReader(logicTreeIS);
10571058
tree = gson.fromJson(reader, LogicTree.class);
@@ -1066,16 +1067,15 @@ public static Map<LogicTreeBranch<?>, InversionMisfitStats> loadBranchMisfits(Fi
10661067
}
10671068
entryName += InversionMisfitStats.MISFIT_STATS_FILE_NAME;
10681069
// System.out.println("Loading "+entryName);
1069-
ZipEntry entry = zip.getEntry(entryName);
1070-
Preconditions.checkNotNull(entry, "Entry not found: %s", entryName);
1070+
// hasEntry's result was previously passed to checkNotNull; an auto-boxed boolean is never null, so that check could not fail
Preconditions.checkState(input.hasEntry(entryName), "Entry not found: %s", entryName);
10711071

1072-
CSVFile<String> csv = CSVFile.readStream(zip.getInputStream(entry), true);
1072+
CSVFile<String> csv = CSVFile.readStream(input.getInputStream(entryName), true);
10731073
InversionMisfitStats stats = new InversionMisfitStats(null);
10741074
stats.initFromCSV(csv);
10751075
ret.put(branch, stats);
10761076
}
10771077

1078-
zip.close();
1078+
input.close();
10791079
return ret;
10801080
}
10811081

src/main/java/scratch/kevin/nshm23/MPJ_LogicTreeInversionRunnerScriptWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,7 @@ else if (mpjWrite instanceof FastMPJShellScriptWriter)
985985
String randLTPath = dirPath+"/logic_tree_full_gridded_sampled.json";
986986
argz += " --write-full-tree "+fullLTPath;
987987
argz += " --write-rand-tree "+randLTPath+" --num-samples-per-sol 5";
988+
argz += " --slt-min-mag 5";
988989
String onlyLTPath;
989990
if (allLevelsAffected) {
990991
onlyLTPath = fullLTPath;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package scratch.kevin.nshm23;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Random;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.stream.Collectors;
12+
13+
import org.opensha.commons.util.io.archive.ArchiveInput;
14+
import org.opensha.commons.util.io.archive.ArchiveOutput;
15+
16+
import com.google.common.base.Preconditions;
17+
import com.google.common.base.Stopwatch;
18+
19+
import scratch.kevin.ZipMD5Compare;
20+
21+
public class ParallelZipTests {
22+
23+
public static void main(String[] args) throws IOException {
24+
File inFile = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/2024_02_02-nshm23_branches-WUS_FM_v3/results.zip");
25+
// File inFile = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/2024_02_02-nshm23_branches-WUS_FM_v3/results_gridded_branches_simplified.zip");
26+
ArchiveInput input = new ArchiveInput.ZipFileInput(inFile); // don't use apache, we want to de/recompress for this test
27+
// ModuleArchiveOutput output = new ModuleArchiveOutput.ZipFileOutput(new File("/tmp/benchmark_output-java_zip.zip"));
28+
// ArchiveOutput output = new ArchiveOutput.ApacheZipFileOutput(new File("/tmp/benchmark_output-apache_zip.zip"));
29+
// ArchiveOutput output = new ArchiveOutput.ParallelZipFileOutput(new File("/tmp/benchmark_output-apache_parallel_zip.zip"), 1, true);
30+
ArchiveOutput output = new ArchiveOutput.AsynchronousZipFileOutput(new File("/tmp/benchmark_output-apache_async_zip.zip"));
31+
32+
System.out.println("Pre-reading and calculating input MD5s");
33+
ExecutorService exec = Executors.newFixedThreadPool(8);
34+
Map<String, String> inputHashes = ZipMD5Compare.loadCalcMD5s(input, exec);
35+
36+
Stopwatch watch = Stopwatch.createStarted();
37+
if (output instanceof ArchiveOutput.ParallelZipFileOutput)
38+
((ArchiveOutput.ParallelZipFileOutput)output).setTrackBlockingTimes(true);
39+
List<String> entries = input.entryStream().collect(Collectors.toList());
40+
int numDone = 0;
41+
Random rand = new Random();
42+
for (String entry : entries) {
43+
int method = rand.nextInt(4);
44+
System.out.println("Processing "+entry+"\t[method "+method+"]");
45+
// try all of the different methods randomly
46+
switch (method) {
47+
case 0:
48+
output.transferFrom(input.getInputStream(entry), entry);
49+
break;
50+
case 1:
51+
output.transferFrom(input, entry);
52+
break;
53+
case 2:
54+
output.transferFrom(input, entry, entry);
55+
break;
56+
case 3:
57+
output.putNextEntry(entry);
58+
input.getInputStream(entry).transferTo(output.getOutputStream());
59+
output.closeEntry();
60+
break;
61+
62+
default:
63+
throw new IllegalStateException();
64+
}
65+
numDone++;
66+
double secs = watch.elapsed(TimeUnit.MILLISECONDS)/1000d;
67+
double secsEach = secs/(double)numDone;
68+
double writePerSec = (double)numDone/secs;
69+
String str = numDone+"/"+entries.size()+" done in "+(float)secs+" s; "+(float)secsEach+" s/write; "+(float)writePerSec+" write/s";
70+
if (output instanceof ArchiveOutput.ParallelZipFileOutput)
71+
str += "\t"+((ArchiveOutput.ParallelZipFileOutput)output).getBlockingTimeStats();
72+
System.out.println(str);
73+
}
74+
75+
output.close();
76+
input.close();
77+
watch.stop();
78+
ArchiveInput input2 = output.getCompletedInput();
79+
System.out.println("Calculating output MD5s");
80+
Map<String, String> outputHashes = ZipMD5Compare.loadCalcMD5s(input2, exec);
81+
exec.shutdown();
82+
for (String entry : entries) {
83+
Preconditions.checkState(outputHashes.containsKey(entry));
84+
String inputHash = inputHashes.get(entry);
85+
String outputHash = outputHashes.get(entry);
86+
Preconditions.checkState(inputHash.equals(outputHash),
87+
"Bad hash for %s: %s != %s", entry, inputHash, outputHash);
88+
}
89+
System.out.println("Validated "+entries.size()+" entries!");
90+
input2.close();
91+
}
92+
93+
}

src/main/java/scratch/kevin/prvi25/InterfaceSlabOnlyBASolWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
public class InterfaceSlabOnlyBASolWriter {
2222

2323
public static void main(String[] args) throws IOException {
24-
File dir = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/2024_08_16-prvi25_subduction_branches");
24+
File dir;
25+
if (args.length == 0)
26+
dir = new File("/home/kevin/OpenSHA/nshm23/batch_inversions/2024_09_04-prvi25_subduction_branches");
27+
else
28+
dir = new File(args[0]);
2529
File largeInputFile = new File(dir, "results_PRVI_SUB_FM_LARGE_branch_averaged_gridded.zip");
2630
File smallInputFile = new File(dir, "results_PRVI_SUB_FM_SMALL_branch_averaged_gridded.zip");
2731
File slabOutputFile = new File(dir, "results_PRVI_SLAB_ONLY_branch_averaged_gridded.zip");

0 commit comments

Comments
 (0)