Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import de.gesellix.docker.remote.api.core.ServerError
import de.gesellix.docker.remote.api.core.ServerException
import de.gesellix.docker.remote.api.core.StreamCallback
import de.gesellix.docker.remote.api.core.Success
import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream
import de.gesellix.docker.remote.api.core.SuccessStream
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -239,11 +240,6 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
) {
val localVariableConfig = execStartRequestConfig(id = id, execStartConfig = execStartConfig)

val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) {
!(execStartConfig.tty ?: false)
} else {
!(execInspect(id).processConfig?.tty ?: false)
}
val localVarResponse = requestFrames(
localVariableConfig
)
Expand All @@ -256,18 +252,32 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
val actualCallback = callback ?: LoggingCallback<Frame?>()

when (localVarResponse.responseType) {
ResponseType.Success -> {
runBlocking {
launch {
withTimeoutOrNull(timeout.toMillis()) {
actualCallback.onStarting(this@launch::cancel)
(localVarResponse as SuccessStream<Frame>).data.collect { actualCallback.onNext(it) }
actualCallback.onFinished()
ResponseType.Success,
ResponseType.Informational -> {
when (localVarResponse) {
is SuccessBidirectionalStream ->
runBlocking {
launch {
withTimeoutOrNull(timeout.toMillis()) {
actualCallback.onStarting(this@launch::cancel)
actualCallback.attachInput(localVarResponse.socket.sink)
localVarResponse.data.collect { actualCallback.onNext(it) }
actualCallback.onFinished()
}
}
}
else ->
runBlocking {
launch {
withTimeoutOrNull(timeout.toMillis()) {
actualCallback.onStarting(this@launch::cancel)
(localVarResponse as SuccessStream<Frame>).data.collect { actualCallback.onNext(it) }
actualCallback.onFinished()
}
}
}
}
}
}
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
ResponseType.ClientError -> {
val localVarError = localVarResponse as ClientError<*>
Expand All @@ -292,6 +302,18 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
val localVariableQuery: MultiValueMap = mutableMapOf()
val localVariableHeaders: MutableMap<String, String> = mutableMapOf()

// val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) {
// !(execStartConfig.tty ?: false)
// } else {
// !(execInspect(id).processConfig?.tty ?: false)
// }
val requiresConnectionUpgrade = execStartConfig?.tty != null && execStartConfig.tty!!
if (requiresConnectionUpgrade)
localVariableHeaders.apply {
put("Connection", "Upgrade")
put("Upgrade", "tcp")
}

return RequestConfig(
method = POST,
path = "/exec/{id}/start".replace("{" + "id" + "}", id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,38 @@
import de.gesellix.docker.remote.api.ExecInspectResponse;
import de.gesellix.docker.remote.api.ExecStartConfig;
import de.gesellix.docker.remote.api.IdResponse;
import de.gesellix.docker.remote.api.core.Frame;
import de.gesellix.docker.remote.api.testutil.DockerEngineAvailable;
import de.gesellix.docker.remote.api.testutil.InjectDockerClient;
import de.gesellix.docker.remote.api.testutil.TestImage;
import okio.BufferedSink;
import okio.Okio;
import okio.Sink;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_KEY;
import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_VALUE;
import static de.gesellix.docker.remote.api.testutil.Failsafe.perform;
import static de.gesellix.docker.remote.api.testutil.Failsafe.removeContainer;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.time.Duration;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

@DockerEngineAvailable
class ExecApiIntegrationTest {
Expand Down Expand Up @@ -83,4 +102,107 @@ public void containerExecNonInteractive() {

removeContainer(engineApiClient, "container-exec-test");
}

@Test
// see https://github.com/square/okhttp/pull/9159
@Disabled("Pending OkHttp to support upgrade requests with non-empty body")
public void containerExecInteractive() {
removeContainer(engineApiClient, "container-exec-interactive-test");

imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);

ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
null, null, null,
true, true, true,
null,
true, true, null,
null,
null,
null,
null,
testImage.getImageWithTag(),
null, null, null,
null, null,
null,
singletonMap(LABEL_KEY, LABEL_VALUE),
null, null,
null,
null,
null
);
containerApi.containerCreate(containerCreateRequest, "container-exec-interactive-test");
containerApi.containerStart("container-exec-interactive-test", null);

IdResponse exec = execApi.containerExec(
"container-exec-interactive-test",
new ExecConfig(true, true, true, null, null, true,
null,
singletonList("/cat"),
null, null, null));
assertNotNull(exec.getId());

Duration timeout = Duration.of(5, SECONDS);
LogFrameStreamCallback callback = new LogFrameStreamCallback() {
@Override
public void attachInput(Sink sink) {
System.out.println("attachInput, sending data...");
new Thread(() -> {
BufferedSink buffer = Okio.buffer(sink);
try {
buffer.writeUtf8("hello echo\n");
buffer.flush();
System.out.println("... data sent");
} catch (IOException e) {
e.printStackTrace();
System.err.println("Failed to write to stdin: " + e.getMessage());
} finally {
try {
Thread.sleep(100);
sink.close();
} catch (Exception ignored) {
// ignore
}
}
}).start();
}
};

new Thread(() -> execApi.execStart(
exec.getId(),
new ExecStartConfig(false, true, null),
callback, timeout.toMillis())).start();

CountDownLatch wait = new CountDownLatch(1);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (callback.job != null) {
callback.job.cancel();
}
wait.countDown();
}
}, 5000);

try {
wait.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}

ExecInspectResponse execInspect = execApi.execInspect(exec.getId());
assertTrue(execInspect.getRunning());

assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType());
assertEquals(
"hello echo\nhello echo".replaceAll("[\\n\\r]", ""),
callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", ""));

removeContainer(engineApiClient, "container-exec-interactive-test");

perform(() -> {
ExecInspectResponse execInspectAfterStop = execApi.execInspect(exec.getId());
assertFalse(execInspectAfterStop.getRunning());
});
}
}
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ subprojects {
repositories {
// mavenLocal()
// listOf<String>(
//// "gesellix/okhttp",
// "gesellix/okhttp",
//// "docker-client/*",
// ).forEach { slug ->
//// fun findProperty(s: String) = project.findProperty(s) as String?
Expand Down Expand Up @@ -71,6 +71,8 @@ allprojects {
}
}
// dependencySubstitution {
// substitute(module("com.squareup.okhttp3:okhttp"))
// .using(module("de.gesellix.okhttp3-forked:okhttp:${libs.versions.okhttp.get()}"))
// substitute(module("com.squareup.okhttp3:okhttp-jvm"))
// .using(module("de.gesellix.okhttp3-forked:okhttp-jvm:${libs.versions.okhttp.get()}"))
// }
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ okhttpMockwebserverJunit5 = { module = "com.squareup.okhttp3:mockwebserver3-juni
okio = { module = "com.squareup.okio:okio", version.ref = "okio" }
okioJvm = { module = "com.squareup.okio:okio-jvm", version.ref = "okio" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4jJul = { module = "org.slf4j:jul-to-slf4j", version.ref = "slf4j" }

[bundles]
kotlin = ["kotlin", "kotlinCommon", "kotlinJdk7", "kotlinJdk8", "kotlinReflect", "kotlinScriptingJvm", "kotlinStdlib", "kotlinTest"]
Expand Down
Loading