@@ -17,13 +17,11 @@ limitations under the License.
17
17
package vdiff
18
18
19
19
import (
20
- "encoding/json"
21
20
"fmt"
22
21
"html/template"
23
22
"io"
24
23
"math"
25
24
"reflect"
26
- "sort"
27
25
"strings"
28
26
"time"
29
27
@@ -579,7 +577,7 @@ func buildRecentListings(resp *vtctldatapb.VDiffShowResponse) ([]*listing, error
579
577
func displayShowSingleSummary (out io.Writer , format , keyspace , workflowName , uuid string , resp * vtctldatapb.VDiffShowResponse , verbose bool ) (vdiff.VDiffState , error ) {
580
578
state := vdiff .UnknownState
581
579
var output string
582
- summary , err := buildSingleSummary (keyspace , workflowName , uuid , resp , verbose )
580
+ summary , err := workflow . BuildSummary (keyspace , workflowName , uuid , resp , verbose )
583
581
if err != nil {
584
582
return state , err
585
583
}
@@ -616,225 +614,6 @@ func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uui
616
614
return state , nil
617
615
}
618
616
619
- func buildSingleSummary (keyspace , workflow , uuid string , resp * vtctldatapb.VDiffShowResponse , verbose bool ) (* summary , error ) {
620
- summary := & summary {
621
- Workflow : workflow ,
622
- Keyspace : keyspace ,
623
- UUID : uuid ,
624
- State : vdiff .UnknownState ,
625
- RowsCompared : 0 ,
626
- StartedAt : "" ,
627
- CompletedAt : "" ,
628
- HasMismatch : false ,
629
- Shards : "" ,
630
- Reports : make (map [string ]map [string ]vdiff.DiffReport ),
631
- Errors : make (map [string ]string ),
632
- Progress : nil ,
633
- }
634
-
635
- var tableSummaryMap map [string ]tableSummary
636
- var reports map [string ]map [string ]vdiff.DiffReport
637
- // Keep a tally of the states across all tables in all shards.
638
- tableStateCounts := map [vdiff.VDiffState ]int {
639
- vdiff .UnknownState : 0 ,
640
- vdiff .PendingState : 0 ,
641
- vdiff .StartedState : 0 ,
642
- vdiff .StoppedState : 0 ,
643
- vdiff .ErrorState : 0 ,
644
- vdiff .CompletedState : 0 ,
645
- }
646
- // Keep a tally of the summary states across all shards.
647
- shardStateCounts := map [vdiff.VDiffState ]int {
648
- vdiff .UnknownState : 0 ,
649
- vdiff .PendingState : 0 ,
650
- vdiff .StartedState : 0 ,
651
- vdiff .StoppedState : 0 ,
652
- vdiff .ErrorState : 0 ,
653
- vdiff .CompletedState : 0 ,
654
- }
655
- // Keep a tally of the approximate total rows to process as we'll use this for our progress
656
- // report.
657
- totalRowsToCompare := int64 (0 )
658
- var shards []string
659
- for shard , resp := range resp .TabletResponses {
660
- first := true
661
- if resp != nil && resp .Output != nil {
662
- shards = append (shards , shard )
663
- qr := sqltypes .Proto3ToResult (resp .Output )
664
- if tableSummaryMap == nil {
665
- tableSummaryMap = make (map [string ]tableSummary , 0 )
666
- reports = make (map [string ]map [string ]vdiff.DiffReport , 0 )
667
- }
668
- for _ , row := range qr .Named ().Rows {
669
- // Update the global VDiff summary based on the per shard level summary.
670
- // Since these values will be the same for all subsequent rows we only use
671
- // the first row.
672
- if first {
673
- first = false
674
- // Our timestamps are strings in `2022-06-26 20:43:25` format so we sort
675
- // them lexicographically.
676
- // We should use the earliest started_at across all shards.
677
- if sa := row .AsString ("started_at" , "" ); summary .StartedAt == "" || sa < summary .StartedAt {
678
- summary .StartedAt = sa
679
- }
680
- // And we should use the latest completed_at across all shards.
681
- if ca := row .AsString ("completed_at" , "" ); summary .CompletedAt == "" || ca > summary .CompletedAt {
682
- summary .CompletedAt = ca
683
- }
684
- // If we had an error on the shard, then let's add that to the summary.
685
- if le := row .AsString ("last_error" , "" ); le != "" {
686
- summary .Errors [shard ] = le
687
- }
688
- // Keep track of how many shards are marked as a specific state. We check
689
- // this combined with the shard.table states to determine the VDiff summary
690
- // state.
691
- shardStateCounts [vdiff .VDiffState (strings .ToLower (row .AsString ("vdiff_state" , "" )))]++
692
- }
693
-
694
- // Global VDiff summary updates that take into account the per table details
695
- // per shard.
696
- {
697
- summary .RowsCompared += row .AsInt64 ("rows_compared" , 0 )
698
- totalRowsToCompare += row .AsInt64 ("table_rows" , 0 )
699
-
700
- // If we had a mismatch on any table on any shard then the global VDiff
701
- // summary does too.
702
- if mm , _ := row .ToBool ("has_mismatch" ); mm {
703
- summary .HasMismatch = true
704
- }
705
- }
706
-
707
- // Table summary information that must be accounted for across all shards.
708
- {
709
- table := row .AsString ("table_name" , "" )
710
- if table == "" { // This occurs when the table diff has not started on 1 or more shards
711
- continue
712
- }
713
- // Create the global VDiff table summary object if it doesn't exist.
714
- if _ , ok := tableSummaryMap [table ]; ! ok {
715
- tableSummaryMap [table ] = tableSummary {
716
- TableName : table ,
717
- State : vdiff .UnknownState ,
718
- }
719
-
720
- }
721
- ts := tableSummaryMap [table ]
722
- // This is the shard level VDiff table state.
723
- sts := vdiff .VDiffState (strings .ToLower (row .AsString ("table_state" , "" )))
724
- tableStateCounts [sts ]++
725
-
726
- // The error state must be sticky, and we should not override any other
727
- // known state with completed.
728
- switch sts {
729
- case vdiff .CompletedState :
730
- if ts .State == vdiff .UnknownState {
731
- ts .State = sts
732
- }
733
- case vdiff .ErrorState :
734
- ts .State = sts
735
- default :
736
- if ts .State != vdiff .ErrorState {
737
- ts .State = sts
738
- }
739
- }
740
-
741
- diffReport := row .AsString ("report" , "" )
742
- dr := vdiff.DiffReport {}
743
- if diffReport != "" {
744
- err := json .Unmarshal ([]byte (diffReport ), & dr )
745
- if err != nil {
746
- return nil , err
747
- }
748
- ts .RowsCompared += dr .ProcessedRows
749
- ts .MismatchedRows += dr .MismatchedRows
750
- ts .MatchingRows += dr .MatchingRows
751
- ts .ExtraRowsTarget += dr .ExtraRowsTarget
752
- ts .ExtraRowsSource += dr .ExtraRowsSource
753
- }
754
- if _ , ok := reports [table ]; ! ok {
755
- reports [table ] = make (map [string ]vdiff.DiffReport )
756
- }
757
-
758
- reports [table ][shard ] = dr
759
- tableSummaryMap [table ] = ts
760
- }
761
- }
762
- }
763
- }
764
-
765
- // The global VDiff summary should progress from pending->started->completed with
766
- // stopped for any shard and error for any table being sticky for the global summary.
767
- // We should only consider the VDiff to be complete if it's completed for every table
768
- // on every shard.
769
- if shardStateCounts [vdiff .StoppedState ] > 0 {
770
- summary .State = vdiff .StoppedState
771
- } else if shardStateCounts [vdiff .ErrorState ] > 0 || tableStateCounts [vdiff .ErrorState ] > 0 {
772
- summary .State = vdiff .ErrorState
773
- } else if tableStateCounts [vdiff .StartedState ] > 0 {
774
- summary .State = vdiff .StartedState
775
- } else if tableStateCounts [vdiff .PendingState ] > 0 {
776
- summary .State = vdiff .PendingState
777
- } else if tableStateCounts [vdiff .CompletedState ] == (len (tableSummaryMap ) * len (shards )) {
778
- // When doing shard consolidations/merges, we cannot rely solely on the
779
- // vdiff_table state as there are N sources that we process rows from sequentially
780
- // with each one writing to the shared _vt.vdiff_table record for the target shard.
781
- // So we only mark the vdiff for the shard as completed when we've finished
782
- // processing rows from all of the sources -- which is recorded by marking the
783
- // vdiff done for the shard by setting _vt.vdiff.state = completed.
784
- if shardStateCounts [vdiff .CompletedState ] == len (shards ) {
785
- summary .State = vdiff .CompletedState
786
- } else {
787
- summary .State = vdiff .StartedState
788
- }
789
- } else {
790
- summary .State = vdiff .UnknownState
791
- }
792
-
793
- // If the vdiff has been started then we can calculate the progress.
794
- if summary .State == vdiff .StartedState {
795
- summary .Progress = BuildProgressReport (summary .RowsCompared , totalRowsToCompare , summary .StartedAt )
796
- }
797
-
798
- sort .Strings (shards ) // Sort for predictable output
799
- summary .Shards = strings .Join (shards , "," )
800
- summary .TableSummaryMap = tableSummaryMap
801
- summary .Reports = reports
802
- if ! summary .HasMismatch && ! verbose {
803
- summary .Reports = nil
804
- summary .TableSummaryMap = nil
805
- }
806
- // If we haven't completed the global VDiff then be sure to reflect that with no
807
- // CompletedAt value.
808
- if summary .State != vdiff .CompletedState {
809
- summary .CompletedAt = ""
810
- }
811
- return summary , nil
812
- }
813
-
814
- func BuildProgressReport (rowsCompared int64 , rowsToCompare int64 , startedAt string ) * vdiff.ProgressReport {
815
- report := & vdiff.ProgressReport {}
816
- if rowsCompared >= 1 {
817
- // Round to 2 decimal points.
818
- report .Percentage = math .Round (math .Min ((float64 (rowsCompared )/ float64 (rowsToCompare ))* 100 , 100.00 )* 100 ) / 100
819
- }
820
- if math .IsNaN (report .Percentage ) {
821
- report .Percentage = 0
822
- }
823
- pctToGo := math .Abs (report .Percentage - 100.00 )
824
- startTime , _ := time .Parse (vdiff .TimestampFormat , startedAt )
825
- curTime := time .Now ().UTC ()
826
- runTime := curTime .Unix () - startTime .Unix ()
827
- if report .Percentage >= 1 {
828
- // Calculate how long 1% took, on avg, and multiply that by the % left.
829
- eta := time .Unix (((int64 (runTime )/ int64 (report .Percentage ))* int64 (pctToGo ))+ curTime .Unix (), 1 ).UTC ()
830
- // Cap the ETA at 1 year out to prevent providing nonsensical ETAs.
831
- if eta .Before (time .Now ().UTC ().AddDate (1 , 0 , 0 )) {
832
- report .ETA = eta .Format (vdiff .TimestampFormat )
833
- }
834
- }
835
- return report
836
- }
837
-
838
617
func commandShow (cmd * cobra.Command , args []string ) error {
839
618
format , err := common .GetOutputFormat (cmd )
840
619
if err != nil {
0 commit comments