Skip to content

Commit

Permalink
Backport pglogical3 row_filter additions
Browse files Browse the repository at this point in the history
  • Loading branch information
ringerc committed Nov 1, 2018
1 parent 2f3ec2a commit 22392fc
Showing 1 changed file with 241 additions and 1 deletion.
242 changes: 241 additions & 1 deletion sql/row_filter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false, row_filter := $rf$ other > funcn_get_curr_decade() $rf$);
SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');

-- use this filter for rest of the test
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', true, row_filter := $rf$id > 1 AND data IS DISTINCT FROM 'baz' AND data IS DISTINCT FROM 'bbb'$rf$);

SELECT nspname, relname, set_name, rel_columns, row_filter FROM pglogical.tables WHERE relname = 'basic_dml';

-- fail, the membership in repset depends on data column
\set VERBOSITY terse
ALTER TABLE basic_dml DROP COLUMN data;
Expand Down Expand Up @@ -188,12 +191,22 @@ INSERT INTO test_jsonb VALUES
('array','["zero", "one","two",null,"four","five", [1,2,3],{"f1":9}]'),
('object','{"field1":"val1","field2":"val2","field3":null, "field4": 4, "field5": [1,2,3], "field6": {"f1":9}}');

SELECT * FROM pglogical.replication_set_add_table('default', 'test_jsonb', true, row_filter := $rf$test_json ->> 'field2' IS DISTINCT FROM 'val2' $rf$);
SELECT * FROM pglogical.replication_set_add_table('default', 'test_jsonb', true, row_filter := $rf$(test_json ->> 'field2') IS DISTINCT FROM 'val2' $rf$);

SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);

\c :subscriber_dsn

DO $$
BEGIN
FOR i IN 1..100 LOOP
IF NOT EXISTS (SELECT 1 FROM pglogical.local_sync_status WHERE sync_status != 'r') THEN
EXIT;
END IF;
PERFORM pg_sleep(0.1);
END LOOP;
END;$$;

SELECT * FROM test_jsonb ORDER BY json_type;

\c :provider_dsn
Expand All @@ -214,11 +227,238 @@ SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
-- even though we filtered on it. So will SomeThing.
SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo';

\c :provider_dsn

---------------------------------------------------
-- Enhanced function tests covering basic plpgsql
---------------------------------------------------

CREATE FUNCTION func_plpgsql_simple(arg integer)
RETURNS integer
LANGUAGE plpgsql
AS $$
BEGIN
RETURN arg;
END;
$$;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ func_plpgsql_simple(other) = 100 $rf$);

-- Should FAIL due to dependency
--
-- FIXME: Succeeds incorrectly (RM#5880) leading to
-- cache lookup failed for function" errors in logs if allowed to commit
--
BEGIN;
DROP FUNCTION func_plpgsql_simple(integer);
ROLLBACK;

INSERT INTO basic_dml (other) VALUES (100), (101);
SELECT other FROM basic_dml WHERE other IN (100,101);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (100,101);

\c :provider_dsn

CREATE FUNCTION func_plpgsql_logic(arg integer)
RETURNS integer
LANGUAGE plpgsql
AS $$
BEGIN
IF arg = 200 THEN
RETURN arg;
ELSE
RETURN 0;
END IF;
END;
$$;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ func_plpgsql_logic(other) = other $rf$);

INSERT INTO basic_dml (other) VALUES (200), (201);
SELECT other FROM basic_dml WHERE other IN (200,201);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (200,201);

\c :provider_dsn


CREATE FUNCTION func_plpgsql_security_definer(arg integer)
RETURNS integer
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RAISE NOTICE 'c_u: %, s_u: %', current_user, session_user;
RETURN arg;
END;
$$;
CREATE ROLE temp_owner;
ALTER FUNCTION func_plpgsql_security_definer(integer) OWNER TO temp_owner;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ func_plpgsql_security_definer(other) = 300 $rf$);

INSERT INTO basic_dml (other) VALUES (300), (301);
SELECT other FROM basic_dml WHERE other IN (300,301);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (300,301);

\c :provider_dsn

CREATE FUNCTION func_plpgsql_exception(arg integer)
RETURNS integer
LANGUAGE plpgsql
AS $$
BEGIN
BEGIN
SELECT arg/0;
EXCEPTION
WHEN division_by_zero THEN
RETURN arg;
END;
RAISE EXCEPTION 'should be unreachable';
END;
$$;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ func_plpgsql_exception(other) = 400 $rf$);

INSERT INTO basic_dml (other) VALUES (400), (401);
SELECT other FROM basic_dml WHERE other IN (400,401);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (400,401);

\c :provider_dsn

-- Should not be able to use a SETOF or TABLE func directly
-- but we can do it via a wrapper:
CREATE FUNCTION func_plpgsql_srf_retq(arg integer)
RETURNS TABLE (result integer, dummy boolean)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RETURN QUERY SELECT arg * x, true FROM generate_series(1,2) x;
RETURN;
END;
$$;

-- fails with SRF context error
BEGIN;
SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ (func_plpgsql_srf_retq(other)).result = 500 $rf$);
ROLLBACK;

CREATE FUNCTION func_plpgsql_call_set(arg integer)
RETURNS boolean
LANGUAGE plpgsql
AS $$
BEGIN
RETURN (SELECT true FROM func_plpgsql_srf_retq(arg) WHERE result = arg * 2);
END;
$$;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$ func_plpgsql_call_set(other) $rf$);

INSERT INTO basic_dml (other) VALUES (500), (501);
SELECT other FROM basic_dml WHERE other IN (500,501);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (500,501);

\c :provider_dsn
DROP FUNCTION func_plpgsql_simple(integer);
DROP FUNCTION func_plpgsql_logic(integer);
DROP FUNCTION func_plpgsql_security_definer(integer);
DROP FUNCTION func_plpgsql_exception(integer);
DROP FUNCTION func_plpgsql_srf_retq(integer);
DROP FUNCTION func_plpgsql_call_set(integer);
DROP ROLE temp_owner;

---------------------------------------------------
-- ^^^ End plpgsql tests
---------------------------------------------------

---------------------------------------------------
-- DROPping objects used by row-filters
---------------------------------------------------

CREATE FUNCTION dont_drop_me_baby_sql()
RETURNS integer LANGUAGE sql AS $$ SELECT 1; $$;

CREATE FUNCTION dont_drop_me_baby_plpgsql()
RETURNS integer LANGUAGE plpgsql AS $$
BEGIN
RETURN 1;
END;
$$;

-- No-inline view
CREATE VIEW dont_drop_me_baby_view
WITH (security_barrier = true)
AS SELECT 1 AS ret;

CREATE FUNCTION dont_drop_me_baby_view_sql()
RETURNS INTEGER LANGUAGE sql
AS $$ SELECT ret FROM dont_drop_me_baby_view; $$;

SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml');
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false,
row_filter := $rf$
dont_drop_me_baby_sql() = 1 AND
dont_drop_me_baby_plpgsql() = 1 AND
dont_drop_me_baby_view_sql() = 1
$rf$);

-- Works?
INSERT INTO basic_dml (other) VALUES (1000), (1001);

SELECT other FROM basic_dml WHERE other IN (1000,1001);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (1000,1001);

\c :provider_dsn

-- What if we drop the functions, but leave their use in the repset intact?

DROP FUNCTION dont_drop_me_baby_sql();
DROP FUNCTION dont_drop_me_baby_plpgsql();
DROP FUNCTION dont_drop_me_baby_view_sql();
DROP VIEW dont_drop_me_baby_view;

-- oops?
INSERT INTO basic_dml (other) VALUES (1100), (1101);

SELECT other FROM basic_dml WHERE other IN (1100,1101);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT other FROM basic_dml WHERE other IN (1100,1101);

---------------------------------------------------
-- ^^^ End DROPping objects used by row-filters
---------------------------------------------------

\c :provider_dsn
\set VERBOSITY terse
DROP FUNCTION funcn_add(integer, integer);
DROP FUNCTION funcn_nochange(text);
DROP FUNCTION funcn_get_curr_decade();

SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.basic_dml CASCADE;
DROP TABLE public.test_jsonb CASCADE;
Expand Down

0 comments on commit 22392fc

Please sign in to comment.