Skip to content

Unify ZFUNCTION and ZSTREAM into a single concept #1377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,6 @@ TOPIC: 'TOPIC';

TOPICS: 'TOPICS';

ZSTREAM: 'ZSTREAM';

ZVIEW: 'ZVIEW';

ZVIEWS: 'ZVIEWS';
Expand Down Expand Up @@ -558,9 +556,9 @@ FUNCTIONS: 'FUNCTIONS';

ZFUNCTION: 'ZFUNCTION';

ZFUNCTIONS: 'ZFUNCTIONS';
EVENTS: 'EVENTS';

ZSTREAMS: 'ZSTREAMS';
ZFUNCTIONS: 'ZFUNCTIONS';

GLOBAL: 'GLOBAL';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ stmt
| createseqstmt
| createstmt
| createztstmt
| createzstreamstmt
| createsubscriptionstmt
| createstatsstmt
| createtablespacestmt
Expand Down Expand Up @@ -415,39 +414,9 @@ altertablestmt
| ALTER MATERIALIZED VIEW (IF_P EXISTS)? qualified_name alter_table_cmds
| ALTER MATERIALIZED VIEW ALL IN_P TABLESPACE name (OWNED BY role_list)? SET TABLESPACE name opt_nowait
| ALTER FOREIGN TABLE (IF_P EXISTS)? relation_expr alter_table_cmds
| ALTER ZSTREAM (IF_P EXISTS)? relation_expr alter_zstream_cmds
| ALTER ZVIEW (IF_P EXISTS)? qualified_name alter_table_cmds
;

alter_zstream_cmds
: alter_zstream_cmd (COMMA alter_zstream_cmd)*
;

alter_zstream_cmd
: ADD_P columnDef
| ADD_P IF_P NOT EXISTS columnDef
| ADD_P COLUMN columnDef
| ADD_P COLUMN IF_P NOT EXISTS columnDef
| ALTER opt_column colid alter_column_default
| ALTER opt_column colid DROP NOT NULL_P
| ALTER opt_column colid SET NOT NULL_P
| ALTER opt_column colid DROP EXPRESSION
| ALTER opt_column colid DROP EXPRESSION IF_P EXISTS
| ALTER opt_column colid SET STATISTICS signediconst
| ALTER opt_column iconst SET STATISTICS signediconst
| ALTER opt_column colid SET reloptions
| ALTER opt_column colid RESET reloptions
| ALTER opt_column colid SET STORAGE colid
| ALTER opt_column colid ADD_P GENERATED generated_when AS IDENTITY_P optparenthesizedseqoptlist
| ALTER opt_column colid alter_identity_column_option_list
| ALTER opt_column colid DROP IDENTITY_P
| ALTER opt_column colid DROP IDENTITY_P IF_P EXISTS
| ALTER opt_column colid opt_set_data TYPE_P typename opt_collate_clause alter_using
| ALTER opt_column colid alter_generic_options
| DROP opt_column IF_P EXISTS colid opt_drop_behavior
| DROP opt_column colid opt_drop_behavior
;

alter_table_cmds
: alter_table_cmd (COMMA alter_table_cmd)*
;
Expand Down Expand Up @@ -694,22 +663,6 @@ copy_generic_opt_arg_list_item
: opt_boolean_or_string
;

createzstreamstmt
: CREATE ZSTREAM (IF_P NOT EXISTS)? zstream_name OPEN_PAREN zstream_columns CLOSE_PAREN opt_with_zstream
;

zstream_name
: qualified_name
;

zstream_columns
: zstream_column (COMMA zstream_column)*
;

zstream_column
: colid typename opt_generated_clause
;

opt_generated_clause
: GENERATED ALWAYS AS generation_type
| /* Empty */
Expand All @@ -720,10 +673,6 @@ generation_type
| NOW
;

opt_with_zstream
: WITH OPEN_PAREN zreloptions CLOSE_PAREN
;

zreloptions
: zreloption_elem (COMMA zreloption_elem)*
;
Expand Down Expand Up @@ -1679,7 +1628,6 @@ show_object_type_name
| ZTABLES
| ZVIEWS
| ZFUNCTIONS
| ZSTREAMS
;

dropstmt
Expand All @@ -1703,8 +1651,8 @@ object_type_any_name
| VIEW
| MATERIALIZED VIEW
| TOPIC
| ZSTREAM
| ZVIEW
| ZFUNCTION
| ZTABLE
| INDEX
| FOREIGN TABLE
Expand Down Expand Up @@ -2043,7 +1991,19 @@ opt_nulls_order

createzfunctionstmt
: CREATE ZFUNCTION func_name func_args_with_defaults
RETURNS TABLE OPEN_PAREN table_func_column_list CLOSE_PAREN LANGUAGE SQL_P AS DOLLAR_DELIMITER selectstmt SEMI DOLLAR_DELIMITER
RETURNS TABLE OPEN_PAREN table_func_column_list CLOSE_PAREN
LANGUAGE SQL_P AS DOLLAR_DELIMITER selectstmt SEMI DOLLAR_DELIMITER
zfunc_opt_with_clause
;

zfunc_opt_with_clause
: WITH OPEN_PAREN zfunc_with_option CLOSE_PAREN
| /* empty */
;

zfunc_with_option
: EVENTS EQUAL sconst
| collabel EQUAL sconst
;

createfunctionstmt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
import org.antlr.v4.runtime.dfa.DFA;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlAlterZstreamTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlAlterZtableTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCommandListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateFunctionListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZfunctionListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZstreamListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZtableTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZviewListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlDropListener;
Expand All @@ -43,7 +41,6 @@
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateFunction;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZfunction;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZstream;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZview;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop;

Expand All @@ -54,10 +51,8 @@ public final class PgsqlParser
private final CommonTokenStream tokens;
private final PostgreSqlParser parser;
private final SqlCommandListener commandListener;
private final SqlCreateZstreamListener createStreamListener;
private final SqlCreateZtableTopicListener createTableListener;
private final SqlAlterZtableTopicListener alterTableListener;
private final SqlAlterZstreamTopicListener alterStreamListener;
private final SqlCreateFunctionListener createFunctionListener;
private final SqlCreateZfunctionListener createZfunctionListener;
private final SqlCreateZviewListener createMaterializedViewListener;
Expand All @@ -79,8 +74,6 @@ public PgsqlParser()
this.commandListener = new SqlCommandListener(tokens);
this.createTableListener = new SqlCreateZtableTopicListener(tokens);
this.alterTableListener = new SqlAlterZtableTopicListener(tokens);
this.alterStreamListener = new SqlAlterZstreamTopicListener(tokens);
this.createStreamListener = new SqlCreateZstreamListener(tokens);
this.createFunctionListener = new SqlCreateFunctionListener(tokens);
this.createZfunctionListener = new SqlCreateZfunctionListener(tokens);
this.createMaterializedViewListener = new SqlCreateZviewListener(tokens);
Expand Down Expand Up @@ -109,20 +102,6 @@ public Alter parseAlterTable(
return alterTableListener.alter();
}

public Alter parseAlterStream(
String sql)
{
parser(sql, alterStreamListener);
return alterStreamListener.alter();
}

public CreateZstream parseCreateZstream(
String sql)
{
parser(sql, createStreamListener);
return createStreamListener.stream();
}

public CreateFunction parseCreateFunction(
String sql)
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,6 @@ else if (ctx.ALTER() != null && ctx.TOPIC() != null)
{
command = "ALTER TOPIC";
}
else if (ctx.ALTER() != null && ctx.ZSTREAM() != null)
{
command = "ALTER ZSTREAM";
}
}

@Override
public void enterCreatezstreamstmt(
PostgreSqlParser.CreatezstreamstmtContext ctx)
{
command = "CREATE ZSTREAM";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class SqlCreateZfunctionListener extends PostgreSqlParserBaseListener
private String orderByClause;
private String limitClause;
private String offsetClause;
private String events;

private String schema;
private String name;
Expand Down Expand Up @@ -72,7 +73,8 @@ public CreateZfunction zfunction()
name,
arguments,
returnTypes,
select);
select,
events);
}

@Override
Expand Down Expand Up @@ -142,7 +144,7 @@ public void enterFrom_clause(
public void enterWhere_clause(
PostgreSqlParser.Where_clauseContext ctx)
{
whereClause = tokens.getText(ctx);
whereClause = tokens.getText(ctx).replaceAll("(?i)\\bwhere\\s", "");
}

@Override
Expand Down Expand Up @@ -172,4 +174,19 @@ public void enterOffset_clause(
{
offsetClause = tokens.getText(ctx);
}

@Override
public void enterZfunc_with_option(
PostgreSqlParser.Zfunc_with_optionContext ctx)
{
if (ctx.EVENTS() != null)
{
events = tokens.getText(ctx.sconst());

if (events != null && events.length() >= 2)
{
events = events.substring(1, events.length() - 1);
}
}
}
}
Loading
Loading