Skip to content

Commit

Permalink
Use virtual threads for I/O bound operations
Browse files Browse the repository at this point in the history
This commit introduces the use of virtual threads for short-lived
I/O bound operations. More in details virtual threads are used in
place of the usual thread pool for:
- PairingInboundStore: use a virtual thread pool to creare a CompletableFuture
  to await the completation of a remote command
- HttpClientFactory: use a virtual thread for HttpClient operations
- TimedInputStream: use a virtual thread pool to read the target input
  stream within a fixed amount of time.

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Nov 25, 2023
1 parent 516cca5 commit 70db490
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ micronaut {
//
jib {
from {
image = 'cr.seqera.io/public/nf-jdk:corretto-17.0.9-jemalloc'
image = 'cr.seqera.io/public/nf-jdk:corretto-21.0.1-jemalloc'
platforms {
platform { architecture = 'amd64'; os = 'linux' }
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util.concurrent.Executors

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.util.CustomThreadFactory
/**
* Java HttpClient factory
*
Expand All @@ -35,7 +34,7 @@ import io.seqera.wave.util.CustomThreadFactory
@CompileStatic
class HttpClientFactory {

static private ExecutorService threadPool = Executors.newCachedThreadPool(new CustomThreadFactory("HttpClientThread"))
static private ExecutorService threadPool = Executors.newVirtualThreadPerTaskExecutor()

static private Duration timeout = Duration.ofSeconds(20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package io.seqera.wave.service.data.future

import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException

import groovy.transform.CompileStatic
Expand Down Expand Up @@ -50,9 +52,12 @@ abstract class AbstractFutureStore<V> implements FutureStore<String,V> {
@Value('${wave.pairing.channel.awaitTimeout:100ms}')
private Duration pollInterval

private ExecutorService virtPool

AbstractFutureStore(FutureHash<String> store, EncodingStrategy<V> encodingStrategy) {
this.store = store
this.encodingStrategy = encodingStrategy
this.virtPool = Executors.newVirtualThreadPerTaskExecutor()
}

abstract String prefix()
Expand Down Expand Up @@ -85,7 +90,7 @@ abstract class AbstractFutureStore<V> implements FutureStore<String,V> {
// sleep for a while
sleep(pollInterval.toMillis())
}
})
}, virtPool) // <-- use a virtual thread pool
}

/**
Expand Down
14 changes: 10 additions & 4 deletions src/main/groovy/io/seqera/wave/util/TimedInputStream.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
package io.seqera.wave.util

import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import groovy.transform.CompileStatic
import io.seqera.wave.core.RoutePath
import io.seqera.wave.exception.UnexpectedReadException

/**
* An input stream filter that implements a timeout mechanism on the reading
* operation over the target stream.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
Expand All @@ -39,18 +42,21 @@ class TimedInputStream extends FilterInputStream {

private final RoutePath route

private final ExecutorService executor

private volatile boolean closed

TimedInputStream(InputStream inputStream, Duration timeout, RoutePath route) {
super(inputStream)
this.target = inputStream
this.timeoutMillis = (int)timeout.toMillis()
this.route = route
this.executor = Executors.newVirtualThreadPerTaskExecutor()
}

@Override
int read() throws IOException {
final result = CompletableFuture<Integer>.supplyAsync(() -> target.read())
final result = executor.submit((Callable<Integer>)(() -> target.read()))
try {
return result.get(timeoutMillis, TimeUnit.MILLISECONDS)
}
Expand All @@ -62,7 +68,7 @@ class TimedInputStream extends FilterInputStream {

@Override
int read(byte[] b, int off, int len) throws IOException {
final result = CompletableFuture<Integer>.supplyAsync(() -> target.read(b,off,len))
final result = executor.submit((Callable<Integer>)(() -> target.read(b,off,len)))
try {
return result.get(timeoutMillis, TimeUnit.MILLISECONDS)
}
Expand Down

0 comments on commit 70db490

Please sign in to comment.