@@ -723,99 +723,196 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs []
723
723
return
724
724
}
725
725
726
- func buildBlockQuery (bf idb.BlockFilter ) (query string , whereArgs []interface {}, err error ) {
726
+ // buildBlockWhereTerms generates some of the terms that go in the WHERE clause for the block search query.
727
+ //
728
+ // It only generates terms that filter blocks based on round and/or timestamp.
729
+ //
730
+ // Filters related to participacion are generated elsewhere.
731
+ func buildBlockWhereTerms (bf idb.BlockFilter ) []string {
732
+
733
+ var terms []string
734
+
735
+ // Round-based filters
736
+ if bf .MaxRound != nil {
737
+ terms = append (
738
+ terms ,
739
+ fmt .Sprintf ("round <= %d" , * bf .MaxRound ),
740
+ )
741
+ }
742
+ if bf .MinRound != nil {
743
+ terms = append (
744
+ terms ,
745
+ fmt .Sprintf ("round >= %d" , * bf .MinRound ),
746
+ )
747
+ }
748
+
749
+ // Timestamp-based filters
750
+ //
751
+ // Converting the timestamp into a round usually results in faster execution plans
752
+ // (compared to the execution plans that would result from using the `block_header.realtime` column directly)
753
+ if ! bf .AfterTime .IsZero () {
754
+ tmpl := `
755
+ round >= (
756
+ SELECT tmp.round
757
+ FROM block_header tmp
758
+ WHERE tmp.realtime > (to_timestamp(%d) AT TIME ZONE 'UTC')
759
+ ORDER BY tmp.realtime ASC, tmp.round ASC
760
+ LIMIT 1
761
+ )`
762
+ terms = append (
763
+ terms ,
764
+ fmt .Sprintf (tmpl , bf .AfterTime .UTC ().Unix ()),
765
+ )
766
+ }
767
+ if ! bf .BeforeTime .IsZero () {
768
+ tmpl := `
769
+ round <= (
770
+ SELECT tmp.round
771
+ FROM block_header tmp
772
+ WHERE tmp.realtime < (to_timestamp(%d) AT TIME ZONE 'UTC')
773
+ ORDER BY tmp.realtime DESC, tmp.round DESC
774
+ LIMIT 1
775
+ )`
776
+ terms = append (
777
+ terms ,
778
+ fmt .Sprintf (tmpl , bf .BeforeTime .UTC ().Unix ()),
779
+ )
780
+ }
781
+
782
+ return terms
783
+ }
727
784
728
- // Compute the terms in the WHERE clause
729
- whereParts := make ([]string , 0 )
730
- whereArgs = make ([]interface {}, 0 )
785
+ func buildBlockQuery (bf idb.BlockFilter ) (query string , err error ) {
786
+
787
+ // helper function to build CTEs
788
+ buildCte := func (cteName string , whereTerms []string ) string {
789
+ tmpl := `%s AS (
790
+ SELECT round, header
791
+ FROM block_header
792
+ WHERE %s
793
+ ORDER BY round ASC
794
+ LIMIT %d
795
+ )`
796
+ return fmt .Sprintf (tmpl , cteName , strings .Join (whereTerms , " AND " ), bf .Limit )
797
+ }
798
+
799
+ // Build auxiliary CTEs for participation-related parameters.
800
+ //
801
+ // Using CTEs in this way turned out to be necessary to lead CockroachDB's query optimizer
802
+ // into using the execution plan we want.
803
+ //
804
+ // If we were to put the CTE filters in the main query's WHERE clause, that would result
805
+ // in a sub-optimal execution plan. At least this was the case at the time of writing.
806
+ var CTEs []string
807
+ var CteNames []string
731
808
{
732
- var partNumber int
733
-
734
- if bf .MaxRound != nil {
735
- partNumber ++
736
- whereParts = append (whereParts , fmt .Sprintf ("round <= $%d" , partNumber ))
737
- whereArgs = append (whereArgs , * bf .MaxRound )
738
- }
739
- if bf .MinRound != nil {
740
- partNumber ++
741
- whereParts = append (whereParts , fmt .Sprintf ("round >= $%d" , partNumber ))
742
- whereArgs = append (whereArgs , * bf .MinRound )
743
- }
744
- if ! bf .AfterTime .IsZero () {
745
- partNumber ++
746
- whereParts = append (
747
- whereParts ,
748
- fmt .Sprintf ("round >= (SELECT tmp.round FROM block_header tmp WHERE tmp.realtime > $%d ORDER BY tmp.realtime ASC, tmp.round ASC LIMIT 1)" , partNumber ),
749
- )
750
- whereArgs = append (whereArgs , bf .AfterTime )
751
- }
752
- if ! bf .BeforeTime .IsZero () {
753
- partNumber ++
754
- whereParts = append (
755
- whereParts ,
756
- fmt .Sprintf ("round <= (SELECT tmp.round FROM block_header tmp WHERE tmp.realtime < $%d ORDER BY tmp.realtime DESC, tmp.round DESC LIMIT 1)" , partNumber ),
757
- )
758
- whereArgs = append (whereArgs , bf .BeforeTime )
759
- }
760
809
if len (bf .Proposers ) > 0 {
810
+ terms := buildBlockWhereTerms (bf )
761
811
var proposersStr []string
762
812
for addr := range bf .Proposers {
763
813
proposersStr = append (proposersStr , `'"` + addr .String ()+ `"'` )
764
814
}
765
- whereParts = append (
766
- whereParts ,
815
+ terms = append (
816
+ terms ,
767
817
fmt .Sprintf ("( (header->'prp') IS NOT NULL AND ((header->'prp')::TEXT IN (%s)) )" , strings .Join (proposersStr , "," )),
768
818
)
819
+
820
+ cteName := "prp"
821
+ cte := buildCte (cteName , terms )
822
+ CTEs = append (CTEs , cte )
823
+ CteNames = append (CteNames , cteName )
824
+
769
825
}
770
826
if len (bf .ExpiredParticipationAccounts ) > 0 {
827
+ terms := buildBlockWhereTerms (bf )
771
828
var expiredStr []string
772
829
for addr := range bf .ExpiredParticipationAccounts {
773
830
expiredStr = append (expiredStr , `'` + addr .String ()+ `'` )
774
831
}
775
- whereParts = append (
776
- whereParts ,
832
+ terms = append (
833
+ terms ,
777
834
fmt .Sprintf ("( (header->'partupdrmv') IS NOT NULL AND (header->'partupdrmv') ?| array[%s] )" , strings .Join (expiredStr , "," )),
778
835
)
836
+
837
+ cteName := "expired"
838
+ CTE := buildCte (cteName , terms )
839
+ CTEs = append (CTEs , CTE )
840
+ CteNames = append (CteNames , "expired" )
841
+
779
842
}
780
843
if len (bf .AbsentParticipationAccounts ) > 0 {
844
+ terms := buildBlockWhereTerms (bf )
781
845
var absentStr []string
782
846
for addr := range bf .AbsentParticipationAccounts {
783
847
absentStr = append (absentStr , `'` + addr .String ()+ `'` )
784
848
}
785
- whereParts = append (
786
- whereParts ,
849
+ terms = append (
850
+ terms ,
787
851
fmt .Sprintf ("( (header->'partupdabs') IS NOT NULL AND (header->'partupdabs') ?| array[%s] )" , strings .Join (absentStr , "," )),
788
852
)
853
+
854
+ cteName := "absent"
855
+ CTE := buildCte (cteName , terms )
856
+ CTEs = append (CTEs , CTE )
857
+ CteNames = append (CteNames , cteName )
858
+ }
859
+ if len (CteNames ) > 0 {
860
+ var selects []string
861
+ for _ , cteName := range CteNames {
862
+ selects = append (selects , fmt .Sprintf ("SELECT * FROM %s" , cteName ))
863
+ }
864
+ CTE := "tmp AS (" + strings .Join (selects , " UNION " ) + ")"
865
+ CTEs = append (CTEs , CTE )
789
866
}
790
867
}
791
868
792
- // SELECT, FROM
793
- query = `SELECT header FROM block_header`
794
- // WHERE
795
- if len (whereParts ) > 0 {
796
- whereStr := strings .Join (whereParts , " AND " )
797
- query += " WHERE " + whereStr
798
- }
799
- // ORDER BY
800
- query += " ORDER BY round ASC"
801
- // LIMIT
802
- if bf .Limit != 0 {
803
- query += fmt .Sprintf (" LIMIT %d" , bf .Limit )
869
+ // Build the main query. It uses the CTEs, if any.
870
+ {
871
+ var withClause string
872
+ if len (CTEs ) > 0 {
873
+ withClause = "WITH " + strings .Join (CTEs , ",\n " )
874
+ }
875
+
876
+ var fromTable string
877
+ if len (CTEs ) > 0 {
878
+ fromTable = "tmp"
879
+ } else {
880
+ fromTable = "block_header"
881
+ }
882
+
883
+ var whereClause string
884
+ if len (CTEs ) == 0 {
885
+ terms := buildBlockWhereTerms (bf )
886
+ if len (terms ) > 0 {
887
+ whereClause = "WHERE " + strings .Join (terms , " AND " ) + "\n "
888
+ }
889
+ }
890
+
891
+ tmpl := `
892
+ %s
893
+ SELECT header
894
+ FROM %s
895
+ %s
896
+ ORDER BY round ASC
897
+ LIMIT %d`
898
+
899
+ query = fmt .Sprintf (tmpl , withClause , fromTable , whereClause , bf .Limit )
804
900
}
805
- return
901
+
902
+ return query , nil
806
903
}
807
904
808
905
// This function blocks. `tx` must be non-nil.
809
906
func (db * IndexerDb ) yieldBlocks (ctx context.Context , tx pgx.Tx , bf idb.BlockFilter , out chan <- idb.BlockRow ) {
810
907
811
- query , whereArgs , err := buildBlockQuery (bf )
908
+ query , err := buildBlockQuery (bf )
812
909
if err != nil {
813
910
err = fmt .Errorf ("block query err %v" , err )
814
911
out <- idb.BlockRow {Error : err }
815
912
return
816
913
}
817
914
818
- rows , err := tx .Query (ctx , query , whereArgs ... )
915
+ rows , err := tx .Query (ctx , query )
819
916
if err != nil {
820
917
err = fmt .Errorf ("block query %#v err %v" , query , err )
821
918
out <- idb.BlockRow {Error : err }
0 commit comments