Skip to content

Commit

Permalink
Terminate state on error
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Dec 18, 2024
1 parent 469afdd commit b63aef5
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveAlterStreamMacro
public class RisingwaveAlterStreamMacro extends RisingwaveMacroBase
{
private final StringBuilder fieldBuilder;
private final String sql;
private final Alter command;
private final RisingwaveMacroHandler handler;

public RisingwaveAlterStreamMacro(
String sql,
Alter command,
RisingwaveMacroHandler handler)
{
this.sql = sql;
super(sql, handler);

this.command = command;
this.handler = handler;
this.fieldBuilder = new StringBuilder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveAlterZtableMacro
public class RisingwaveAlterZtableMacro extends RisingwaveMacroBase
{
private final StringBuilder fieldBuilder;
private final String sql;
private final Alter command;
private final RisingwaveMacroHandler handler;

public RisingwaveAlterZtableMacro(
String sql,
Alter command,
RisingwaveMacroHandler handler)
{
this.sql = sql;
super(sql, handler);

this.command = command;
this.handler = handler;
this.fieldBuilder = new StringBuilder();
}

Expand Down Expand Up @@ -84,7 +82,8 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;

return errorState();
}
}

Expand Down Expand Up @@ -142,5 +141,4 @@ public RisingwaveMacroState onError(
return this;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveCreateFunctionMacro
public class RisingwaveCreateFunctionMacro extends RisingwaveMacroBase
{
private final String javaServer;
private final String pythonServer;
Expand All @@ -31,7 +31,6 @@ public class RisingwaveCreateFunctionMacro
private final String user;
private final String sql;
private final Function command;
private final RisingwaveMacroHandler handler;
private final StringBuilder fieldBuilder;

public RisingwaveCreateFunctionMacro(
Expand All @@ -42,6 +41,8 @@ public RisingwaveCreateFunctionMacro(
Function command,
RisingwaveMacroHandler handler)
{
super(sql, handler);

String javaServer = null;
String pythonServer = null;

Expand All @@ -66,7 +67,6 @@ else if (udf.language.equalsIgnoreCase("python"))
this.user = user;
this.sql = sql;
this.command = command;
this.handler = handler;
this.fieldBuilder = new StringBuilder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveCreateStreamMacro
public class RisingwaveCreateStreamMacro extends RisingwaveMacroBase
{
//TODO: Remove after implementing zstream
private static final String ZILLA_CORRELATION_ID_OLD = "zilla_correlation_id";
Expand All @@ -47,10 +47,7 @@ public class RisingwaveCreateStreamMacro
private final long scanStartupMil;
private final String systemSchema;
private final String user;
private final String sql;
private final CreateStream command;
private final RisingwaveMacroHandler handler;
private final RisingwaveOnReadyHandler terminateState = this::terminateState;

public RisingwaveCreateStreamMacro(
String bootstrapServer,
Expand All @@ -62,12 +59,12 @@ public RisingwaveCreateStreamMacro(
CreateStream command,
RisingwaveMacroHandler handler)
{
super(sql, handler);

this.scanStartupMil = scanStartupMil;
this.systemSchema = systemSchema;
this.user = user;
this.sql = sql;
this.command = command;
this.handler = handler;

this.bootstrapServer = bootstrapServer;
this.schemaRegistry = schemaRegistry;
Expand All @@ -78,32 +75,11 @@ public RisingwaveMacroState start()
return new CreateTopicState();
}

private RisingwaveMacroState terminateState(
long traceId,
long authorization)
{
handler.doReady(traceId, authorization, sql.length());

return null;
}

private final class CreateTopicState implements RisingwaveMacroState
{
private final String sqlFormat = """
CREATE TOPIC IF NOT EXISTS %s (%s%s);\u0000""";

private RisingwaveOnReadyHandler onReadyHandler = this::transition;

private RisingwaveMacroState transition(
long traceId,
long authorization)
{
CreateSourceState state = new CreateSourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public void onStarted(
long traceId,
Expand Down Expand Up @@ -133,7 +109,10 @@ public RisingwaveMacroState onReady(
long authorization,
PgsqlFlushExFW flushEx)
{
return onReadyHandler.onReady(traceId, authorization);
CreateSourceState state = new CreateSourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
Expand All @@ -143,9 +122,8 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
onReadyHandler = terminateState;

return this;
return errorState();
}
}

Expand All @@ -163,18 +141,6 @@ private final class CreateSourceState implements RisingwaveMacroState
schema.registry = '%s'
);\u0000""";

private RisingwaveOnReadyHandler onReadyHandler = this::transition;

private RisingwaveMacroState transition(
long traceId,
long authorization)
{
GrantResourceState state = new GrantResourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public void onStarted(
long traceId,
Expand Down Expand Up @@ -206,7 +172,10 @@ public RisingwaveMacroState onReady(
long authorization,
PgsqlFlushExFW flushEx)
{
return onReadyHandler.onReady(traceId, authorization);
GrantResourceState state = new GrantResourceState();
state.onStarted(traceId, authorization);

return state;
}

@Override
Expand All @@ -216,8 +185,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
onReadyHandler = terminateState;
return this;
return errorState();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveCreateZtableMacro
public class RisingwaveCreateZtableMacro extends RisingwaveMacroBase
{
private static final String ZTABLE_NAME = "ztables";
private static final String TABLE_NAME = "tables";
Expand All @@ -49,9 +49,7 @@ public class RisingwaveCreateZtableMacro
private final StringBuilder includeBuilder;
private final String systemSchema;
private final String user;
private final String sql;
private final CreateTable command;
private final RisingwaveMacroHandler handler;

public RisingwaveCreateZtableMacro(
String bootstrapServer,
Expand All @@ -63,11 +61,11 @@ public RisingwaveCreateZtableMacro(
CreateTable command,
RisingwaveMacroHandler handler)
{
super(sql, handler);

this.systemSchema = systemSchema;
this.user = user;
this.sql = sql;
this.command = command;
this.handler = handler;
this.fieldBuilder = new StringBuilder();
this.includeBuilder = new StringBuilder();

Expand Down Expand Up @@ -140,7 +138,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -224,7 +222,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -312,7 +310,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -364,7 +362,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -402,7 +400,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -441,7 +439,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}
}

Expand Down Expand Up @@ -498,7 +496,7 @@ public RisingwaveMacroState onError(
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);
return this;
return errorState();
}

private final class InsertIntoCatalogState implements RisingwaveMacroState
Expand Down Expand Up @@ -552,5 +550,4 @@ public RisingwaveMacroState onError(
}
}
}

}
Loading

0 comments on commit b63aef5

Please sign in to comment.