Skip to content

Commit

Permalink
Merge pull request #124 from datastax/116
Browse files Browse the repository at this point in the history
Update Cassandra native protocol library version
  • Loading branch information
lukasz-antoniak authored Jun 20, 2024
2 parents 614d438 + fccc8a2 commit b366cb0
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 187 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.15.x, 1.16.x]
go-version: [1.18.x]
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
4 changes: 2 additions & 2 deletions astra/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func generateSecureBundleURLWithResponse(url, databaseID, token string, ctx cont
if err != nil {
return nil, err
}
res, err := client.GenerateSecureBundleURLWithResponse(ctx, astra.DatabaseIdParam(databaseID))
res, err := client.GenerateSecureBundleURLWithResponse(ctx, databaseID, &astra.GenerateSecureBundleURLParams{})
if err != nil {
return nil, fmt.Errorf("error generating bundle urls: %v", err)
}
Expand All @@ -158,7 +158,7 @@ func generateSecureBundleURLWithResponse(url, databaseID, token string, ctx cont
return nil, fmt.Errorf("unable to generate bundle urls, failed with status code %d", res.StatusCode())
}

return res.JSON200, nil
return &(*res.JSON200)[0], nil
}

func extract(reader *zip.Reader) (map[string][]byte, error) {
Expand Down
23 changes: 17 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
module github.com/datastax/cql-proxy

go 1.16
go 1.18

require (
github.com/alecthomas/kong v0.2.17
github.com/datastax/astra-client-go/v2 v2.2.9 // indirect
github.com/datastax/go-cassandra-native-protocol v0.0.0-20211124104234-f6aea54fa801
github.com/datastax/astra-client-go/v2 v2.2.54
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220706104457-5e8aad05cf90
github.com/hashicorp/golang-lru v0.5.4
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.1
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.12.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
176 changes: 26 additions & 150 deletions go.sum

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions parser/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
{Keyspace: "system", Table: "local", Name: "rpc_address", Type: datatype.Inet},
{Keyspace: "system", Table: "local", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "partitioner", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "cluster_name", Type: datatype.Varchar},
Expand All @@ -43,7 +43,7 @@ var (
// language.
{Keyspace: "system", Table: "local", Name: "dse_version", Type: datatype.Varchar}, // DSE only
{Keyspace: "system", Table: "local", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "partitioner", Type: datatype.Varchar},
{Keyspace: "system", Table: "local", Name: "cluster_name", Type: datatype.Varchar},
Expand All @@ -58,7 +58,7 @@ var (
{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet},
{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid},
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
Expand All @@ -70,7 +70,7 @@ var (
{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "dse_version", Type: datatype.Varchar}, // DSE only
{Keyspace: "system", Table: "peers", Name: "rack", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)},
{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar},
{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid},
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
Expand All @@ -96,7 +96,7 @@ var (
{Keyspace: "system", Table: "schema_columnfamilies", Name: "compression_parameters", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "default_time_to_live", Type: datatype.Int},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "default_validator", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "dropped_columns", Type: datatype.NewMapType(datatype.Varchar, datatype.Bigint)},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "dropped_columns", Type: datatype.NewMap(datatype.Varchar, datatype.Bigint)},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "gc_grace_seconds", Type: datatype.Int},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "is_dense", Type: datatype.Boolean},
{Keyspace: "system", Table: "schema_columnfamilies", Name: "key_validator", Type: datatype.Varchar},
Expand Down Expand Up @@ -127,8 +127,8 @@ var (
SystemSchemaUsertypes = []*message.ColumnMetadata{
{Keyspace: "system", Table: "schema_usertypes", Name: "keyspace_name", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_usertypes", Name: "type_name", Type: datatype.Varchar},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_names", Type: datatype.NewListType(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_types", Type: datatype.NewListType(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_names", Type: datatype.NewList(datatype.Varchar)},
{Keyspace: "system", Table: "schema_usertypes", Name: "field_types", Type: datatype.NewList(datatype.Varchar)},
}
)

Expand Down
12 changes: 6 additions & 6 deletions proxy/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *partialQuery) GetOpCode() primitive.OpCode {
return primitive.OpCodeQuery
}

func (p *partialQuery) Clone() message.Message {
func (p *partialQuery) DeepCopyMessage() message.Message {
return &partialQuery{p.query}
}

Expand All @@ -78,10 +78,10 @@ func (m *partialExecute) GetOpCode() primitive.OpCode {
return primitive.OpCodeExecute
}

func (m *partialExecute) Clone() message.Message {
return &partialExecute{
queryId: primitive.CloneByteSlice(m.queryId),
}
func (m *partialExecute) DeepCopyMessage() message.Message {
queryId := make([]byte, len(m.queryId))
copy(queryId, m.queryId)
return &partialExecute{queryId}
}

func (m *partialExecute) String() string {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (p partialBatch) GetOpCode() primitive.OpCode {
return primitive.OpCodeBatch
}

func (p partialBatch) Clone() message.Message {
func (p partialBatch) DeepCopyMessage() message.Message {
queryOrIds := make([]interface{}, len(p.queryOrIds))
copy(queryOrIds, p.queryOrIds)
return &partialBatch{queryOrIds}
Expand Down
4 changes: 2 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (p *Proxy) buildLocalRow() {
"key": p.encodeTypeFatal(datatype.Varchar, "local"),
"data_center": p.encodeTypeFatal(datatype.Varchar, p.localNode.dc),
"rack": p.encodeTypeFatal(datatype.Varchar, "rack1"),
"tokens": p.encodeTypeFatal(datatype.NewListType(datatype.Varchar), p.localNode.tokens),
"tokens": p.encodeTypeFatal(datatype.NewList(datatype.Varchar), p.localNode.tokens),
"release_version": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.ReleaseVersion),
"partitioner": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.Partitioner),
"cluster_name": p.encodeTypeFatal(datatype.Varchar, "cql-proxy"),
Expand Down Expand Up @@ -734,7 +734,7 @@ func (c *client) filterSystemPeerValues(stmt *parser.SelectStatement, filtered [
} else if name == "host_id" {
return proxycore.EncodeType(datatype.Uuid, c.proxy.cluster.NegotiatedVersion, nameBasedUUID(peer.addr.String()))
} else if name == "tokens" {
return proxycore.EncodeType(datatype.NewListType(datatype.Varchar), c.proxy.cluster.NegotiatedVersion, peer.tokens)
return proxycore.EncodeType(datatype.NewList(datatype.Varchar), c.proxy.cluster.NegotiatedVersion, peer.tokens)
} else if name == "peer" {
return proxycore.EncodeType(datatype.Inet, c.proxy.cluster.NegotiatedVersion, peer.addr.IP)
} else if name == "rpc_address" {
Expand Down
11 changes: 6 additions & 5 deletions proxy/proxy_retries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"write timeout error, retry once if logged batch",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: idempotentQuery},
{Query: idempotentQuery},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -263,7 +263,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"write timeout error, retry once if logged batch w/ prepared statement",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: idempotentQueryHash[:]},
{Id: idempotentQueryHash[:]},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -278,7 +278,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"batch w/ non-idempotent query, don't retry",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: nonIdempotentQuery},
{Query: nonIdempotentQuery},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand All @@ -293,7 +293,7 @@ func TestProxy_BatchRetries(t *testing.T) {
{
"batch w/ non-idempotent prepared query, don't retry",
&message.Batch{Children: []*message.BatchChild{
{QueryOrId: nonIdempotentQueryHash[:]},
{Id: nonIdempotentQueryHash[:]},
}},
&message.WriteTimeout{
ErrorMessage: "WriteTimeout",
Expand Down Expand Up @@ -414,7 +414,8 @@ func testProxyRetryWithConfig(t *testing.T, query *frame.Frame, response message
mu.Lock()
defer mu.Unlock()
for _, child := range msg.Children {
if id, ok := child.QueryOrId.([]byte); ok {
id := child.Id
if id != nil {
var hash [16]byte
copy(hash[:], id)
if _, ok := prepared[hash]; !ok {
Expand Down
14 changes: 7 additions & 7 deletions proxycore/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ func DecodeType(dt datatype.DataType, version primitive.ProtocolVersion, bytes [
}

func codecFromDataType(dt datatype.DataType) (datacodec.Codec, error) {
switch dt.GetDataTypeCode() {
switch dt.Code() {
case primitive.DataTypeCodeList:
listType := dt.(datatype.ListType)
return datacodec.NewList(datatype.NewListType(listType.GetElementType()))
listType := dt.(*datatype.List)
return datacodec.NewList(datatype.NewList(listType.ElementType))
case primitive.DataTypeCodeSet:
setType := dt.(datatype.SetType)
return datacodec.NewSet(datatype.NewListType(setType.GetElementType()))
setType := dt.(*datatype.Set)
return datacodec.NewSet(datatype.NewSet(setType.ElementType))
case primitive.DataTypeCodeMap:
mapType := dt.(datatype.MapType)
return datacodec.NewMap(datatype.NewMapType(mapType.GetKeyType(), mapType.GetValueType()))
mapType := dt.(*datatype.Map)
return datacodec.NewMap(datatype.NewMap(mapType.KeyType, mapType.ValueType))
default:
codec, ok := primitiveCodecs[dt]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion proxycore/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (c *MockClient) makeSystemValues(version primitive.ProtocolVersion, address
"rpc_address": encodeTypeFatal(version, datatype.Inet, address),
"data_center": encodeTypeFatal(version, datatype.Varchar, "dc1"),
"rack": encodeTypeFatal(version, datatype.Varchar, "rack1"),
"tokens": encodeTypeFatal(version, datatype.NewListType(datatype.Varchar), []string{"0"}),
"tokens": encodeTypeFatal(version, datatype.NewList(datatype.Varchar), []string{"0"}),
"release_version": encodeTypeFatal(version, datatype.Varchar, "3.11.10"),
"host_id": encodeTypeFatal(version, datatype.Uuid, hostID),
"schema_version": encodeTypeFatal(version, datatype.Uuid, schemaVersion),
Expand Down

0 comments on commit b366cb0

Please sign in to comment.