-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpgwrh--0.1.0.sql
1778 lines (1638 loc) · 62.2 KB
/
pgwrh--0.1.0.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- pgwrh
-- Copyright (C) 2024 Michal Kleczek
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or
-- (at your option) any later version.
-- This program is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU Affero General Public License for more details.
-- You should have received a copy of the GNU Affero General Public License
-- along with this program. If not, see <http://www.gnu.org/licenses/>.
-- Guard: this script must be loaded through CREATE EXTENSION, not run directly in psql.
\echo Use "CREATE EXTENSION pgwrh CASCADE" to load this file. \quit
-- Anyone may look up objects in the extension schema; EXECUTE on individual
-- routines is granted explicitly where needed (e.g. stable_hash, score).
GRANT USAGE ON SCHEMA @extschema@ TO PUBLIC;
-- Routines created from here on by the installing role do not get the implicit
-- EXECUTE-to-PUBLIC grant, so every routine stays private unless GRANTed.
-- NOTE(review): there is no IN SCHEMA / FOR ROLE clause, so this default applies to
-- every routine the installing role creates later in this database, not only to
-- pgwrh's, and it outlives the extension. Confirm that is intended.
ALTER DEFAULT PRIVILEGES REVOKE EXECUTE ON ROUTINES FROM PUBLIC;
--------------------
-- Global (controller and replica) helpers
--------------------
-- Records object (_classid, _objid) as depending on the pgwrh extension by
-- writing a pg_depend row directly (deptype 'n', whole-object: sub ids are 0).
-- Inserts nothing if the pgwrh extension row is not found.
CREATE OR REPLACE FUNCTION add_ext_dependency(_classid regclass, _objid oid) RETURNS void LANGUAGE sql AS
$$
INSERT INTO pg_depend (classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype)
SELECT
    _classid,
    _objid,
    0,
    'pg_extension'::regclass,
    ext.oid,
    0,
    'n'
FROM pg_extension AS ext
WHERE ext.extname = 'pgwrh'
$$;
-- Builds (does not execute) a statement that calls add_ext_dependency for the
-- catalog _classid; oidexpr is spliced in verbatim (unescaped) as the object oid.
CREATE OR REPLACE FUNCTION select_add_ext_dependency(_classid regclass, oidexpr text) RETURNS text LANGUAGE sql AS
$$
SELECT format(
    'SELECT @extschema@.add_ext_dependency(%1$L, %2$s)',
    _classid,
    oidexpr
)
$$;
-- Builds (does not execute) a statement that registers, as an extension
-- dependency, the row of catalog _classid whose column name_attr equals name.
-- name_attr is quoted as an identifier and name as a literal.
CREATE OR REPLACE FUNCTION select_add_ext_dependency(_classid regclass, name_attr text, name text) RETURNS text LANGUAGE sql AS
$$
SELECT format(
    'SELECT @extschema@.add_ext_dependency(%1$L, (SELECT oid FROM %1$s WHERE %2$I = %3$L))',
    _classid,
    name_attr,
    name
)
$$;
-------------------
-- End Global (controller and replica) helpers
-------------------
----------------------------------------------------
-- Controller start
----------------------------------------------------
-- Two-valued configuration version identifier. A replication group alternates
-- between FLIP and FLOP (see next_version / prev_version).
CREATE TYPE config_version AS ENUM ('FLIP', 'FLOP');
COMMENT ON TYPE config_version IS
'A FLIP/FLOP enum to use as config version identifier';
-- Toggles a configuration version: 'FLIP' becomes 'FLOP'; anything else
-- ('FLOP', or NULL) becomes 'FLIP'.
CREATE OR REPLACE FUNCTION next_version(version config_version) RETURNS config_version
IMMUTABLE
SET SEARCH_PATH FROM CURRENT
LANGUAGE sql AS
$$
SELECT (CASE WHEN version = 'FLIP' THEN 'FLOP' ELSE 'FLIP' END)::config_version
$$;
-- Previous configuration version. With only two versions, stepping back is the
-- same toggle as stepping forward, so this simply delegates to next_version.
CREATE OR REPLACE FUNCTION prev_version(version config_version) RETURNS config_version
IMMUTABLE
LANGUAGE sql AS
$$
SELECT @extschema@.next_version($1)
$$;
-- One row per replication group (cluster). ready_version names the configuration
-- version currently considered ready; a foreign key from it to
-- replication_group_config_lock is added after that table is created.
-- NOTE(review): password is stored in plain text and, through
-- pg_extension_config_dump, is included in pg_dump output. Confirm that is acceptable.
CREATE TABLE IF NOT EXISTS replication_group (
    replication_group_id text NOT NULL PRIMARY KEY,
    username text NOT NULL,
    password text NOT NULL,
    ready_version config_version NOT NULL DEFAULT 'FLIP'
);
SELECT pg_catalog.pg_extension_config_dump('replication_group', '');
COMMENT ON TABLE replication_group IS
'Represents a specific cluster (replica group) configuration.
A single server may be a source of data for multiple groups of replicas.
Each group may have different configuration, in particular:
* what tables should be sharded
* number of desired copies per shard
* member servers and shard hosts topology
Username and password are credentials shared between cluster members and used
to access remote shards (ie. they are used in USER MAPPINGs created on cluster members).
';
COMMENT ON COLUMN replication_group.replication_group_id IS
'Unique identifier of a replication group.';
-- Configuration versions that exist for a replication group (at most FLIP and FLOP).
-- The per-version configuration tables reference this table with ON DELETE CASCADE.
CREATE TABLE IF NOT EXISTS replication_group_config (
    replication_group_id text NOT NULL REFERENCES replication_group(replication_group_id),
    version config_version NOT NULL,
    PRIMARY KEY (replication_group_id, version)
);
SELECT pg_catalog.pg_extension_config_dump('replication_group_config', '');
COMMENT ON TABLE replication_group_config IS
'Represents a version of configuration of a replication group.
Each cluster (replication group) configuration is versioned to make sure
changes in cluster topology and shards configuration do not cause any downtime.
There may be two versions of configuration present at the same time.
A configuration version might be "pending" or "ready".
The "ready" version is the one pointed to by replication_group.ready_version: a configuration version that all
replicas installed and configured successfully. The shards assigned to replicas in that version are copied, indexed and available to use.
The "pending" version is the other one (if present): a configuration version that is under installation/configuration by the replicas.
A replica keeps all shards from "ready" configuration even if a shard might be no longer assigned to it in "pending" configuration version.';
-- Locked configuration versions. While (group, version) has a row here, the
-- forbid_not_pending_version_* triggers (via is_locked) reject inserts, updates
-- and deletes of that version's rows in shard_host_weight, sharded_table and
-- shard_index_template. Lock rows go away with their replication_group_config
-- row (ON DELETE CASCADE).
-- NOTE(review): unlike the other tables here, this one is not registered with
-- pg_extension_config_dump, so its rows are not part of pg_dump output. Confirm intended.
CREATE TABLE IF NOT EXISTS replication_group_config_lock (
replication_group_id text NOT NULL,
version config_version NOT NULL,
PRIMARY KEY (replication_group_id, version),
FOREIGN KEY (replication_group_id, version)
REFERENCES replication_group_config(replication_group_id, version)
ON DELETE CASCADE
);
-- A group's ready version must always be locked. The check is deferred to commit
-- because the lock row for a new group is only created by the AFTER INSERT
-- trigger on replication_group, and mark_pending_version_ready changes
-- ready_version in a separate statement.
ALTER TABLE replication_group ADD FOREIGN KEY (replication_group_id, ready_version)
REFERENCES replication_group_config_lock(replication_group_id, version)
DEFERRABLE INITIALLY DEFERRED;
-- Trigger function for new replication_group rows. It creates the
-- replication_group_config row for the group's initial ready_version and locks
-- that version. Both inserts are idempotent (ON CONFLICT DO NOTHING). The return
-- value is ignored because the function is used as an AFTER trigger.
CREATE OR REPLACE FUNCTION replication_group_prepare_config() RETURNS trigger LANGUAGE plpgsql AS
$$
BEGIN
    INSERT INTO @extschema@.replication_group_config (replication_group_id, version)
    VALUES (NEW.replication_group_id, NEW.ready_version)
    ON CONFLICT DO NOTHING;
    -- The ready version must be locked (see the deferred FK on replication_group).
    INSERT INTO @extschema@.replication_group_config_lock (replication_group_id, version)
    VALUES (NEW.replication_group_id, NEW.ready_version)
    ON CONFLICT DO NOTHING;
    RETURN NEW;
END
$$;
-- Despite the name, this fires AFTER INSERT. replication_group_config has a
-- non-deferrable FK to replication_group, so the group row presumably has to
-- exist before replication_group_prepare_config inserts the config row.
CREATE OR REPLACE TRIGGER replication_group_before_insert
AFTER INSERT ON replication_group
FOR EACH ROW EXECUTE FUNCTION replication_group_prepare_config();
-- True when (group_id, version) has a row in replication_group_config_lock,
-- i.e. that configuration version must not be modified.
-- Parameters are qualified with the function name because the "version"
-- parameter would otherwise be shadowed by the column of the same name.
CREATE OR REPLACE FUNCTION is_locked(group_id text, version config_version) RETURNS boolean
SET SEARCH_PATH FROM CURRENT
LANGUAGE sql STABLE AS
$$
SELECT EXISTS (
    SELECT 1
    FROM replication_group_config_lock AS l
    WHERE l.replication_group_id = is_locked.group_id
      AND l.version = is_locked.version
)
$$;
-- Returns the group's pending version, next_version(ready_version), and first
-- makes sure a replication_group_config row exists for it.
-- Returns NULL (and inserts nothing) if the group does not exist.
-- Uses @extschema@ rather than a hard-coded schema name, like the rest of the script.
CREATE OR REPLACE FUNCTION next_pending_version(group_id text) RETURNS config_version
LANGUAGE sql AS
$$
INSERT INTO @extschema@.replication_group_config (replication_group_id, version)
SELECT replication_group_id, @extschema@.next_version(ready_version)
FROM @extschema@.replication_group
WHERE replication_group_id = group_id
ON CONFLICT DO NOTHING;
SELECT @extschema@.next_version(ready_version)
FROM @extschema@.replication_group
WHERE replication_group_id = group_id
$$;
COMMENT ON FUNCTION next_pending_version(group_id text) IS
'Inserts next pending version into replication_group_config and returns it.';
-- BEFORE INSERT trigger function: replaces NEW.version with the group's pending
-- version (creating its replication_group_config row if needed), overriding
-- whatever version the caller supplied.
CREATE OR REPLACE FUNCTION next_pending_version_trigger() RETURNS TRIGGER
LANGUAGE plpgsql AS
$$
BEGIN
    SELECT @extschema@.next_pending_version(NEW.replication_group_id)
    INTO NEW.version;
    RETURN NEW;
END
$$;
-- Trigger function that always aborts the statement. It is attached with
-- WHEN (is_locked(...)) conditions, so it only runs for rows that belong to a
-- locked configuration version.
CREATE OR REPLACE FUNCTION forbid_locked_version_modifications() RETURNS TRIGGER
SET SEARCH_PATH FROM CURRENT
LANGUAGE plpgsql AS
$$
BEGIN
    RAISE EXCEPTION USING
        MESSAGE = format('This config version is locked. Modifications in %s are forbidden.', TG_RELID::regclass);
    -- Never reached; the RAISE above always aborts.
    RETURN NULL;
END
$$;
-- Copies a group's configuration from prev_version(target_version) into
-- target_version: shard host weights, sharded tables and shard index templates.
-- Rows that already exist in target_version are kept (ON CONFLICT DO NOTHING).
-- NOTE: the next_pending_version BEFORE INSERT trigger on each of these tables
-- overwrites the inserted version with the group's pending version.
CREATE OR REPLACE FUNCTION clone_config(group_id text, target_version config_version) RETURNS void
SET SEARCH_PATH FROM CURRENT
LANGUAGE sql AS
$$
-- clone weights
WITH cloned_weights AS (
    INSERT INTO shard_host_weight (replication_group_id, host_id, version, weight)
    SELECT replication_group_id, host_id, target_version, weight
    FROM shard_host_weight
    WHERE (replication_group_id, version) = (group_id, prev_version(target_version))
    ON CONFLICT DO NOTHING
),
-- clone sharded tables. sharding_key_expression must be copied too; otherwise the
-- clone falls back to the column default, and the sharding key (and therefore
-- host scores) could silently change in target_version.
cloned_tables AS (
    INSERT INTO sharded_table (replication_group_id, sharded_table_schema, sharded_table_name, version, replica_count, sharding_key_expression)
    SELECT replication_group_id, sharded_table_schema, sharded_table_name, target_version, replica_count, sharding_key_expression
    FROM sharded_table
    WHERE (replication_group_id, version) = (group_id, prev_version(target_version))
    ON CONFLICT DO NOTHING
)
-- clone index templates
INSERT INTO shard_index_template (replication_group_id, version, schema_name, table_name, index_name, index_template)
SELECT replication_group_id, target_version, schema_name, table_name, index_name, index_template
FROM shard_index_template
WHERE (replication_group_id, version) = (group_id, prev_version(target_version))
ON CONFLICT DO NOTHING
$$;
COMMENT ON FUNCTION clone_config IS
'Copies configuration (shard host weights, sharded tables, shard index templates) from the previous version into target_version. Ignores already existing items.';
-- AFTER INSERT trigger function for the per-version configuration tables. Once
-- a row lands in a version, copy the rest of the previous version's
-- configuration into that version.
CREATE OR REPLACE FUNCTION clone_config_trigger() RETURNS TRIGGER
SET SEARCH_PATH FROM CURRENT
LANGUAGE plpgsql AS
$$
DECLARE
    inserted_group text := NEW.replication_group_id;
    inserted_version config_version := NEW.version;
BEGIN
    PERFORM clone_config(inserted_group, inserted_version);
    -- The result of an AFTER trigger is ignored.
    RETURN NULL;
END
$$;
-- Promotes a group's pending version (next_version(ready_version)) to ready:
-- locks it, then points replication_group.ready_version at it.
-- Both steps require the pending version to exist in replication_group_config.
-- Without that guard, inserting the lock row would fail its FK to
-- replication_group_config, contradicting the documented no-op behaviour below.
CREATE OR REPLACE FUNCTION mark_pending_version_ready(group_id text) RETURNS void
SET SEARCH_PATH FROM CURRENT
LANGUAGE sql AS
$$
-- Lock the pending version, if there is one.
INSERT INTO @extschema@.replication_group_config_lock (replication_group_id, version)
SELECT g.replication_group_id, @extschema@.next_version(g.ready_version)
FROM @extschema@.replication_group AS g
WHERE g.replication_group_id = group_id
  AND EXISTS (
      SELECT 1
      FROM @extschema@.replication_group_config AS c
      WHERE c.replication_group_id = g.replication_group_id
        AND c.version = @extschema@.next_version(g.ready_version)
  )
ON CONFLICT DO NOTHING;
-- Make it the ready version, under the same condition, so ready_version never
-- points at a version that has no configuration.
UPDATE @extschema@.replication_group AS g
SET ready_version = @extschema@.next_version(g.ready_version)
WHERE g.replication_group_id = group_id
  AND EXISTS (
      SELECT 1
      FROM @extschema@.replication_group_config AS c
      WHERE c.replication_group_id = g.replication_group_id
        AND c.version = @extschema@.next_version(g.ready_version)
  )
$$;
COMMENT ON FUNCTION mark_pending_version_ready(group_id text) IS
'Swaps pending and ready configuration versions for a group.
Does not do anything if there is no pending version present';
-- Deletes every configuration version of the group other than its ready version.
-- Rows that depend on it (locks, weights, sharded tables, index templates) are
-- removed through their ON DELETE CASCADE foreign keys.
CREATE OR REPLACE FUNCTION delete_pending_version(group_id text) RETURNS void LANGUAGE sql AS
$$
DELETE FROM @extschema@.replication_group_config AS c
USING @extschema@.replication_group AS g
WHERE c.replication_group_id = group_id
  AND g.replication_group_id = c.replication_group_id
  AND c.version <> g.ready_version
$$;
COMMENT ON FUNCTION delete_pending_version(group_id text) IS
'Removes pending (ie. the one that is not pointed to by replication_group(ready_version)) configuration version.
Removal of pending version may trigger removal of no longer needed shards on the replicas.
So it must be performed with caution after verifying no replicas assume presence of these shards on other replicas';
-- Members (nodes) of a replication group.
-- member_role: name of the database role for this member. add_shard_host
--   defaults it from host_id. It is UNIQUE across all groups, not just per group.
-- availability_zone: used by shard assignment to rank hosts within each zone.
-- same_zone_multiplier: 1..5, default 2; its use is not visible in this part of the script.
CREATE TABLE IF NOT EXISTS replication_group_member (
replication_group_id text NOT NULL REFERENCES replication_group(replication_group_id),
host_id text NOT NULL,
member_role text NOT NULL UNIQUE,
availability_zone text NOT NULL,
same_zone_multiplier smallint NOT NULL CHECK ( same_zone_multiplier BETWEEN 1 AND 5 ) DEFAULT 2,
PRIMARY KEY (replication_group_id, host_id)
);
SELECT pg_catalog.pg_extension_config_dump('replication_group_member', '');
COMMENT ON TABLE replication_group_member IS
'Represents a node in a cluster (replication group).
A cluster consists of two types of nodes:
* shard hosts - nodes that replicate and serve data
* non replicating members - nodes that act only as proxies (ie. not hosting any shards)';
-- Connection endpoint of a member that hosts shards. Every shard_host must be a
-- replication_group_member, and each (host_name, port) endpoint may appear only
-- once across all groups.
CREATE TABLE IF NOT EXISTS shard_host (
replication_group_id text NOT NULL,
host_id text NOT NULL,
host_name text NOT NULL,
port int NOT NULL CHECK ( port > 0 ),
online boolean NOT NULL DEFAULT TRUE,
PRIMARY KEY (replication_group_id, host_id),
FOREIGN KEY (replication_group_id, host_id) REFERENCES replication_group_member(replication_group_id, host_id),
UNIQUE (host_name, port)
);
SELECT pg_catalog.pg_extension_config_dump('shard_host', '');
COMMENT ON TABLE shard_host IS
'Represents a data replicating node in a cluster (replication group).';
COMMENT ON COLUMN shard_host.online IS
'Shard host marked as offline is not going to receive any requests for data from other nodes.
It is still replicating shards assigned to it.
This flag is supposed to be used in situation when a particular node must be
temporarily disconnected from a cluster for maintenance purposes.';
-- Per-version weight of a shard host. It feeds pgwrh.score when ranking hosts
-- for a shard; weight 0 makes every score of that host 0.
CREATE TABLE IF NOT EXISTS shard_host_weight (
replication_group_id text NOT NULL,
host_id text NOT NULL,
version config_version NOT NULL,
weight int NOT NULL CHECK ( weight >= 0 ),
PRIMARY KEY (replication_group_id, host_id, version),
FOREIGN KEY (replication_group_id, host_id) REFERENCES shard_host(replication_group_id, host_id),
FOREIGN KEY (replication_group_id, version) REFERENCES replication_group_config(replication_group_id, version) ON DELETE CASCADE
);
SELECT pg_catalog.pg_extension_config_dump('shard_host_weight', '');
COMMENT ON TABLE shard_host_weight IS
'Weight of a shard host in a specific configuration version';
-- Reject changes to rows of a locked configuration version.
-- NOTE(review): BEFORE triggers fire in alphabetical order of name, so
-- forbid_not_pending_version_insert runs before next_pending_version rewrites
-- NEW.version. The insert check therefore sees the caller-supplied version
-- (often NULL), not the version actually stored. Confirm intended.
CREATE OR REPLACE TRIGGER forbid_not_pending_version_insert BEFORE INSERT ON shard_host_weight
FOR EACH ROW
WHEN (is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_update BEFORE UPDATE ON shard_host_weight
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version) OR is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_delete BEFORE DELETE ON shard_host_weight
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
-- Every inserted row goes to the group's pending version.
CREATE OR REPLACE TRIGGER next_pending_version BEFORE INSERT ON shard_host_weight
FOR EACH ROW EXECUTE FUNCTION next_pending_version_trigger();
-- Then fill the rest of that version from the previous version's configuration.
CREATE OR REPLACE TRIGGER clone_config AFTER INSERT ON shard_host_weight
FOR EACH ROW EXECUTE FUNCTION clone_config_trigger();
-- Registers a shard host in a single statement: the group member, its connection
-- endpoint and its weight. The weight's version is filled in by the
-- next_pending_version trigger on shard_host_weight.
-- When _member_role is NULL, the role name is taken from _host_id; the regrole
-- cast then makes the call fail if no role of that name exists.
CREATE OR REPLACE FUNCTION add_shard_host(_replication_group_id text, _host_id text, _host_name text, _port int, _member_role regrole DEFAULT NULL, _availability_zone text DEFAULT 'default', _weight int DEFAULT 100) RETURNS void
SET SEARCH_PATH FROM CURRENT
LANGUAGE sql AS
$$
WITH new_member AS (
    INSERT INTO replication_group_member (replication_group_id, host_id, member_role, availability_zone)
    VALUES (
        _replication_group_id,
        _host_id,
        COALESCE(_member_role::text, _host_id::regrole::text),
        _availability_zone
    )
),
new_endpoint AS (
    INSERT INTO shard_host (replication_group_id, host_id, host_name, port)
    VALUES (_replication_group_id, _host_id, _host_name, _port)
)
INSERT INTO shard_host_weight (replication_group_id, host_id, weight)
VALUES (_replication_group_id, _host_id, _weight)
$$;
-- Tables (or partition hierarchies) to shard, per group and configuration version.
-- sharded_table_schema/sharded_table_name are raw (unquoted) names, matched
--   against pg_namespace.nspname / pg_class.relname in published_shard.
-- replica_count: desired number of copies of each shard of this hierarchy.
-- sharding_key_expression: SQL run by extract_sharding_key_value with
--   $1 = shard schema name and $2 = shard table name; the resulting value is
--   used to score hosts for the shard.
CREATE TABLE IF NOT EXISTS sharded_table (
replication_group_id text NOT NULL,
sharded_table_schema text NOT NULL,
sharded_table_name text NOT NULL,
version config_version NOT NULL,
replica_count smallint NOT NULL CHECK ( replica_count >= 0 ),
sharding_key_expression text NOT NULL DEFAULT 'SELECT $1 || $2',
PRIMARY KEY (replication_group_id, sharded_table_schema, sharded_table_name, version),
FOREIGN KEY (replication_group_id, version) REFERENCES replication_group_config(replication_group_id, version) ON DELETE CASCADE
);
SELECT pg_catalog.pg_extension_config_dump('sharded_table', '');
-- Reject changes to rows of a locked configuration version.
-- NOTE(review): BEFORE triggers fire in alphabetical order of name, so the insert
-- check runs before next_pending_version rewrites NEW.version. It sees the
-- caller-supplied version, not the stored one. Confirm intended.
CREATE OR REPLACE TRIGGER forbid_not_pending_version_insert BEFORE INSERT ON sharded_table
FOR EACH ROW
WHEN (is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_update BEFORE UPDATE ON sharded_table
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version) OR is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_delete BEFORE DELETE ON sharded_table
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
-- Every inserted row goes to the group's pending version.
CREATE OR REPLACE TRIGGER next_pending_version BEFORE INSERT ON sharded_table
FOR EACH ROW EXECUTE FUNCTION next_pending_version_trigger();
-- Then fill the rest of that version from the previous version's configuration.
CREATE OR REPLACE TRIGGER clone_config AFTER INSERT ON sharded_table
FOR EACH ROW EXECUTE FUNCTION clone_config_trigger();
-- Index templates for the shards of schema_name.table_name, per group and
-- configuration version. The format of index_template is not visible here.
-- version is part of the primary key (and NOT NULL), like in the other
-- per-version tables. Without it, a template could not exist in both the ready
-- and the pending version: inserts into the pending version collided with the
-- ready row, and clone_config could never copy templates.
CREATE TABLE IF NOT EXISTS shard_index_template (
    replication_group_id text NOT NULL,
    version config_version NOT NULL,
    schema_name text NOT NULL,
    table_name text NOT NULL,
    index_name name NOT NULL,
    index_template text NOT NULL,
    PRIMARY KEY (replication_group_id, schema_name, table_name, index_name, version),
    FOREIGN KEY (replication_group_id, version) REFERENCES replication_group_config(replication_group_id, version) ON DELETE CASCADE
);
SELECT pg_catalog.pg_extension_config_dump('shard_index_template', '');
-- Reject changes to rows of a locked configuration version.
-- NOTE(review): BEFORE triggers fire in alphabetical order of name, so the insert
-- check runs before next_pending_version rewrites NEW.version.
CREATE OR REPLACE TRIGGER forbid_not_pending_version_insert BEFORE INSERT ON shard_index_template
FOR EACH ROW
WHEN (is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_update BEFORE UPDATE ON shard_index_template
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version) OR is_locked(NEW.replication_group_id, NEW.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
CREATE OR REPLACE TRIGGER forbid_not_pending_version_delete BEFORE DELETE ON shard_index_template
FOR EACH ROW
WHEN (is_locked(OLD.replication_group_id, OLD.version))
EXECUTE FUNCTION forbid_locked_version_modifications();
-- Every inserted row goes to the group's pending version.
CREATE OR REPLACE TRIGGER next_pending_version BEFORE INSERT ON shard_index_template
FOR EACH ROW EXECUTE FUNCTION next_pending_version_trigger();
-- Then fill the rest of that version from the previous version's configuration.
CREATE OR REPLACE TRIGGER clone_config AFTER INSERT ON shard_index_template
FOR EACH ROW EXECUTE FUNCTION clone_config_trigger();
-- Publications managed by pgwrh: maps a publication name to the oid of the table
-- (shard) it publishes, with at most one such publication per table.
-- published_shard joins this with pg_publication / pg_publication_rel.
CREATE TABLE IF NOT EXISTS pg_wrh_publication (
publication_name text NOT NULL PRIMARY KEY,
published_shard oid NOT NULL UNIQUE
);
-- Resolves a sharded_table row to the regclass of the table it configures, or
-- NULL if no such table exists. The stored names are raw catalog names (see
-- published_shard), so they are quoted before lookup. Otherwise names with upper
-- case letters, dots or spaces would be case-folded or mis-split and resolve to
-- the wrong table or to NULL.
CREATE OR REPLACE FUNCTION to_regclass(st @extschema@.sharded_table) RETURNS regclass STABLE LANGUAGE sql AS
$$SELECT pg_catalog.to_regclass(quote_ident(st.sharded_table_schema) || '.' || quote_ident(st.sharded_table_name))$$;
-- Deterministic signed 32-bit hash of the arguments. The arguments are
-- concatenated with no separator (NULL elements count as empty strings), and the
-- first 8 hex digits of their md5 are reinterpreted as an int. Argument
-- boundaries are not preserved: stable_hash('ab', 'c') = stable_hash('a', 'bc').
CREATE OR REPLACE FUNCTION stable_hash(VARIADIC text[]) RETURNS int IMMUTABLE LANGUAGE sql AS
$$SELECT CAST(CAST('x' || left(md5(array_to_string($1, '', '')), 8) AS bit(32)) AS int)$$;
GRANT EXECUTE ON ROUTINE stable_hash(VARIADIC text[]) TO PUBLIC;
-- Weighted rendezvous-hashing score: weight / -ln(u). Here u is stable_hash of
-- the remaining arguments scaled into the open interval (0, 1): the hash lies in
-- [-2^31, 2^31 - 1], is divided by 2^32 + 1 and shifted by 0.5, so ln(u) is
-- finite and negative. Higher is better; weight 0 always scores 0.
-- Uses @extschema@ rather than a hard-coded schema name, like the rest of the script.
CREATE OR REPLACE FUNCTION score(weight int, VARIADIC text[]) RETURNS double precision IMMUTABLE LANGUAGE sql AS
$$SELECT weight / -ln(@extschema@.stable_hash(VARIADIC $2)::double precision / ((2147483649)::bigint - (-2147483648)::bigint) + 0.5::double precision)$$;
GRANT EXECUTE ON ROUTINE score(weight int, text[]) TO PUBLIC;
-- Evaluates a sharding key expression. sharding_key_expression is executed as
-- dynamic SQL with $1 = schema_name and $2 = table_name, and the value of its
-- first result row is returned (NULL if it yields no rows).
-- NOTE(review): declared IMMUTABLE although it executes arbitrary SQL. That is
-- only sound while the configured expressions are themselves immutable.
CREATE OR REPLACE FUNCTION extract_sharding_key_value(schema_name text, table_name text, sharding_key_expression text) RETURNS text IMMUTABLE LANGUAGE plpgsql AS
$$
DECLARE
    key_value text;
BEGIN
    EXECUTE sharding_key_expression
        INTO key_value
        USING schema_name, table_name;
    RETURN key_value;
END
$$;
GRANT EXECUTE ON ROUTINE extract_sharding_key_value(schema_name text, table_name text, sharding_key_expression text) TO PUBLIC;
-- Published shards per replication group and configuration version.
-- A shard is a plain table (relkind 'r') that has a pgwrh publication. The
-- sharded_table row that governs it is its nearest ancestor (possibly itself)
-- configured in the same replication group and version. replica_count and the
-- sharding key come from that row.
-- The "nearest ancestor" test is restricted to the same group and version.
-- Otherwise a deeper partition configured only in another version or group
-- would hide the shard from this version or group.
-- NOTE(review): pg_partition_ancestors() returns no rows for a table that is
-- neither a partition nor partitioned, so a plain top-level table listed in
-- sharded_table never appears here. Confirm intended.
CREATE OR REPLACE VIEW published_shard AS
-- sharded_table rows resolved to the oid (column "oid") of the configured table
WITH sharded_pg_class AS (
    SELECT
        st.replication_group_id,
        c.oid::regclass,
        version,
        replica_count,
        sharding_key_expression
    FROM
        pg_class c
        JOIN pg_namespace n ON relnamespace = n.oid
        JOIN sharded_table st ON (nspname, relname) = (sharded_table_schema, sharded_table_name)
)
SELECT
    replication_group_id,
    version,
    nspname AS schema_name,
    relname AS table_name,
    replica_count,
    @extschema@.extract_sharding_key_value(nspname, relname, sharding_key_expression) AS sharding_key_value,
    publication_name
FROM
    pg_class c
    JOIN pg_wrh_publication pwp ON c.oid = pwp.published_shard
    JOIN pg_publication_rel pr ON c.oid = prrelid
    JOIN pg_publication pub ON pub.oid = prpubid AND pub.pubname = pwp.publication_name
    JOIN pg_namespace n ON n.oid = relnamespace
    JOIN sharded_pg_class st ON st.oid = ANY (
        SELECT * FROM pg_partition_ancestors(c.oid)
    ) AND NOT EXISTS (
        -- skip st if a deeper ancestor of c is configured in the same group and version
        SELECT 1
        FROM sharded_pg_class des
        WHERE
            des.replication_group_id = st.replication_group_id
            AND des.version = st.version
            AND des.oid = ANY (SELECT * FROM pg_partition_ancestors(c.oid))
            AND des.oid <> st.oid
            AND st.oid = ANY (SELECT * FROM pg_partition_ancestors(des.oid))
    )
WHERE
    c.relkind = 'r';
-- Catalog description of published_shard (typos corrected: "whose", "itself").
COMMENT ON VIEW published_shard IS
'Provides shards and their number of pending and ready copies based on configuration in sharded_table.
A shard is a non-partitioned table whose ancestor (possibly the table itself) is in sharded_table.
The desired number of copies is specified per the whole hierarchy (ie. all partitions of a given table).
Only shards for which there is a publication are present here.';
-- shard_assignment: per-shard assignment as seen by the member whose
-- member_role is CURRENT_ROLE.
CREATE OR REPLACE VIEW shard_assignment AS
-- shard_assigned_host: for every published shard and configuration version,
-- the replica_count hosts chosen to hold it.
-- Selection steps:
--   1. Within each availability zone, rank candidate hosts by
--      score(weight, sharding_key_value, host_id), highest first
--      (weighted rendezvous-hashing style).
--   2. Order the candidates by that rank, so that all zones' 1st picks come
--      before any 2nd picks.
--   3. Order zones sharing a rank by score(100, sharding_key_value, zone).
--   4. Keep the first replica_count hosts.
WITH shard_assigned_host AS (
SELECT
replication_group_id,
version,
schema_name,
table_name,
publication_name,
availability_zone,
host_id,
host_name,
port,
online
FROM
published_shard s
CROSS JOIN LATERAL (
SELECT
availability_zone,
host_id,
host_name,
port,
online,
row_number() OVER (
PARTITION BY availability_zone
ORDER BY pgwrh.score(weight, sharding_key_value, host_id) DESC) AS group_rank
FROM
shard_host_weight
JOIN shard_host USING (replication_group_id, host_id)
JOIN replication_group_member USING (replication_group_id, host_id)
WHERE
-- only hosts weighted for this shard's group and configuration version
(replication_group_id, version) = (s.replication_group_id, s.version)
ORDER BY
group_rank, pgwrh.score(100, sharding_key_value, availability_zone) DESC
LIMIT
s.replica_count
) h
)
SELECT
schema_name,
table_name,
-- is this member one of the assigned hosts?
-- (any version: pending as well as ready assignments count)
bool_or(m.host_id = sah.host_id) AS local,
-- foreign server name, host and port
-- take all assigned hosts that are
-- -- ready (ie. NOT pending)
-- -- online
-- -- not this member
-- sort hosts by their id to minimize number of foreign servers
-- (ie. avoid having different foreign servers for different permutations the same hosts)
md5(coalesce(string_agg(sah.host_id, ',' ORDER BY sah.host_id)
FILTER (WHERE online AND g.ready_version = sah.version AND m.host_id <> sah.host_id), '')) AS shard_server_name,
coalesce(string_agg(host_name, ',' ORDER BY sah.host_id)
FILTER (WHERE online AND g.ready_version = sah.version AND m.host_id <> sah.host_id), '') AS host,
coalesce(string_agg(port::text, ',' ORDER BY sah.host_id)
FILTER (WHERE online AND g.ready_version = sah.version AND m.host_id <> sah.host_id), '') AS port,
current_database() AS dbname,
username AS shard_server_user,
password AS shard_server_password,
publication_name
FROM
replication_group_member m
JOIN replication_group g USING (replication_group_id)
JOIN shard_assigned_host sah USING (replication_group_id),
-- multiply hosts in the same availability zone as this member
-- (each same-zone host is repeated same_zone_multiplier times in the
--  host/port lists and in the server-name hash input)
generate_series(1, CASE WHEN m.availability_zone = sah.availability_zone THEN m.same_zone_multiplier ELSE 1 END)
WHERE member_role = CURRENT_ROLE
GROUP BY
schema_name, table_name, dbname, shard_server_user, shard_server_password, publication_name;
GRANT SELECT ON shard_assignment TO PUBLIC;
-- Catalog description of shard_assignment.
-- Fixes the "replicaton_group" typo and clarifies whose view of the cluster is shown.
COMMENT ON VIEW shard_assignment IS
'Main view implementing shard assignment logic.
Presents the cluster (replication_group) as seen by a particular replication_group_member (as identified by member_role).
Each member sees all shards with the following information for each shard:
* "local" flag saying if this shard should be replicated to this member
* information on how to connect to remote replicas for this shard: host, port, dbname, user, password';
-- shard_index: index definitions to create on each published shard, for the
-- replication groups the current role is a member of.
CREATE OR REPLACE VIEW shard_index AS
-- parent_class: shard_index_template rows resolved to pg_class oids by exact
-- (schema, table) name, with a "pending" flag derived from the group's
-- ready_version.
-- NOTE(review): "ready_version = version AS pending" is TRUE exactly when the
-- template belongs to the READY version. shard_assignment instead treats
-- ready_version = version as "ready (ie. NOT pending)", so this looks
-- inverted. Confirm against the consumer of fdw_shard_index.pending before
-- changing it.
WITH parent_class AS (
SELECT DISTINCT
replication_group_id,
c.oid::regclass,
it.index_name,
it.index_template,
ready_version = version AS pending
FROM
pg_class c
JOIN pg_namespace n ON relnamespace = n.oid
JOIN shard_index_template it ON (nspname, relname) = (schema_name, table_name)
JOIN replication_group USING (replication_group_id)
)
SELECT
nspname AS schema_name,
relname AS table_name,
-- <table>_<template index name>_<md5 prefix of the template>.
-- substring(..., 0, 16) starts before position 1, so it yields only 15 hex chars.
-- Changing it would rename existing indexes. Names longer than the identifier
-- limit (63 bytes by default) are truncated by PostgreSQL.
format('%s_%s_%s', relname, index_name, substring(md5(index_template), 0, 16)) AS index_name,
index_template,
pending
FROM
pg_class c
-- only shards registered in pg_wrh_publication whose publication contains them
JOIN pg_wrh_publication pwp ON c.oid = pwp.published_shard
JOIN pg_publication_rel pr ON c.oid = prrelid
JOIN pg_publication pub ON pub.oid = prpubid AND pub.pubname = pwp.publication_name
JOIN pg_namespace n ON n.oid = relnamespace
-- templates defined on ANY ancestor (or the shard itself) apply; unlike
-- published_shard there is no nearest-ancestor filtering here
JOIN parent_class p ON p.oid = ANY (
SELECT * FROM pg_partition_ancestors(c.oid)
)
JOIN replication_group_member m USING (replication_group_id)
WHERE
c.relkind = 'r'
AND m.member_role = CURRENT_ROLE;
GRANT SELECT ON shard_index TO PUBLIC;
COMMENT ON VIEW shard_index IS
'Provides definitions of indexes that should be created for each shard.';
-------- metadata
--
-- Reconciles pg_wrh_publication and the actual publications with sharded_table.
-- It runs three steps in order:
--   1. Register every leaf table (relkind 'r') that has an ancestor listed in
--      sharded_table, under a fresh random publication name. Tables that are
--      already registered are left untouched.
--   2. Unregister tables that no longer descend from any sharded_table entry,
--      and drop their publications if those still exist.
--   3. Create the missing publication for every registered table that still
--      exists, and record each new publication as an extension dependency.
CREATE OR REPLACE FUNCTION sync_publications() RETURNS void
SET SEARCH_PATH FROM CURRENT
LANGUAGE plpgsql AS
$$DECLARE
    cmd record;
BEGIN
    -- Step 1: register new shards.
    INSERT INTO pg_wrh_publication (publication_name, published_shard)
    SELECT
        gen_random_uuid()::text,
        rel.oid::regclass
    FROM pg_class AS rel
    WHERE rel.relkind = 'r'
      AND EXISTS (
            SELECT 1
            FROM pg_partition_ancestors(rel.oid) AS anc
            JOIN sharded_table AS st ON anc.oid = to_regclass(st)
      )
    ON CONFLICT DO NOTHING;

    -- Step 2: unregister stale shards and drop their publications.
    FOR cmd IN
        WITH stale AS (
            DELETE FROM pg_wrh_publication AS pwp
            WHERE NOT EXISTS (
                SELECT 1
                FROM pg_partition_ancestors(pwp.published_shard) AS anc
                JOIN sharded_table AS st ON anc.oid = to_regclass(st)
            )
            RETURNING pwp.publication_name
        )
        SELECT format('DROP PUBLICATION %I CASCADE', stale.publication_name) AS stmt
        FROM stale
        JOIN pg_publication AS pub ON pub.pubname = stale.publication_name
    LOOP
        EXECUTE cmd.stmt;
    END LOOP;

    -- Step 3: create missing publications and tie them to the extension.
    FOR cmd IN
        SELECT
            format('CREATE PUBLICATION %I FOR TABLE %s WITH ( publish = %L )',
                   pwp.publication_name,
                   pwp.published_shard::regclass,
                   'insert,update,delete') AS stmt,
            pwp.publication_name AS publication_name
        FROM pg_wrh_publication AS pwp
        JOIN pg_class AS rel ON rel.oid = pwp.published_shard
        WHERE NOT EXISTS (
            SELECT 1 FROM pg_publication AS pub WHERE pub.pubname = pwp.publication_name
        )
    LOOP
        EXECUTE cmd.stmt;
        PERFORM add_ext_dependency(
            'pg_publication'::regclass,
            (SELECT pub.oid FROM pg_publication AS pub WHERE pub.pubname = cmd.publication_name));
    END LOOP;

    RETURN;
END
$$;
-- Trigger wrapper around sync_publications().
-- It returns NULL, which suits statement-level AFTER triggers such as
-- sync_publications on sharded_table, where the return value is ignored.
CREATE OR REPLACE FUNCTION sync_publications_trigger() RETURNS TRIGGER
SET SEARCH_PATH FROM CURRENT
LANGUAGE plpgsql AS
$$
BEGIN
    PERFORM sync_publications();
    RETURN NULL;
END
$$;
-- CREATE OR REPLACE FUNCTION sync_publications_event_trigger RETURNS event_trigger LANGUAGE pgsql AS
-- $$BEGIN
-- PERFORM sync_publications();
-- END$$;
-- Re-synchronise publications once per statement after any change to
-- sharded_table, including TRUNCATE.
CREATE OR REPLACE TRIGGER sync_publications
    AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON sharded_table
    FOR EACH STATEMENT
    EXECUTE FUNCTION sync_publications_trigger();
-- CREATE EVENT TRIGGER sync_publications ON ddl_command_end
-- WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE', 'DROP TABLE')
-- EXECUTE FUNCTION sync_publications_event_trigger();
-- -- API
-- shard_structure: DDL needed on a replica to recreate sharded partition trees.
-- It covers the replication groups the current role is a member of, with one
-- row per relation in each sharded partition tree. Each row has the relation's
-- level in the tree and a CREATE TABLE IF NOT EXISTS statement:
--   * level 0 (tree root): explicit column list plus locally defined
--     constraints;
--   * deeper levels: PARTITION OF <parent>, with local constraints and the
--     partition bound;
--   * non-leaf relations: the statement additionally ends with PARTITION BY.
CREATE OR REPLACE VIEW shard_structure AS
-- stc: sharded_table entries resolved to pg_class oids (exact schema/table name match).
WITH stc AS (
    SELECT
        st.replication_group_id,
        c.oid::regclass
    FROM
        pg_class c
        JOIN pg_namespace n ON relnamespace = n.oid
        JOIN sharded_table st ON (nspname, relname) = (sharded_table_schema, sharded_table_name)
),
-- roots: sharded tables with no other sharded ancestor in the same replication group.
roots AS (
    SELECT *
    FROM stc r
    WHERE NOT EXISTS (SELECT 1 FROM stc WHERE replication_group_id = r.replication_group_id AND oid <> r.oid AND oid = ANY (SELECT * FROM pg_partition_ancestors(r.oid)))
)
SELECT
    n.nspname AS schema_name,
    c.relname AS table_name,
    level,
    format('CREATE TABLE IF NOT EXISTS %I.%I %s%s',
        n.nspname, c.relname,
        CASE WHEN level = 0
        -- root of the partition tree - need to define attributes
        THEN
            '(' ||
            (
                -- Column list details:
                --   * dropped columns are skipped; they would otherwise appear as
                --     "........pg.dropped.N........" with an invalid type;
                --   * columns follow attnum order, so the DDL is deterministic;
                --   * format_type keeps type modifiers such as varchar(10) or
                --     numeric(10,2), which ::regtype would lose.
                SELECT string_agg(format('%I %s', attname, format_type(atttypid, atttypmod)), ',' ORDER BY attnum)
                FROM pg_attribute
                WHERE attrelid = t.relid AND attnum >= 1 AND NOT attisdropped
            ) ||
            coalesce(
                ', ' || (SELECT string_agg(pg_get_constraintdef(c.oid), ', ') FROM pg_constraint c WHERE conrelid = t.relid AND conislocal),
                ''
            ) ||
            ')'
        -- partition - no attributes necessary
        ELSE
            format('PARTITION OF %I.%I%s %s',
                pn.nspname, p.relname,
                coalesce(
                    ' (' || (SELECT string_agg(pg_get_constraintdef(c.oid), ', ') FROM pg_constraint c WHERE conrelid = t.relid AND conislocal) || ')',
                    ''
                ),
                pg_get_expr(c.relpartbound, c.oid))
        END,
        CASE WHEN t.isleaf
        THEN
            ''
        ELSE
            ' PARTITION BY ' || pg_get_partkeydef(t.relid)
        END
    ) AS create_table
FROM
    roots r JOIN replication_group_member m USING (replication_group_id), pg_partition_tree(oid) t
    JOIN pg_class c ON t.relid = c.oid JOIN pg_namespace n ON c.relnamespace = n.oid
    LEFT JOIN pg_class p ON t.parentrelid = p.oid LEFT JOIN pg_namespace pn ON p.relnamespace = pn.oid
WHERE
    member_role = CURRENT_ROLE;
GRANT SELECT ON shard_structure TO PUBLIC;
----------------- REPLICA -------------------
-- postgres_fdw server used by the fdw_shard_* foreign tables to read the
-- controller's views.
-- load_balance_hosts 'random' (a libpq option, PostgreSQL 16+) tries the
-- configured hosts in random order.
CREATE SERVER IF NOT EXISTS replica_controller
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (load_balance_hosts 'random');
-- A single PUBLIC user mapping, created without options.
CREATE USER MAPPING FOR PUBLIC SERVER replica_controller;
-- Exponent e of the subscription bucket count 2^e used by sub_num().
-- Hard-coded to 0, i.e. a single bucket.
-- FIXME GUC
CREATE OR REPLACE FUNCTION sub_num_modulus_exponent() RETURNS int LANGUAGE sql AS
$$SELECT 0$$;
-- Number of shard subscription buckets: 2 ^ sub_num_modulus_exponent().
-- The power is computed in double precision and then converted to int.
CREATE OR REPLACE FUNCTION sub_num() RETURNS int LANGUAGE sql AS
$$SELECT CAST(power(2::double precision, @extschema@.sub_num_modulus_exponent()) AS int)$$;
-- Subscription names recorded by this extension; shard_subscription.subname
-- references them.
-- The table is registered as an extension configuration table with an empty
-- filter, so pg_dump includes all of its rows.
CREATE TABLE IF NOT EXISTS dependent_subscription (
    subname text NOT NULL PRIMARY KEY
);
SELECT pg_catalog.pg_extension_config_dump('dependent_subscription', '');
-- Shard subscriptions, each covering one (modulus, remainder) hash bucket.
-- Presumably this matches sub_modulus/sub_remainder in shard_assignment_r;
-- TODO confirm.
-- Constraints: modulus > 0 and 0 <= remainder < modulus.
-- Also registered as an extension configuration table, so pg_dump includes
-- its rows.
CREATE TABLE IF NOT EXISTS shard_subscription (
    subname   text NOT NULL PRIMARY KEY REFERENCES dependent_subscription (subname),
    modulus   int  NOT NULL CHECK (modulus > 0),
    remainder int  NOT NULL CHECK (remainder >= 0 AND remainder < modulus)
);
SELECT pg_catalog.pg_extension_config_dump('shard_subscription', '');
-- Before each row is inserted into shard_subscription, add its subname to
-- dependent_subscription. The foreign key on shard_subscription.subname
-- requires that row to exist.
CREATE OR REPLACE FUNCTION insert_dependent_subscription_trigger() RETURNS trigger LANGUAGE plpgsql AS
$$
BEGIN
    INSERT INTO @extschema@.dependent_subscription (subname)
    VALUES (NEW.subname);
    RETURN NEW;
END
$$;
CREATE OR REPLACE TRIGGER insert_dependent_subscription
    BEFORE INSERT ON shard_subscription
    FOR EACH ROW
    EXECUTE FUNCTION insert_dependent_subscription_trigger();
-- Configuration changes, identified by sequence number.
CREATE TABLE IF NOT EXISTS config_change (
    config_change_seq_number bigint PRIMARY KEY
);
-- Changes that have been applied (presumably locally — TODO confirm).
-- Every entry must reference an existing config_change.
CREATE TABLE IF NOT EXISTS applied_config_change (
    config_change_seq_number bigint NOT NULL PRIMARY KEY
        REFERENCES config_change (config_change_seq_number)
);
-- -- types.sql
-- Composite identifier of a relation: (schema name, table name).
CREATE TYPE rel_id AS (
    schema_name text,
    table_name  text
);
----------------
-- Helpers
----------------
-- options parsing
-- Parses an options array such as pg_foreign_server.srvoptions or
-- pg_user_mappings.umoptions, whose elements look like 'key=value'.
-- Returns one row per element with these columns:
--   key:   text before the first '='
--   value: text after the first '='; NULL when the element has no '='
--   vals:  value split on ',' (e.g. a multi-host 'host=a,b' gives {a,b})
-- Only the FIRST '=' separates key from value. Option values may themselves
-- contain '=' (e.g. base64 passwords ending in '='); splitting on every '='
-- would truncate them and make callers see a spurious mismatch.
CREATE OR REPLACE FUNCTION opts(arr text[]) RETURNS TABLE(key text, value text, vals text[]) LANGUAGE sql AS
$$
SELECT
    kv.key,
    kv.value,
    string_to_array(kv.value, ',') AS vals
FROM
    unnest(arr) AS o(val)
    CROSS JOIN LATERAL (
        SELECT
            split_part(o.val, '=', 1) AS key,
            CASE WHEN strpos(o.val, '=') > 0 THEN substr(o.val, strpos(o.val, '=') + 1) END AS value
    ) AS kv
$$;
-- Returns the ALTER SERVER statement needed to bring server_name's host,
-- port and dbname options to the given values, or no row when nothing
-- differs (or the server does not exist).
-- Missing options are ADDed and options with a different value are SET.
-- Options whose current or target value is NULL are not SET, because
-- the comparison yields NULL.
CREATE OR REPLACE FUNCTION update_server_options(server_name text, host text, port text, dbname text DEFAULT current_database())
RETURNS SETOF text
STABLE
LANGUAGE sql AS
$$
SELECT format('ALTER SERVER %I OPTIONS (%s)', srv.srvname, string_agg(change.cmd, ', '))
FROM pg_foreign_server AS srv
CROSS JOIN LATERAL (
    SELECT format(
               CASE WHEN cur.key IS NULL THEN 'ADD %s %L' ELSE 'SET %s %L' END,
               wanted.key, wanted.val)
    FROM unnest(ARRAY['host', 'port', 'dbname'], ARRAY[host, port, dbname]) AS wanted(key, val)
    LEFT JOIN opts(srv.srvoptions) AS cur USING (key)
    WHERE cur.key IS NULL OR wanted.val <> cur.value
) AS change(cmd)
WHERE srv.srvname = server_name
GROUP BY srv.srvname
$$;
-- Returns the ALTER USER MAPPING FOR PUBLIC statement needed to bring the
-- PUBLIC mapping's user/password options on server_name to the given values.
-- No row is returned when nothing differs or no PUBLIC mapping exists.
-- Missing options are ADDed and options with a different value are SET.
-- Only the PUBLIC mapping (umuser = 0) is inspected, because that is the one
-- the statement alters. Per-role mappings on the same server would otherwise
-- contribute duplicate or conflicting ADD/SET entries to the statement.
CREATE OR REPLACE FUNCTION update_user_mapping(server_name text, username text, password text)
RETURNS SETOF text
STABLE
LANGUAGE sql AS
$$
SELECT format('ALTER USER MAPPING FOR PUBLIC SERVER %I OPTIONS (%s)', srvname, string_agg(opts.cmd, ', '))
FROM
    pg_user_mappings pum CROSS JOIN LATERAL (
        SELECT
            CASE
                WHEN opt.key IS NOT NULL THEN format('SET %s %L', toset.key, toset.val)
                ELSE format('ADD %s %L', toset.key, toset.val)
            END
        FROM
            unnest(ARRAY['user', 'password'], ARRAY[username, password]) AS toset(key, val)
            LEFT JOIN (SELECT * FROM opts(pum.umoptions)) AS opt USING (key)
        WHERE
            opt.key IS NULL OR toset.val <> opt.value
    ) AS opts(cmd)
WHERE
    srvname = server_name
    AND pum.umuser = 0
GROUP BY
    srvname
$$;
-- rel_id functions
-- Every pg_class relation joined to its namespace. Exposes:
--   pc, pn                  the whole pg_class / pg_namespace rows
--   schema_name, table_name the relation's schema and table names
--   rel_id                  the same pair as a rel_id composite
--   reg_class               the relation as a regclass
CREATE OR REPLACE VIEW rel AS
SELECT
    pc,
    pn,
    pn.nspname                          AS schema_name,
    pc.relname                          AS table_name,
    ROW(pn.nspname, pc.relname)::rel_id AS rel_id,
    pc.oid::regclass                    AS reg_class
FROM pg_namespace AS pn
JOIN pg_class AS pc ON pc.relnamespace = pn.oid;
-- rel extended with inheritance/partition information. Adds:
--   bound       partition bound expression text (NULL if not a partition)
--   parent      the parent's rel row via pg_inherits (NULL if none)
--   slot_rel_id (<schema>_slot, <table>), the matching "slot" relation id
-- A relation with several pg_inherits parents yields one row per parent.
CREATE OR REPLACE VIEW local_rel AS
SELECT
    r.*,
    pg_get_expr((r.pc).relpartbound, (r.pc).oid) AS bound,
    parent,
    ROW((r.rel_id).schema_name || '_slot', (r.rel_id).table_name)::rel_id AS slot_rel_id
FROM rel AS r
LEFT JOIN pg_inherits AS inh ON inh.inhrelid = (r.pc).oid
LEFT JOIN rel AS parent ON (parent.pc).oid = inh.inhparent;
-- CREATE OR REPLACE VIEW server_host_port AS
-- SELECT
-- s.*,
-- host,
-- port
-- FROM
-- pg_foreign_server s,
-- LATERAL (
-- SELECT h.value AS host, p.value AS port
-- FROM opts(srvoptions) AS h, opts(srvoptions) AS p
-- WHERE h.key = 'host' AND p.key = 'port'
-- ) AS opts;
-- Replica-side foreign table over the controller's shard_assignment view,
-- read through the replica_controller server.
CREATE FOREIGN TABLE IF NOT EXISTS fdw_shard_assignment (
    schema_name           text,
    table_name            text,
    local                 boolean,
    shard_server_name     text,
    host                  text,
    port                  text,
    dbname                text,
    shard_server_user     text,
    shard_server_password text,
    publication_name      text
)
SERVER replica_controller
OPTIONS (table_name 'shard_assignment');
-- Replica-side foreign table over the controller's shard_index view.
CREATE FOREIGN TABLE IF NOT EXISTS fdw_shard_index (
    schema_name    text,
    table_name     text,
    index_name     text,
    index_template text,
    pending        boolean
)
SERVER replica_controller
OPTIONS (table_name 'shard_index');
-- Replica-side foreign table over the controller's shard_structure view.
CREATE FOREIGN TABLE IF NOT EXISTS fdw_shard_structure (
    schema_name  text,
    table_name   text,
    level        int,
    create_table text
)
SERVER replica_controller
OPTIONS (table_name 'shard_structure');
-- TODO rename
-- shard_assignment_r: the controller's shard assignment (fdw_shard_assignment)
-- joined to local relations with the same schema/table name.
-- Each row also carries derived names for the remote-server schema and the
-- shard's subscription bucket.
CREATE OR REPLACE VIEW shard_assignment_r AS
SELECT
lr.rel_id AS rel_id,
lr.slot_rel_id AS slot_rel_id,
(lr).slot_rel_id.schema_name AS slot_schema_name,
-- remote relation id: same table name inside the <schema>_<shard_server_name> schema
(shard_server_schema, (rel_id).table_name)::rel_id AS remote_rel_id,
shard_server_name,
(rel_id).schema_name || '_' || shard_server_name AS shard_server_schema_name,
sa.local,
-- Shards are bucketed by stable_hash(schema, table) modulo sub_num().
-- The (h % m + m) % m form maps negative hashes into [0, m).
format('pgwrh_shard_subscription_%s_%s', sub_num(), (stable_hash(sa.schema_name, sa.table_name) % sub_num() + sub_num()) % sub_num()) AS subname,
sub_num() AS sub_modulus,
(stable_hash(sa.schema_name, sa.table_name) % sub_num() + sub_num()) % sub_num() AS sub_remainder,
sa.publication_name,
sa.shard_server_user,
sa.shard_server_password,
sa.dbname,
host,
port,
lr.reg_class,
lr.parent,
lr,
-- TRUE when the local table's parent is its slot relation (<schema>_slot, <table>).
-- The IS NOT NULL guard yields FALSE rather than NULL when there is no parent.
parent IS NOT NULL AND (parent).rel_id = slot_rel_id AS connected
FROM
fdw_shard_assignment sa
JOIN local_rel lr ON (sa.schema_name, sa.table_name) = ((lr).rel_id.schema_name, (lr).rel_id.table_name),
format('%s_%s', sa.schema_name, shard_server_name) AS shard_server_schema;
-- -- commands to execute
CREATE OR REPLACE VIEW sync(async, transactional, description, commands) AS
WITH shard_assignment AS MATERIALIZED (
SELECT * FROM shard_assignment_r
),
local_shard AS (
SELECT * FROM shard_assignment WHERE local
),
slot_schema AS (
SELECT DISTINCT slot_schema_name FROM shard_assignment
),
shard_structure AS MATERIALIZED (
SELECT * FROM fdw_shard_structure
),
shard_schema AS (
SELECT DISTINCT schema_name FROM shard_structure
),
shard_server AS (
SELECT DISTINCT
shard_server_name,
shard_server_schema_name,
host,
port,
dbname,
shard_server_user,
shard_server_password
FROM
shard_assignment
),
table_with_slot AS (
SELECT lr.*
FROM
local_rel lr
JOIN local_rel slot ON slot.rel_id = lr.slot_rel_id
),
server_host_port AS (
SELECT
s.*,
host,
port
FROM
pg_foreign_server s,
LATERAL (
SELECT h.value AS host, p.value AS port
FROM opts(srvoptions) AS h, opts(srvoptions) AS p
WHERE h.key = 'host' AND p.key = 'port'
) AS opts
),
owned_obj AS (
SELECT
classid,
objid
FROM
pg_depend d JOIN pg_extension e ON refclassid = 'pg_extension'::regclass AND refobjid = e.oid
WHERE
d.deptype = 'n'
),
owned_namespace AS (
SELECT
n.*
FROM
pg_namespace n JOIN owned_obj ON classid = 'pg_namespace'::regclass AND objid = n.oid
),
owned_subscription AS (
SELECT * FROM pg_subscription s JOIN shard_subscription USING (subname)
),
subscribed_publication AS (
SELECT
subname, pub.name AS subpubname
FROM
owned_subscription, unnest(subpublications) AS pub(name)
),