@@ -18,8 +18,10 @@ import (
1818 "bytes"
1919 "context"
2020 "crypto/sha256"
21- "encoding/gob"
22- "reflect"
21+ "encoding/binary"
22+ "hash"
23+ "math"
24+ "sort"
2325
2426 "cloud.google.com/go/spanner"
2527 sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
@@ -32,16 +34,6 @@ import (
3234
3335var errNextAfterSTop = status .Errorf (codes .FailedPrecondition , "Next called after Stop" )
3436
35- // init registers the protobuf types with gob so they can be encoded.
36- func init () {
37- gob .Register (structpb.Value_BoolValue {})
38- gob .Register (structpb.Value_NumberValue {})
39- gob .Register (structpb.Value_StringValue {})
40- gob .Register (structpb.Value_NullValue {})
41- gob .Register (structpb.Value_ListValue {})
42- gob .Register (structpb.Value_StructValue {})
43- }
44-
4537var _ rowIterator = & checksumRowIterator {}
4638
4739// checksumRowIterator implements rowIterator and keeps track of a running
@@ -66,12 +58,12 @@ type checksumRowIterator struct {
6658 // the retry has finished.
6759 stopped bool
6860
69- // checksum contains the current checksum for the results that have been
61+ // hash contains the current hash for the results that have been
7062 // seen. It is calculated as a SHA256 checksum over all rows that so far
7163 // have been returned.
72- checksum * [ 32 ] byte
73- buffer * bytes. Buffer
74- enc * gob. Encoder
64+ hash hash. Hash
65+ int32Buf [ 4 ] byte
66+ float64Buf [ 8 ] byte
7567
7668 // errIndex and err indicate any error and the index in the result set
7769 // where the error occurred.
@@ -110,7 +102,7 @@ func (it *checksumRowIterator) Next() (row *spanner.Row, err error) {
110102 // checksum of the columns that are included in this result. This is
111103 // also used to detect the possible difference between two empty
112104 // result sets with a different set of columns.
113- it .checksum , err = createMetadataChecksum ( it .enc , it . buffer , it .metadata )
105+ it .hash , err = it .createMetadataChecksum ( it .metadata )
114106 if err != nil {
115107 return err
116108 }
@@ -119,45 +111,79 @@ func (it *checksumRowIterator) Next() (row *spanner.Row, err error) {
119111 return it .err
120112 }
121113 // Update the current checksum.
122- it .checksum , err = updateChecksum (it .enc , it .buffer , it .checksum , row )
123- return err
114+ return it .updateChecksum (it .hash , row )
124115 })
125116 return row , err
126117}
127118
128119// updateChecksum calculates the following checksum based on a current checksum
129120// and a new row.
130- func updateChecksum (enc * gob.Encoder , buffer * bytes.Buffer , currentChecksum * [32 ]byte , row * spanner.Row ) (* [32 ]byte , error ) {
131- buffer .Reset ()
132- buffer .Write (currentChecksum [:])
121+ func (it * checksumRowIterator ) updateChecksum (hash hash.Hash , row * spanner.Row ) error {
133122 for i := 0 ; i < row .Size (); i ++ {
134123 var v spanner.GenericColumnValue
135124 err := row .Column (i , & v )
136125 if err != nil {
137- return nil , err
126+ return err
138127 }
139- err = enc .Encode (v )
140- if err != nil {
141- return nil , err
128+ it .hashValue (v .Value , hash )
129+ }
130+ return nil
131+ }
132+
133+ func (it * checksumRowIterator ) hashValue (value * structpb.Value , digest hash.Hash ) {
134+ switch value .GetKind ().(type ) {
135+ case * structpb.Value_StringValue :
136+ digest .Write (intToByte (it .int32Buf , len (value .GetStringValue ())))
137+ digest .Write ([]byte (value .GetStringValue ()))
138+ case * structpb.Value_NullValue :
139+ digest .Write ([]byte {0 })
140+ case * structpb.Value_NumberValue :
141+ digest .Write (float64ToByte (it .float64Buf , value .GetNumberValue ()))
142+ case * structpb.Value_BoolValue :
143+ if value .GetBoolValue () {
144+ digest .Write ([]byte {1 })
145+ } else {
146+ digest .Write ([]byte {0 })
147+ }
148+ case * structpb.Value_StructValue :
149+ fields := make ([]string , 0 , len (value .GetStructValue ().Fields ))
150+ for field := range value .GetStructValue ().Fields {
151+ fields = append (fields , field )
152+ }
153+ sort .Strings (fields )
154+ for _ , field := range fields {
155+ digest .Write (intToByte (it .int32Buf , len (field )))
156+ digest .Write ([]byte (field ))
157+ it .hashValue (value .GetStructValue ().Fields [field ], digest )
158+ }
159+ case * structpb.Value_ListValue :
160+ for _ , v := range value .GetListValue ().GetValues () {
161+ it .hashValue (v , digest )
142162 }
143163 }
144- res := sha256 .Sum256 (buffer .Bytes ())
145- return & res , nil
164+ }
165+
// intToByte returns v, truncated to 32 bits, encoded as 4 big-endian bytes.
//
// buf is received by value, so the returned slice is backed by this call's
// own copy of the array and the caller's array is left untouched.
func intToByte(buf [4]byte, v int) []byte {
	out := buf[:]
	binary.BigEndian.PutUint32(out, uint32(v))
	return out
}
170+
// float64ToByte returns the IEEE 754 bit pattern of f encoded as 8 big-endian
// bytes.
//
// buf is received by value, so the returned slice is backed by this call's
// own copy of the array and the caller's array is left untouched.
func float64ToByte(buf [8]byte, f float64) []byte {
	out := buf[:]
	bits := math.Float64bits(f)
	binary.BigEndian.PutUint64(out, bits)
	return out
}
147175
148176// createMetadataChecksum calculates the checksum of the metadata of a result.
149177// Only the column names and types are included in the checksum. Any transaction
150178// metadata is not included.
151- func createMetadataChecksum ( enc * gob. Encoder , buffer * bytes. Buffer , metadata * sppb.ResultSetMetadata ) (* [ 32 ] byte , error ) {
152- buffer . Reset ()
179+ func ( it * checksumRowIterator ) createMetadataChecksum ( metadata * sppb.ResultSetMetadata ) (hash. Hash , error ) {
180+ digest := sha256 . New ()
153181 for _ , field := range metadata .RowType .Fields {
154- err := enc .Encode (field )
155- if err != nil {
156- return nil , err
157- }
182+ digest .Write (intToByte (it .int32Buf , len (field .Name )))
183+ digest .Write ([]byte (field .Name ))
184+ digest .Write (intToByte (it .int32Buf , int (field .Type .Code .Number ())))
158185 }
159- res := sha256 .Sum256 (buffer .Bytes ())
160- return & res , nil
186+ return digest , nil
161187}
162188
163189// retry implements retriableStatement.retry for queries. It will execute the
@@ -167,8 +193,6 @@ func createMetadataChecksum(enc *gob.Encoder, buffer *bytes.Buffer, metadata *sp
167193// initial iterator was also returned by the new iterator, and that the errors
168194// were returned by the same row index.
169195func (it * checksumRowIterator ) retry (ctx context.Context , tx * spanner.ReadWriteStmtBasedTransaction ) error {
170- buffer := & bytes.Buffer {}
171- enc := gob .NewEncoder (buffer )
172196 retryIt := tx .QueryWithOptions (ctx , it .stmt , it .options )
173197 // If the original iterator had been stopped, we should also always stop the
174198 // new iterator.
@@ -193,12 +217,13 @@ func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteS
193217 // Iterate over the new result set as many times as we iterated over the initial
194218 // result set. The checksums of the two should be equal. Also, the new result set
195219 // should return any error on the same index as the original.
196- var newChecksum * [32 ]byte
220+ // var newChecksum *[32]byte
197221 var checksumErr error
222+ newHash := sha256 .New ()
198223 for n := int64 (0 ); n < it .nc ; n ++ {
199224 row , err := retryIt .Next ()
200225 if n == 0 && (err == nil || err == iterator .Done ) {
201- newChecksum , checksumErr = createMetadataChecksum (enc , buffer , retryIt .Metadata )
226+ newHash , checksumErr = it . createMetadataChecksum (retryIt .Metadata )
202227 if checksumErr != nil {
203228 return failRetry (checksumErr )
204229 }
@@ -211,14 +236,14 @@ func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteS
211236 }
212237 if errorsEqualForRetry (err , it .err ) && n == it .errIndex {
213238 // Check that the checksums are also equal.
214- if ! checksumsEqual (newChecksum , it .checksum ) {
239+ if ! checksumsEqual (newHash , it .hash ) {
215240 return failRetry (ErrAbortedDueToConcurrentModification )
216241 }
217242 return replaceIt (nil )
218243 }
219244 return failRetry (ErrAbortedDueToConcurrentModification )
220245 }
221- newChecksum , err = updateChecksum (enc , buffer , newChecksum , row )
246+ err = it . updateChecksum (newHash , row )
222247 if err != nil {
223248 return failRetry (err )
224249 }
@@ -230,16 +255,21 @@ func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteS
230255 if it .err != nil {
231256 return failRetry (ErrAbortedDueToConcurrentModification )
232257 }
233- if ! checksumsEqual (newChecksum , it .checksum ) {
258+ if ! checksumsEqual (newHash , it .hash ) {
234259 return failRetry (ErrAbortedDueToConcurrentModification )
235260 }
236261 // Everything seems to be equal, replace the underlying iterator and return
237262 // a nil error.
238263 return replaceIt (nil )
239264}
240265
// checksumsEqual reports whether h1 and h2 currently produce the same sum.
//
// Two nil hashes are considered equal; a nil hash is never equal to a non-nil
// hash. Sum(nil) is used to read the current sums, which leaves the state of
// both hashes unchanged so they can still be extended afterwards.
func checksumsEqual(h1, h2 hash.Hash) bool {
	switch {
	case h1 == nil && h2 == nil:
		return true
	case h1 == nil || h2 == nil:
		return false
	}
	return bytes.Equal(h1.Sum(nil), h2.Sum(nil))
}
244274
245275func (it * checksumRowIterator ) Stop () {
0 commit comments