From b63aef54b55dea06394f306ac18da211d67b76ea Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Dec 2024 09:53:29 -0800 Subject: [PATCH] Terminate state on error --- .../macro/RisingwaveAlterStreamMacro.java | 8 +-- .../macro/RisingwaveAlterZtableMacro.java | 12 ++-- .../macro/RisingwaveCreateFunctionMacro.java | 6 +- .../macro/RisingwaveCreateStreamMacro.java | 58 +++++-------------- .../macro/RisingwaveCreateZtableMacro.java | 23 ++++---- .../macro/RisingwaveCreateZviewMacro.java | 22 +++---- .../macro/RisingwaveDropStreamMacro.java | 11 ++-- .../macro/RisingwaveDropZtableMacro.java | 21 +++---- .../macro/RisingwaveDropZviewMacro.java | 14 ++--- .../internal/macro/RisingwaveMacroBase.java | 49 ++++++++++++++++ .../internal/macro/RisingwaveMacroState.java | 13 +++-- .../macro/RisingwaveOnReadyHandler.java | 24 -------- .../stream/RisingwaveProxyFactory.java | 2 +- 13 files changed, 120 insertions(+), 143 deletions(-) create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroBase.java delete mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveOnReadyHandler.java diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterStreamMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterStreamMacro.java index c955de313b..919bfd561c 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterStreamMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterStreamMacro.java @@ -18,21 +18,19 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveAlterStreamMacro +public class RisingwaveAlterStreamMacro extends RisingwaveMacroBase { private final StringBuilder fieldBuilder; - private final String sql; private final Alter command; - private final RisingwaveMacroHandler handler; public RisingwaveAlterStreamMacro( String sql, Alter command, RisingwaveMacroHandler handler) { - this.sql = sql; + super(sql, handler); + this.command = command; - this.handler = handler; this.fieldBuilder = new StringBuilder(); } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterZtableMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterZtableMacro.java index df78166fa8..d5740d9699 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterZtableMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveAlterZtableMacro.java @@ -18,21 +18,19 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveAlterZtableMacro +public class RisingwaveAlterZtableMacro extends RisingwaveMacroBase { private final StringBuilder fieldBuilder; - private final String sql; private final Alter command; - private final RisingwaveMacroHandler handler; public RisingwaveAlterZtableMacro( String sql, Alter command, RisingwaveMacroHandler handler) { - this.sql = sql; + super(sql, handler); + this.command = command; - this.handler = handler; this.fieldBuilder = new StringBuilder(); } @@ -84,7 +82,8 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + + return errorState(); } } @@ -142,5 +141,4 @@ public RisingwaveMacroState onError( return this; } } - } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateFunctionMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateFunctionMacro.java index 1a88e83ce5..3459e9ac07 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateFunctionMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateFunctionMacro.java @@ -22,7 +22,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveCreateFunctionMacro +public class RisingwaveCreateFunctionMacro extends RisingwaveMacroBase { private final String javaServer; private final String pythonServer; @@ -31,7 +31,6 @@ public class RisingwaveCreateFunctionMacro private final String user; private final String sql; private final Function command; - private final RisingwaveMacroHandler handler; private final StringBuilder fieldBuilder; public RisingwaveCreateFunctionMacro( @@ -42,6 +41,8 @@ public RisingwaveCreateFunctionMacro( Function command, RisingwaveMacroHandler handler) { + super(sql, handler); + String javaServer = null; String pythonServer = null; @@ -66,7 +67,6 @@ else if (udf.language.equalsIgnoreCase("python")) this.user = user; this.sql = sql; this.command = command; - this.handler = handler; this.fieldBuilder = new StringBuilder(); } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateStreamMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateStreamMacro.java index 02ccfba8af..ae9e05536b 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateStreamMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateStreamMacro.java @@ -23,7 +23,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveCreateStreamMacro +public class RisingwaveCreateStreamMacro extends RisingwaveMacroBase { //TODO: Remove after implementing zstream private static final String ZILLA_CORRELATION_ID_OLD = "zilla_correlation_id"; @@ -47,10 +47,7 @@ public class RisingwaveCreateStreamMacro private final long scanStartupMil; private final String systemSchema; private final String user; - private final String sql; private final CreateStream command; - private final RisingwaveMacroHandler handler; - private final RisingwaveOnReadyHandler terminateState = this::terminateState; public RisingwaveCreateStreamMacro( String bootstrapServer, @@ -62,12 +59,12 @@ public RisingwaveCreateStreamMacro( CreateStream command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.scanStartupMil = scanStartupMil; this.systemSchema = systemSchema; this.user = user; - this.sql = sql; this.command = command; - this.handler = handler; this.bootstrapServer = bootstrapServer; this.schemaRegistry = schemaRegistry; @@ -78,32 +75,11 @@ public RisingwaveMacroState start() return new CreateTopicState(); } - private RisingwaveMacroState terminateState( - long traceId, - long authorization) - { - handler.doReady(traceId, authorization, sql.length()); - - return null; - } - private final class CreateTopicState implements RisingwaveMacroState { private final String sqlFormat = """ CREATE TOPIC IF NOT EXISTS %s (%s%s);\u0000"""; - private RisingwaveOnReadyHandler onReadyHandler = this::transition; - - private RisingwaveMacroState transition( - long traceId, - long authorization) - { - CreateSourceState state = new CreateSourceState(); - state.onStarted(traceId, authorization); - - return state; - } - @Override public void onStarted( long traceId, @@ -133,7 +109,10 @@ public RisingwaveMacroState onReady( long authorization, PgsqlFlushExFW flushEx) { - return onReadyHandler.onReady(traceId, authorization); + CreateSourceState state = new CreateSourceState(); + state.onStarted(traceId, authorization); + + return state; } @Override @@ -143,9 +122,8 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - onReadyHandler = terminateState; - return this; + return errorState(); } } @@ -163,18 +141,6 @@ private final class CreateSourceState implements RisingwaveMacroState schema.registry = '%s' );\u0000"""; - private RisingwaveOnReadyHandler onReadyHandler = this::transition; - - private RisingwaveMacroState transition( - long traceId, - long authorization) - { - GrantResourceState state = new GrantResourceState(); - state.onStarted(traceId, authorization); - - return state; - } - @Override public void onStarted( long traceId, @@ -206,7 +172,10 @@ public RisingwaveMacroState onReady( long authorization, PgsqlFlushExFW flushEx) { - return onReadyHandler.onReady(traceId, authorization); + GrantResourceState state = new GrantResourceState(); + state.onStarted(traceId, authorization); + + return state; } @Override @@ -216,8 +185,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - onReadyHandler = terminateState; - return this; + return errorState(); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java index 47e7e32809..3b96f90fe9 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java @@ -27,7 +27,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveCreateZtableMacro +public class RisingwaveCreateZtableMacro extends RisingwaveMacroBase { private static final String ZTABLE_NAME = "ztables"; private static final String TABLE_NAME = "tables"; @@ -49,9 +49,7 @@ public class RisingwaveCreateZtableMacro private final StringBuilder includeBuilder; private final String systemSchema; private final String user; - private final String sql; private final CreateTable command; - private final RisingwaveMacroHandler handler; public RisingwaveCreateZtableMacro( String bootstrapServer, @@ -63,11 +61,11 @@ public RisingwaveCreateZtableMacro( CreateTable command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.systemSchema = systemSchema; this.user = user; - this.sql = sql; this.command = command; - this.handler = handler; this.fieldBuilder = new StringBuilder(); this.includeBuilder = new StringBuilder(); @@ -140,7 +138,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -224,7 +222,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -312,7 +310,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -364,7 +362,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -402,7 +400,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -441,7 +439,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -498,7 +496,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } private final class InsertIntoCatalogState implements RisingwaveMacroState @@ -552,5 +550,4 @@ public RisingwaveMacroState onError( } } } - } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZviewMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZviewMacro.java index eb027b7b27..be1e5bd531 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZviewMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZviewMacro.java @@ -29,7 +29,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.types.String32FW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveCreateZviewMacro +public class RisingwaveCreateZviewMacro extends RisingwaveMacroBase { protected static final int FLAGS_INIT = 0x02; @@ -45,9 +45,7 @@ public class RisingwaveCreateZviewMacro private final String schemaRegistry; private final String systemSchema; private final String user; - private final String sql; private final CreateZview command; - private final RisingwaveMacroHandler handler; public RisingwaveCreateZviewMacro( String bootstrapServer, @@ -58,13 +56,13 @@ public RisingwaveCreateZviewMacro( CreateZview command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.bootstrapServer = bootstrapServer; this.schemaRegistry = schemaRegistry; this.systemSchema = systemSchema; this.user = user; - this.sql = sql; this.command = command; - this.handler = handler; this.columnTypes = new ArrayList<>(); this.columnDescriptions = new ArrayList<>(); @@ -113,7 +111,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -151,7 +149,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -253,7 +251,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -310,7 +308,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -327,10 +325,6 @@ private final class CreateSinkState implements RisingwaveMacroState schema.registry='%s' ) KEY ENCODE TEXT;\u0000"""; - private final StringBuilder fieldBuilder = new StringBuilder(); - private final StringBuilder primaryKeyBuilder = new StringBuilder(); - - @Override public void onStarted( long traceId, @@ -378,7 +372,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropStreamMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropStreamMacro.java index 7883d32bd8..2f5dd4c094 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropStreamMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropStreamMacro.java @@ -18,12 +18,10 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveDropStreamMacro +public class RisingwaveDropStreamMacro extends RisingwaveMacroBase { private final String systemSchema; - private final String sql; private final Drop command; - private final RisingwaveMacroHandler handler; public RisingwaveDropStreamMacro( String systemSchema, @@ -31,10 +29,10 @@ public RisingwaveDropStreamMacro( Drop command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.systemSchema = systemSchema; - this.sql = sql; this.command = command; - this.handler = handler; } public RisingwaveMacroState start() @@ -75,7 +73,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -123,5 +121,4 @@ public RisingwaveMacroState onError( return this; } } - } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZtableMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZtableMacro.java index a09f19a9af..4a4e16523e 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZtableMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZtableMacro.java @@ -18,12 +18,10 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveDropZtableMacro +public class RisingwaveDropZtableMacro extends RisingwaveMacroBase { private final String systemSchema; - private final String sql; private final Drop command; - private final RisingwaveMacroHandler handler; public RisingwaveDropZtableMacro( String systemSchema, @@ -31,10 +29,10 @@ public RisingwaveDropZtableMacro( Drop command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.systemSchema = systemSchema; - this.sql = sql; this.command = command; - this.handler = handler; } public RisingwaveMacroState start() @@ -75,7 +73,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -112,7 +110,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -149,7 +147,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -186,7 +184,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -223,7 +221,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -260,7 +258,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -308,5 +306,4 @@ public RisingwaveMacroState onError( return this; } } - } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZviewMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZviewMacro.java index 01a91a1fc8..2a15d76a08 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZviewMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveDropZviewMacro.java @@ -18,12 +18,10 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveDropZviewMacro +public class RisingwaveDropZviewMacro extends RisingwaveMacroBase { private final String systemSchema; - private final String sql; private final Drop command; - private final RisingwaveMacroHandler handler; public RisingwaveDropZviewMacro( String systemSchema, @@ -31,10 +29,10 @@ public RisingwaveDropZviewMacro( Drop command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.systemSchema = systemSchema; - this.sql = sql; this.command = command; - this.handler = handler; } public RisingwaveMacroState start() @@ -75,7 +73,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -112,7 +110,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } @@ -149,7 +147,7 @@ public RisingwaveMacroState onError( PgsqlFlushExFW flushEx) { handler.doFlushProxy(traceId, authorization, flushEx); - return this; + return errorState(); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroBase.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroBase.java new file mode 100644 index 0000000000..e32af81d28 --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroBase.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.internal.macro; + +import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; + +public abstract class RisingwaveMacroBase +{ + protected final RisingwaveMacroHandler handler; + protected final String sql; + + public RisingwaveMacroBase( + String sql, + RisingwaveMacroHandler handler) + { + this.sql = sql; + this.handler = handler; + } + + protected RisingwaveMacroState errorState() + { + return new ErrorState(); + } + + protected final class ErrorState implements RisingwaveMacroState + { + @Override + public RisingwaveMacroState onReady( + long traceId, + long authorization, + PgsqlFlushExFW flushEx) + { + handler.doReady(traceId, authorization, sql.length()); + return null; + } + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroState.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroState.java index 849fcd64fb..fcc843f462 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroState.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveMacroState.java @@ -21,9 +21,11 @@ public interface RisingwaveMacroState { - void onStarted( + default void onStarted( long traceId, - long authorization); + long authorization) + { + } default RisingwaveMacroState onRow( T client, @@ -59,8 +61,11 @@ RisingwaveMacroState onReady( long authorization, PgsqlFlushExFW flushEx); - RisingwaveMacroState onError( + default RisingwaveMacroState onError( long traceId, long authorization, - PgsqlFlushExFW flushEx); + PgsqlFlushExFW flushEx) + { + return this; + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveOnReadyHandler.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveOnReadyHandler.java deleted file mode 100644 index 14e819fc3a..0000000000 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveOnReadyHandler.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.aklivity.zilla.runtime.binding.risingwave.internal.macro; - -@FunctionalInterface -public interface RisingwaveOnReadyHandler -{ - RisingwaveMacroState onReady( - long traceId, - long authorization - ); -} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 52eccf9cf6..d5396991fd 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -1179,7 +1179,7 @@ private void onAppErrorFlush( PgsqlFlushExFW flushEx) { messageOffset = 0; - server.macroState.onError(traceId, authorization, flushEx); + server.macroState = server.macroState.onError(traceId, authorization, flushEx); } private void onAppReadyFlush(