Skip to content

Commit

Permalink
Update Cassandra native protocol version
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-antoniak committed Jun 14, 2024
1 parent 614d438 commit a0d08b3
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 111 deletions.
4 changes: 2 additions & 2 deletions astra/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func generateSecureBundleURLWithResponse(url, databaseID, token string, ctx cont
if err != nil {
return nil, err
}
res, err := client.GenerateSecureBundleURLWithResponse(ctx, astra.DatabaseIdParam(databaseID))
res, err := client.GenerateSecureBundleURLWithResponse(ctx, databaseID, &astra.GenerateSecureBundleURLParams{})
if err != nil {
return nil, fmt.Errorf("error generating bundle urls: %v", err)
}
Expand All @@ -158,7 +158,7 @@ func generateSecureBundleURLWithResponse(url, databaseID, token string, ctx cont
return nil, fmt.Errorf("unable to generate bundle urls, failed with status code %d", res.StatusCode())
}

return res.JSON200, nil
return &(*res.JSON200)[0], nil
}

func extract(reader *zip.Reader) (map[string][]byte, error) {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.16

require (
github.com/alecthomas/kong v0.2.17
github.com/datastax/astra-client-go/v2 v2.2.9 // indirect
github.com/datastax/go-cassandra-native-protocol v0.0.0-20211124104234-f6aea54fa801
github.com/datastax/astra-client-go/v2 v2.2.54
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220706104457-5e8aad05cf90
github.com/hashicorp/golang-lru v0.5.4
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.1
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)
177 changes: 98 additions & 79 deletions go.sum

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions parser/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
{Keyspace: "system", Table: "local", Name: "rpc_address", Type: datatype.Inet},
{Keyspace: "system", Table: "local", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "partitioner", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "cluster_name", Type: datatype.Varchar},
Expand All @@ -43,7 +43,7 @@ var (
// language.
{Keyspace: "system", Table: "local", Name: "dse_version", Type: datatype.Varchar}, // DSE only
{Keyspace: "system", Table: "local", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "partitioner", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "cluster_name", Type: datatype.Varchar},
Expand All @@ -58,7 +58,7 @@ var (
{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet},
{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid},
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
Expand All @@ -70,7 +70,7 @@ var (
{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "dse_version", Type: datatype.Varchar}, // DSE only
{Keyspace: "system", Table: "peers", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid},
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
Expand All @@ -96,7 +96,7 @@ var (
{Keyspace: "system", Table: "schema_columnfamilies", Name: "compression_parameters", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "default_time_to_live", Type: datatype.Int},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "default_validator", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "dropped_columns", Type: datatype.NewMapType(datatype.Varchar, datatype.Bigint)},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "dropped_columns", Type: datatype.NewMap(datatype.Varchar, datatype.Bigint)},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "gc_grace_seconds", Type: datatype.Int},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "is_dense", Type: datatype.Boolean},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "key_validator", Type: datatype.Varchar},
Expand Down Expand Up @@ -127,8 +127,8 @@ var (
SystemSchemaUsertypes = []*message.ColumnMetadata{
{Keyspace: "system", Table: "schema_usertypes", Name: "keyspace_name", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_usertypes", Name: "type_name", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_names", Type: datatype.NewListType(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_types", Type: datatype.NewListType(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_names", Type: datatype.NewList(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_types", Type: datatype.NewList(datatype.Varchar)},
}
)

Expand Down
8 changes: 4 additions & 4 deletions proxy/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *partialQuery) GetOpCode() primitive.OpCode {
return primitive.OpCodeQuery
}

func (p *partialQuery) Clone() message.Message {
func (p *partialQuery) DeepCopyMessage() message.Message {
return &partialQuery{p.query}
}

Expand All @@ -78,9 +78,9 @@ func (m *partialExecute) GetOpCode() primitive.OpCode {
return primitive.OpCodeExecute
}

func (m *partialExecute) Clone() message.Message {
func (m *partialExecute) DeepCopyMessage() message.Message {
return &partialExecute{
queryId: primitive.CloneByteSlice(m.queryId),
queryId: append(m.queryId[:0:0], m.queryId...),
}
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (p partialBatch) GetOpCode() primitive.OpCode {
return primitive.OpCodeBatch
}

func (p partialBatch) Clone() message.Message {
func (p partialBatch) DeepCopyMessage() message.Message {
queryOrIds := make([]interface{}, len(p.queryOrIds))
copy(queryOrIds, p.queryOrIds)
return &partialBatch{queryOrIds}
Expand Down
4 changes: 2 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (p *Proxy) buildLocalRow() {
"key": p.encodeTypeFatal(datatype.Varchar, "local"),
"data_center": p.encodeTypeFatal(datatype.Varchar, p.localNode.dc),
"rack": p.encodeTypeFatal(datatype.Varchar, "rack1"),
"tokens": p.encodeTypeFatal(datatype.NewListType(datatype.Varchar), p.localNode.tokens),
"tokens": p.encodeTypeFatal(datatype.NewList(datatype.Varchar), p.localNode.tokens),
"release_version": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.ReleaseVersion),
"partitioner": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.Partitioner),
"cluster_name": p.encodeTypeFatal(datatype.Varchar, "cql-proxy"),
Expand Down Expand Up @@ -734,7 +734,7 @@ func (c *client) filterSystemPeerValues(stmt *parser.SelectStatement, filtered [
} else if name == "host_id" {
return proxycore.EncodeType(datatype.Uuid, c.proxy.cluster.NegotiatedVersion, nameBasedUUID(peer.addr.String()))
} else if name == "tokens" {
return proxycore.EncodeType(datatype.NewListType(datatype.Varchar), c.proxy.cluster.NegotiatedVersion, peer.tokens)
return proxycore.EncodeType(datatype.NewList(datatype.Varchar), c.proxy.cluster.NegotiatedVersion, peer.tokens)
} else if name == "peer" {
return proxycore.EncodeType(datatype.Inet, c.proxy.cluster.NegotiatedVersion, peer.addr.IP)
} else if name == "rpc_address" {
Expand Down
11 changes: 6 additions & 5 deletions proxy/proxy_retries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"write timeout error, retry once if logged batch",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: idempotentQuery},
{Query: idempotentQuery},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -263,7 +263,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"write timeout error, retry once if logged batch w/ prepared statement",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: idempotentQueryHash[:]},
{Id: idempotentQueryHash[:]},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -278,7 +278,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"batch w/ non-idempotent query, don't retry",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: nonIdempotentQuery},
{Query: nonIdempotentQuery},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -293,7 +293,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"batch w/ non-idempotent prepared query, don't retry",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: nonIdempotentQueryHash[:]},
{Id: nonIdempotentQueryHash[:]},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand Down Expand Up @@ -414,7 +414,8 @@ func testProxyRetryWithConfig(t *testing.T, query *frame.Frame, response message
mu.Lock()
defer mu.Unlock()
for _, child := range msg.Children {
if id, ok := child.QueryOrId.([]byte); ok {
id := child.Id
if id != nil {
var hash [16]byte
copy(hash[:], id)
if _, ok := prepared[hash]; !ok {
Expand Down
14 changes: 7 additions & 7 deletions proxycore/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ func DecodeType(dt datatype.DataType, version primitive.ProtocolVersion, bytes [
}

func codecFromDataType(dt datatype.DataType) (datacodec.Codec, error) {
switch dt.GetDataTypeCode() {
switch dt.Code() {
case primitive.DataTypeCodeList:
listType := dt.(datatype.ListType)
return datacodec.NewList(datatype.NewListType(listType.GetElementType()))
listType := dt.(*datatype.List)
return datacodec.NewList(datatype.NewList(listType.ElementType))
case primitive.DataTypeCodeSet:
setType := dt.(datatype.SetType)
return datacodec.NewSet(datatype.NewListType(setType.GetElementType()))
setType := dt.(*datatype.Set)
return datacodec.NewSet(datatype.NewSet(setType.ElementType))
case primitive.DataTypeCodeMap:
mapType := dt.(datatype.MapType)
return datacodec.NewMap(datatype.NewMapType(mapType.GetKeyType(), mapType.GetValueType()))
mapType := dt.(*datatype.Map)
return datacodec.NewMap(datatype.NewMap(mapType.KeyType, mapType.ValueType))
default:
codec, ok := primitiveCodecs[dt]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion proxycore/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (c *MockClient) makeSystemValues(version primitive.ProtocolVersion, address
"rpc_address": encodeTypeFatal(version, datatype.Inet, address),
"data_center": encodeTypeFatal(version, datatype.Varchar, "dc1"),
"rack": encodeTypeFatal(version, datatype.Varchar, "rack1"),
"tokens": encodeTypeFatal(version, datatype.NewListType(datatype.Varchar), []string{"0"}),
"tokens": encodeTypeFatal(version, datatype.NewList(datatype.Varchar), []string{"0"}),
"release_version": encodeTypeFatal(version, datatype.Varchar, "3.11.10"),
"host_id": encodeTypeFatal(version, datatype.Uuid, hostID),
"schema_version": encodeTypeFatal(version, datatype.Uuid, schemaVersion),
Expand Down

0 comments on commit a0d08b3

Please sign in to comment.