Skip to content

Commit

Permalink
Merge branch '4-add-java-examples' into 'master'
Browse files Browse the repository at this point in the history
Resolve "Add java examples"

Closes #4

See merge request !4
  • Loading branch information
Cristian Ramon-Cortes Vilarrodona committed Jul 25, 2019
2 parents cab7ba4 + 19f01f3 commit 0a5bead
Show file tree
Hide file tree
Showing 39 changed files with 1,891 additions and 32 deletions.
114 changes: 114 additions & 0 deletions examples/nested-hybrid-java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- GENERAL INFORMATION -->
<groupId>es.bsc.distro-stream-lib.examples</groupId>
<artifactId>nested-hybrid-java</artifactId>
<parent>
<groupId>es.bsc.distro-stream-lib</groupId>
<artifactId>examples</artifactId>
<version>1.0</version>
<relativePath>..</relativePath>
</parent>

<!-- PROJECT INFORMATION -->
<name>Nested Hybrid Example</name>
<description>Nested Hybrid Example in Java</description>
<url>http://compss.bsc.es</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
</license>
</licenses>
<organization>
<name>BSC</name>
<url>http://www.bsc.es</url>
</organization>

<!-- PROPERTIES -->
<properties>
</properties>

<!-- DEPENCENCIES -->
<dependencies>
<!-- JUnit test dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<!-- Log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- COMPSs -->
<dependency>
<groupId>es.bsc.compss</groupId>
<artifactId>compss-api</artifactId>
</dependency>

<!-- DistroStream Stream -->
<dependency>
<groupId>es.bsc.distro-stream-lib</groupId>
<artifactId>distro-stream</artifactId>
</dependency>
</dependencies>

<!-- BUILD -->
<build>
<plugins>
<!-- Normal Compilation -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>

<!-- Create JAR with dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}</finalName>
</configuration>
</execution>
</executions>
</plugin>

<!-- Add revision to manifest -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
<manifestEntries>
<Implementation-Build>$\{buildNumber}</Implementation-Build>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
48 changes: 48 additions & 0 deletions examples/nested-hybrid-java/run_scripts/run_streams.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash -e

# Define script constants
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

# Define customizable application constraints
export NUM_FILES=50

export RUNCOMPSS=$(which runcompss)

export NESTED_COMPUTING_NODES=1
export CORES_SENSOR=8
export CORES_BIG_FILTER=4
export CORES_EXTRACT=1
export CORES_TF=8

# Define application arguments
app_exec="mains.NestedHybrid"

sensor_num_files=${NUM_FILES}
sensor_base_sleep_time=300
sensor_sleep_random_range=50
filter_batch_size=10
filter_base_sleep_time=2000
filter_sleep_random_range=200
extract_base_sleep_time=500
extract_sleep_random_range=100
tf_depth=2
tf_base_sleep_time=1000
tf_sleep_random_range=50

# Run job
runcompss \
--project="${SCRIPT_DIR}/../xmls/project.xml" \
--resources="${SCRIPT_DIR}/../xmls/resources.xml" \
--classpath="${SCRIPT_DIR}/../target/nested-hybrid-java.jar" \
--jvm_workers_opts="-Dcompss.worker.removeWD=false" \
--streaming=OBJECTS \
--streaming_master_port=49049 \
-d \
-t \
-g \
--summary \
"${app_exec}" \
"${sensor_num_files}" "${sensor_base_sleep_time}" "${sensor_sleep_random_range}" \
"${filter_batch_size}" "${filter_base_sleep_time}" "${filter_sleep_random_range}" \
"${extract_base_sleep_time}" "${extract_sleep_random_range}" \
"${tf_depth}" "${tf_base_sleep_time}" "${tf_sleep_random_range}"
20 changes: 20 additions & 0 deletions examples/nested-hybrid-java/src/main/java/compss/NESTED.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package compss;

import java.util.List;

import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import mains.MyElement;


public class NESTED {

public static void nestedBigFilter(ObjectDistroStream<MyElement> odsSensor,
ObjectDistroStream<MyElement> odsFiltered, int batchSize, int sleepTime, int sleepRandomRange) {

}

public static Integer nestedTaskFlow(List<MyElement> elements, int depth, int sleepBaseTime, int sleepRandomRange) {
return -1;
}

}
95 changes: 95 additions & 0 deletions examples/nested-hybrid-java/src/main/java/filters/BigFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package filters;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import es.bsc.compss.api.COMPSs;
import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.exceptions.RegistrationException;
import es.bsc.distrostreamlib.types.ConsumerMode;
import mains.MyElement;


public class BigFilter {

private static final int TIME_BETWEEN_POLLS = 5_000; // ms


private static void processObjects(ObjectDistroStream<MyElement> odsSensor,
ObjectDistroStream<MyElement> odsFiltered, Queue<MyElement> pendingObjects, FilterArguments fargs,
boolean forced) throws BackendException {

// Poll objects
List<MyElement> newObjects = odsSensor.poll();
pendingObjects.addAll(newObjects);

// Send batch to execution if necessary
while (pendingObjects.size() > fargs.getBatchSize()) {
// Get batch and update pending objects
List<MyElement> batchObjects = new LinkedList<>();
for (int i = 0; i < fargs.getBatchSize(); ++i) {
MyElement next = pendingObjects.poll();
batchObjects.add(next);
}
// Launch task
System.out.println("[DEBUG] Launch filter task with " + batchObjects.size() + " elements");
BigFilterTasks.filterObjects(batchObjects, odsFiltered, fargs.getSleepBaseTime(),
fargs.getSleepRandomRange());
}

if (forced) {
// Spawn filter task even if we don't have a full batch
List<MyElement> batchObjects = (LinkedList<MyElement>) pendingObjects;
System.out.println("[DEBUG] Launch filter task with " + batchObjects.size() + " elements");
BigFilterTasks.filterObjects(batchObjects, odsFiltered, fargs.getSleepBaseTime(),
fargs.getSleepRandomRange());
pendingObjects.clear();
}
}

public static void main(String[] args) throws RegistrationException, BackendException, InterruptedException {
// Start application
System.out.println("[INFO] Starting application");
long start = System.currentTimeMillis();

// Parse arguments
System.out.println("[INFO] Parsing application arguments");
FilterArguments fargs = new FilterArguments(args);

// Initialize streams
System.out.println("[INFO] Initializing streams");
ObjectDistroStream<MyElement> odsSensor = new ObjectDistroStream<MyElement>(fargs.getAliasSensor(),
ConsumerMode.AT_MOST_ONCE);
ObjectDistroStream<MyElement> odsFiltered = new ObjectDistroStream<MyElement>(fargs.getAliasFiltered(),
ConsumerMode.AT_MOST_ONCE);

// Process input stream elements
System.out.println("[INFO] Processing input stream elements");
Queue<MyElement> pendingObjects = new LinkedList<>();
while (!odsSensor.isClosed()) {
// Poll files and send batch to execution if necessary
processObjects(odsSensor, odsFiltered, pendingObjects, fargs, false);

// Sleep between polls
Thread.sleep(TIME_BETWEEN_POLLS);
}

// Poll one last time (and force the task to execute)
processObjects(odsSensor, odsFiltered, pendingObjects, fargs, true);

// Synchronize
System.out.println("[INFO] Waiting for all batch tasks to finish");
COMPSs.barrier();

// Close stream (because this app is itself a task)
odsFiltered.close();

// End
System.out.println("DONE");
long end = System.currentTimeMillis();
long elapsedTime = end - start;
System.out.println("[TIME] TOTAL ELAPSED: " + elapsedTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package filters;

import java.util.List;

import es.bsc.compss.types.annotations.Constraints;
import es.bsc.compss.types.annotations.Parameter;
import es.bsc.compss.types.annotations.parameter.Direction;
import es.bsc.compss.types.annotations.parameter.Type;
import es.bsc.compss.types.annotations.task.Method;
import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import mains.MyElement;


public interface BigFilterItf {

@Constraints(computingUnits = "1")
@Method(declaringClass = "filters.BigFilterTasks")
void filterObjects(
@Parameter(type = Type.OBJECT, direction = Direction.IN) List<MyElement> pendingObjects,
@Parameter(type = Type.STREAM, direction = Direction.OUT) ObjectDistroStream<MyElement> odsFiltered,
@Parameter() int sleepBaseTime,
@Parameter() int sleepRandomRange
);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package filters;

import java.util.List;
import java.util.Random;

import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import mains.MyElement;


public class BigFilterTasks {

public static void filterObjects(List<MyElement> pendingObjects, ObjectDistroStream<MyElement> odsFiltered,
int sleepBaseTime, int sleepRandomRange) throws BackendException {

for (MyElement e : pendingObjects) {
// Print input object content
System.out.println("[DEBUG] Processing object " + e);

// Sleep to simulate time spent to process the object
try {
Random randomGenerator = new Random();
int randomInt = randomGenerator.nextInt(sleepRandomRange);
int sleepTime = sleepBaseTime + randomInt;
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

// Generate filtered object
System.out.println("Generating filtered object");
MyElement out = new MyElement(e.getInputFiles());
odsFiltered.publish(out);
}

// Close filtered stream
odsFiltered.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package filters;

public class FilterArguments {

private final String aliasSensor;
private final String aliasFiltered;
private final int batchSize;
private final int sleepBaseTime;
private final int sleepRandomRange;


public FilterArguments(String[] args) {
assert (args.length == 5);

this.aliasSensor = args[0];
this.aliasFiltered = args[1];
this.batchSize = Integer.valueOf(args[2]);
this.sleepBaseTime = Integer.valueOf(args[3]);
this.sleepRandomRange = Integer.valueOf(args[4]);
}

public String getAliasSensor() {
return this.aliasSensor;
}

public String getAliasFiltered() {
return this.aliasFiltered;
}

public int getBatchSize() {
return this.batchSize;
}

public int getSleepBaseTime() {
return this.sleepBaseTime;
}

public int getSleepRandomRange() {
return this.sleepRandomRange;
}
}
Loading

0 comments on commit 0a5bead

Please sign in to comment.