Skip to content

Commit

Permalink
Terminate transition on error
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Dec 18, 2024
1 parent 5da2130 commit 469afdd
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class RisingwaveCreateStreamMacro
private final String sql;
private final CreateStream command;
private final RisingwaveMacroHandler handler;
private final RisingwaveOnReadyHandler terminateState = this::terminateState;

public RisingwaveCreateStreamMacro(
String bootstrapServer,
Expand Down Expand Up @@ -77,11 +78,32 @@ public RisingwaveMacroState start()
return new CreateTopicState();
}

private RisingwaveMacroState terminateState(
long traceId,
long authorization)
{
handler.doReady(traceId, authorization, sql.length());

return null;
}

private final class CreateTopicState implements RisingwaveMacroState
{
private final String sqlFormat = """
CREATE TOPIC IF NOT EXISTS %s (%s%s);\u0000""";

private RisingwaveOnReadyHandler onReadyHandler = this::transition;

private RisingwaveMacroState transition(
long traceId,
long authorization)
{
CreateSourceState state = new CreateSourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public void onStarted(
long traceId,
Expand Down Expand Up @@ -111,10 +133,7 @@ public RisingwaveMacroState onReady(
long authorization,
PgsqlFlushExFW flushEx)
{
CreateSourceState state = new CreateSourceState();
state.onStarted(traceId, authorization);

return state;
return onReadyHandler.onReady(traceId, authorization);
}

@Override
Expand All @@ -124,6 +143,8 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
onReadyHandler = terminateState;

return this;
}
}
Expand All @@ -142,7 +163,17 @@ private final class CreateSourceState implements RisingwaveMacroState
schema.registry = '%s'
);\u0000""";

private boolean errored;
private RisingwaveOnReadyHandler onReadyHandler = this::transition;

private RisingwaveMacroState transition(
long traceId,
long authorization)
{
GrantResourceState state = new GrantResourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public void onStarted(
Expand Down Expand Up @@ -175,18 +206,7 @@ public RisingwaveMacroState onReady(
long authorization,
PgsqlFlushExFW flushEx)
{
GrantResourceState state = null;
if (!errored)
{
state = new GrantResourceState();
state.onStarted(traceId, authorization);
}
else
{
handler.doReady(traceId, authorization, sql.length());
}

return state;
return onReadyHandler.onReady(traceId, authorization);
}

@Override
Expand All @@ -195,9 +215,8 @@ public RisingwaveMacroState onError(
long authorization,
PgsqlFlushExFW flushEx)
{
// TODO: Handle error across all macro states
errored = true;
handler.doFlushProxy(traceId, authorization, flushEx);
onReadyHandler = terminateState;
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.risingwave.internal.macro;

@FunctionalInterface
public interface RisingwaveOnReadyHandler
{
RisingwaveMacroState onReady(
long traceId,
long authorization
);
}

0 comments on commit 469afdd

Please sign in to comment.