From 582021e45d9441a907e4470029b8b559ce65f3ab Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Fri, 24 Nov 2023 13:14:09 -0800 Subject: [PATCH 1/2] fix: Fix PayloadProcessor response payload race condition (#120) Prevent incorrectly empty logged payloads: - Do not attempt to avoid slicing the response bytebuf in the case that a PayloadProcessor is configured - Do not attempt to avoid some additional refcount updates in the case status != OK Resolves #111 ----- Signed-off-by: Nick Hill --- .../ibm/watson/modelmesh/ModelMeshApi.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 715c0efe..ad91ac11 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -725,6 +725,7 @@ public void onHalfClose() { String vModelId = null; String requestId = null; ModelResponse response = null; + ByteBuf responsePayload = null; try (InterruptingListener cancelListener = newInterruptingListener()) { if (logHeaders != null) { logHeaders.addToMDC(headers); // MDC cleared in finally block @@ -767,18 +768,20 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, resolvedModelId, methodName, headers, null, true); + requestId, resolvedModelId, methodName, headers, null); } else { releaseReqMessage(); } reqMessage = null; // ownership released or transferred } - respReaderIndex = response.data.readerIndex(); respSize = response.data.readableBytes(); call.sendHeaders(response.metadata); + if (payloadProcessor != null) { + responsePayload = response.data.retainedSlice(); + } call.sendMessage(response.data); - // response is released via ReleaseAfterResponse.releaseAll() + // final response refcount is released via ReleaseAfterResponse.releaseAll() status = OK; } catch (Exception e) { status = toStatus(e); @@ -795,17 +798,13 @@ public void onHalfClose() { evictMethodDescriptor(methodName); } } finally { - final boolean releaseResponse = status != OK; if (payloadProcessor != null) { - ByteBuf data = null; - Metadata metadata = null; - if (response != null) { - data = response.data.readerIndex(respReaderIndex); - metadata = response.metadata; - } - processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse); - } else if (releaseResponse && response != null) { - response.release(); + Metadata metadata = response != null ? response.metadata : null; + processPayload(responsePayload, requestId, resolvedModelId, methodName, metadata, status); + } + if (status != OK && response != null) { + // An additional release is required if we call.sendMessage() wasn't sucessful + response.data.release(); } ReleaseAfterResponse.releaseAll(); clearThreadLocals(); @@ -820,23 +819,21 @@ public void onHalfClose() { } /** - * Invoke PayloadProcessor on the request/response data + * Invoke PayloadProcessor on the request/response data. This method takes ownership + * of the passed-in {@code ByteBuf}. + * * @param data the binary data * @param payloadId the id of the request * @param modelId the id of the model * @param methodName the name of the invoked method * @param metadata the method name metadata * @param status null for requests, non-null for responses - * @param takeOwnership whether the processor should take ownership */ private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName, - Metadata metadata, io.grpc.Status status, boolean takeOwnership) { + Metadata metadata, io.grpc.Status status) { Payload payload = null; try { assert payloadProcessor != null; - if (!takeOwnership) { - ReferenceCountUtil.retain(data); - } payload = new Payload(payloadId, modelId, methodName, metadata, data, status); if (payloadProcessor.process(payload)) { data = null; // ownership transferred From eaa2fde7031b1bd2f2704b34a7a9767a7fdd988c Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Tue, 28 Nov 2023 11:10:14 -0800 Subject: [PATCH 2/2] feat: Add vModelId to PayloadProcessor Payload (#123) Add a vModelId field to the Payload class and correspondingly update built-in PayloadProcessor implementations where applicable. It may be null if the request was directed at a concrete modelId rather than a vModelId. ----- Signed-off-by: Nick Hill --- .../ibm/watson/modelmesh/ModelMeshApi.java | 8 +-- .../payload/MatchingPayloadProcessor.java | 49 ++++++++++++------- .../ibm/watson/modelmesh/payload/Payload.java | 20 ++++++++ .../payload/RemotePayloadProcessor.java | 21 ++++---- 4 files changed, 68 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index ad91ac11..14073fd0 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -768,7 +768,7 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, resolvedModelId, methodName, headers, null); + requestId, resolvedModelId, vModelId, methodName, headers, null); } else { releaseReqMessage(); } @@ -800,7 +800,7 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { Metadata metadata = response != null ? response.metadata : null; - processPayload(responsePayload, requestId, resolvedModelId, methodName, metadata, status); + processPayload(responsePayload, requestId, resolvedModelId, vModelId, methodName, metadata, status); } if (status != OK && response != null) { // An additional release is required if we call.sendMessage() wasn't sucessful @@ -825,11 +825,12 @@ public void onHalfClose() { * @param data the binary data * @param payloadId the id of the request * @param modelId the id of the model + * @param vModelId the id of the vModel * @param methodName the name of the invoked method * @param metadata the method name metadata * @param status null for requests, non-null for responses */ - private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName, + private void processPayload(ByteBuf data, String payloadId, String modelId, String vModelId, String methodName, Metadata metadata, io.grpc.Status status) { Payload payload = null; try { @@ -1197,6 +1198,7 @@ public void getVModelStatus(GetVModelStatusRequest request, StreamObserver 0) { + if (!modelId.isEmpty()) { modelId = modelId.replaceFirst("/", ""); - if (modelId.length() == 0 || modelId.equals("*")) { + if (modelId.isEmpty() || modelId.equals("*")) { modelId = null; } } else { modelId = null; } } - if (method != null) { - if (method.length() == 0 || method.equals("*")) { - method = null; + if (vModelId != null) { + if (!vModelId.isEmpty()) { + vModelId = vModelId.replaceFirst("/", ""); + if (vModelId.isEmpty() || vModelId.equals("*")) { + vModelId = null; + } + } else { + vModelId = null; } } - return new MatchingPayloadProcessor(processor, method, modelId); + if (method != null && (method.isEmpty() || method.equals("*"))) { + method = null; + } + return new MatchingPayloadProcessor(processor, method, modelId, vModelId); } @Override diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java b/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java index 9eed4367..6dcafd17 100644 --- a/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java +++ b/src/main/java/com/ibm/watson/modelmesh/payload/Payload.java @@ -39,6 +39,8 @@ public enum Kind { private final String modelId; + private final String vModelId; + private final String method; private final Metadata metadata; @@ -48,10 +50,17 @@ public enum Kind { // null for requests, non-null for responses private final Status status; + public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String method, @Nullable Metadata metadata, @Nullable ByteBuf data, @Nullable Status status) { + this(id, modelId, null, method, metadata, data, status); + } + + public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String vModelId, @Nullable String method, + @Nullable Metadata metadata, @Nullable ByteBuf data, @Nullable Status status) { this.id = id; this.modelId = modelId; + this.vModelId = vModelId; this.method = method; this.metadata = metadata; this.data = data; @@ -68,6 +77,16 @@ public String getModelId() { return modelId; } + @CheckForNull + public String getVModelId() { + return vModelId; + } + + @Nonnull + public String getVModelIdOrModelId() { + return vModelId != null ? vModelId : modelId; + } + @CheckForNull public String getMethod() { return method; @@ -101,6 +120,7 @@ public void release() { public String toString() { return "Payload{" + "id='" + id + '\'' + + ", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") + ", modelId='" + modelId + '\'' + ", method='" + method + '\'' + ", status=" + (status == null ? "request" : String.valueOf(status)) + diff --git a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java index 401fba2d..23c2fba1 100644 --- a/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java +++ b/src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java @@ -57,14 +57,10 @@ public boolean process(Payload payload) { private static PayloadContent prepareContentBody(Payload payload) { String id = payload.getId(); String modelId = payload.getModelId(); + String vModelId = payload.getVModelId(); String kind = payload.getKind().toString().toLowerCase(); ByteBuf byteBuf = payload.getData(); - String data; - if (byteBuf != null) { - data = encodeBinaryToString(byteBuf); - } else { - data = ""; - } + String data = byteBuf != null ? encodeBinaryToString(byteBuf) : ""; Metadata metadata = payload.getMetadata(); Map metadataMap = new HashMap<>(); if (metadata != null) { @@ -79,7 +75,7 @@ private static PayloadContent prepareContentBody(Payload payload) { } } String status = payload.getStatus() != null ? payload.getStatus().getCode().toString() : ""; - return new PayloadContent(id, modelId, data, kind, status, metadataMap); + return new PayloadContent(id, modelId, vModelId, data, kind, status, metadataMap); } private static String encodeBinaryToString(ByteBuf byteBuf) { @@ -116,15 +112,17 @@ private static class PayloadContent { private final String id; private final String modelid; + private final String vModelId; private final String data; private final String kind; private final String status; private final Map metadata; - private PayloadContent(String id, String modelid, String data, String kind, String status, - Map metadata) { + private PayloadContent(String id, String modelid, String vModelId, String data, String kind, + String status, Map metadata) { this.id = id; this.modelid = modelid; + this.vModelId = vModelId; this.data = data; this.kind = kind; this.status = status; @@ -143,6 +141,10 @@ public String getModelid() { return modelid; } + public String getvModelId() { + return vModelId; + } + public String getData() { return data; } @@ -160,6 +162,7 @@ public String toString() { return "PayloadContent{" + "id='" + id + '\'' + ", modelid='" + modelid + '\'' + + ", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") + ", data='" + data + '\'' + ", kind='" + kind + '\'' + ", status='" + status + '\'' +