Skip to content

Commit 05c810f

Browse files
committed
fix #4154: adding a callback for stream consumption
1 parent 0b89978 commit 05c810f

File tree

22 files changed

+234
-369
lines changed

22 files changed

+234
-369
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.fabric8.kubernetes.client;
18+
19+
import java.io.OutputStream;
20+
import java.io.PipedInputStream;
21+
import java.io.PipedOutputStream;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.Channels;
24+
import java.nio.channels.WritableByteChannel;
25+
import java.util.concurrent.CompletionStage;
26+
27+
public interface StreamConsumer {
28+
29+
public static StreamConsumer newStreamConsumer(OutputStream os) {
30+
if (os == null) {
31+
return null;
32+
}
33+
checkForPiped(os);
34+
return newBlockingStreamConsumer(Channels.newChannel(os));
35+
}
36+
37+
public static StreamConsumer newBlockingStreamConsumer(WritableByteChannel channel) {
38+
if (channel == null) {
39+
return null;
40+
}
41+
return buffer -> {
42+
int remaining = buffer.remaining();
43+
if (channel.write(buffer) != remaining) {
44+
throw new KubernetesClientException("Unsucessful blocking write");
45+
}
46+
return null;
47+
};
48+
}
49+
50+
public static void checkForPiped(Object object) {
51+
if (object instanceof PipedOutputStream || object instanceof PipedInputStream) {
52+
throw new KubernetesClientException("Piped streams should not be used");
53+
}
54+
}
55+
56+
/**
57+
* A callback for consuming a stream as a series of {@link ByteBuffer}s
58+
*
59+
* @param buffer
60+
* @return a {@link CompletionStage} that is completed when the buffer has been fully consumed,
61+
* or null if it was already consumed
62+
* @throws Exception
63+
*/
64+
CompletionStage<?> consume(ByteBuffer buffer) throws Exception;
65+
66+
}

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.fabric8.kubernetes.client.dsl;
1818

19+
import io.fabric8.kubernetes.client.StreamConsumer;
20+
1921
import java.io.InputStream;
2022
import java.io.OutputStream;
2123
import java.io.PipedOutputStream;
@@ -70,7 +72,11 @@ public interface Loggable {
7072
* @param out {@link OutputStream} for storing logs
7173
* @return returns a Closeable interface for log watch
7274
*/
73-
LogWatch watchLog(OutputStream out);
75+
default LogWatch watchLog(OutputStream out) {
76+
return watchLog(StreamConsumer.newStreamConsumer(out), true);
77+
}
78+
79+
LogWatch watchLog(StreamConsumer consumer, boolean blocking);
7480

7581
/**
7682
* While waiting for Pod logs, how long shall we wait until a Pod

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.fabric8.kubernetes.client.dsl;
1717

18+
import io.fabric8.kubernetes.client.StreamConsumer;
19+
1820
import java.io.InputStream;
1921
import java.io.OutputStream;
2022
import java.io.PipedOutputStream;
@@ -27,7 +29,11 @@ public interface TtyExecErrorable extends
2729
* <p>
2830
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingError()} instead
2931
*/
30-
TtyExecErrorChannelable writingError(OutputStream in);
32+
default TtyExecErrorChannelable writingError(OutputStream in) {
33+
return writingError(StreamConsumer.newStreamConsumer(in), true);
34+
}
35+
36+
TtyExecErrorChannelable writingError(StreamConsumer consumer, boolean blocking);
3137

3238
/**
3339
* If the {@link ExecWatch} should terminate when a stdErr message is received.

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.fabric8.kubernetes.client.dsl;
1717

18+
import io.fabric8.kubernetes.client.StreamConsumer;
19+
1820
import java.io.InputStream;
1921
import java.io.OutputStream;
2022
import java.io.PipedOutputStream;
@@ -27,7 +29,11 @@ public interface TtyExecOutputErrorable extends
2729
* <p>
2830
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingOutput()} instead
2931
*/
30-
TtyExecErrorable writingOutput(OutputStream in);
32+
default TtyExecErrorable writingOutput(OutputStream in) {
33+
return writingOutput(StreamConsumer.newStreamConsumer(in), true);
34+
}
35+
36+
TtyExecErrorable writingOutput(StreamConsumer consumer, boolean blocking);
3137

3238
/**
3339
* Will provide an {@link InputStream} via {@link ExecWatch#getOutput()}

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ public static ByteBuffer copy(ByteBuffer buffer) {
7676

7777
/**
7878
* Very rudimentary method to check if the provided ByteBuffer contains text.
79-
*
79+
*
8080
* @return true if the buffer contains text, false otherwise.
8181
*/
8282
public static boolean isPlainText(ByteBuffer originalBuffer) {
8383
if (originalBuffer == null) {
8484
return false;
8585
}
86-
final ByteBuffer buffer = copy(originalBuffer);
86+
final ByteBuffer buffer = originalBuffer.asReadOnlyBuffer();
8787
final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
8888
try {
8989
decoder.decode(buffer);

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ default void onMessage(WebSocket webSocket, String text) {
4242
/**
4343
* Called once the full binary message has been built. {@link WebSocket#request()} must
4444
* be called to receive more messages.
45+
*
46+
* @param bytes which will not further used nor modified by the {@link HttpClient}
4547
*/
4648
default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
4749
webSocket.request();

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.fabric8.kubernetes.api.model.Status;
2121
import io.fabric8.kubernetes.api.model.StatusCause;
2222
import io.fabric8.kubernetes.client.KubernetesClientException;
23+
import io.fabric8.kubernetes.client.StreamConsumer;
2324
import io.fabric8.kubernetes.client.dsl.ExecListener;
2425
import io.fabric8.kubernetes.client.dsl.ExecListener.Response;
2526
import io.fabric8.kubernetes.client.dsl.ExecWatch;
@@ -37,8 +38,6 @@
3738
import java.io.InputStream;
3839
import java.io.OutputStream;
3940
import java.nio.ByteBuffer;
40-
import java.nio.channels.Channels;
41-
import java.nio.channels.WritableByteChannel;
4241
import java.nio.charset.StandardCharsets;
4342
import java.util.Arrays;
4443
import java.util.HashMap;
@@ -51,6 +50,7 @@
5150
import java.util.concurrent.Executors;
5251
import java.util.concurrent.atomic.AtomicBoolean;
5352
import java.util.concurrent.atomic.AtomicReference;
53+
import java.util.function.Consumer;
5454

5555
/**
5656
* A {@link WebSocket.Listener} for exec operations.
@@ -106,7 +106,7 @@ public ListenerStream(String name) {
106106
this.name = name;
107107
}
108108

109-
private void handle(ByteBuffer byteString, WebSocket webSocket) throws IOException {
109+
private void handle(ByteBuffer byteString, WebSocket webSocket) throws Exception {
110110
if (handler != null) {
111111
if (LOGGER.isDebugEnabled()) {
112112
LOGGER.debug("exec message received {} bytes on channel {}", byteString.remaining(), name);
@@ -178,37 +178,46 @@ private ListenerStream createStream(String name, StreamContext streamContext) {
178178
if (streamContext == null) {
179179
return stream;
180180
}
181-
OutputStream os = streamContext.getOutputStream();
182-
if (os == null) {
181+
StreamConsumer consumer = streamContext.getConsumer();
182+
if (consumer == null) {
183183
// redirecting
184184
stream.inputStream = new ExecWatchInputStream(() -> this.webSocketRef.get().request());
185185
this.exitCode.whenComplete(stream.inputStream::onExit);
186186
stream.handler = b -> stream.inputStream.consume(Arrays.asList(b));
187187
} else {
188-
WritableByteChannel channel = Channels.newChannel(os);
189-
stream.handler = b -> asyncWrite(channel, b);
188+
stream.handler = b -> consume(consumer, b, streamContext.isBlocking() ? serialExecutor : Runnable::run,
189+
this::postConsume);
190190
}
191191
return stream;
192192
}
193193

194-
private void asyncWrite(WritableByteChannel channel, ByteBuffer b) {
195-
CompletableFuture.runAsync(() -> {
194+
public static void consume(StreamConsumer consumer, ByteBuffer bytes, Executor executor, Consumer<Throwable> postConsume) {
195+
CompletableFuture.supplyAsync(() -> {
196196
try {
197-
channel.write(b);
198-
} catch (IOException e) {
197+
return consumer.consume(bytes);
198+
} catch (Exception e) {
199199
throw KubernetesClientException.launderThrowable(e);
200200
}
201-
}, serialExecutor).whenComplete((v, t) -> {
202-
webSocketRef.get().request();
203-
if (t != null) {
204-
if (closed.get()) {
205-
LOGGER.debug("Stream write failed after close", t);
206-
} else {
207-
// This could happen if the user simply closes their stream prior to completion
208-
LOGGER.warn("Stream write failed", t);
209-
}
201+
}, executor)
202+
.whenComplete((cs, t) -> {
203+
if (cs != null) {
204+
cs.whenComplete((v, t1) -> postConsume.accept(t1));
205+
} else {
206+
postConsume.accept(t);
207+
}
208+
});
209+
}
210+
211+
private void postConsume(Throwable t) {
212+
webSocketRef.get().request();
213+
if (t != null) {
214+
if (closed.get()) {
215+
LOGGER.debug("Stream write failed after close", t);
216+
} else {
217+
// This could happen if the user simply closes their stream prior to completion
218+
LOGGER.warn("Stream write failed", t);
210219
}
211-
});
220+
}
212221
}
213222

214223
@Override
@@ -299,6 +308,7 @@ public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
299308
@Override
300309
public void onMessage(WebSocket webSocket, String text) {
301310
LOGGER.debug("Exec Web Socket: onMessage(String)");
311+
// this is unexpected and will likely just result in an exception
302312
onMessage(webSocket, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
303313
}
304314

@@ -337,7 +347,7 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
337347
default:
338348
throw new IOException("Unknown stream ID " + streamID);
339349
}
340-
} catch (IOException e) {
350+
} catch (Exception e) {
341351
throw KubernetesClientException.launderThrowable(e);
342352
} finally {
343353
if (close) {

0 commit comments

Comments
 (0)