diff --git a/proxy/processor_default.go b/proxy/processor_default.go index fc243514..db700e13 100644 --- a/proxy/processor_default.go +++ b/proxy/processor_default.go @@ -159,6 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ return false, nil, err } + // Update proxy/protocol/responses.go apiKeyProduceMaxVersion when adding new Produce version support case 3, 4, 5, 6, 7, 8, 9, 10, 11: // CorrelationID + ClientID if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil { diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index fcd5cb65..f77d86aa 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -3,17 +3,28 @@ package protocol import ( "errors" "fmt" + "math" "github.com/grepplabs/kafka-proxy/config" ) const ( + apiKeyProduce = 0 apiKeyMetadata = 3 apiKeyFindCoordinator = 10 + apiKeyApiVersions = 18 + + // Update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy + apiKeyApiVersionsMaxVersion = 4 + apiKeyMetadataMaxVersion = 13 + apiKeyFindCoordinatorMaxVersion = 6 + // produce requests are parsed by proxy/processor_default.go mustReply() + apiKeyProduceMaxVersion = 11 brokersKeyName = "brokers" hostKeyName = "host" portKeyName = "port" + apiKeysKeyname = "api_keys" coordinatorKeyName = "coordinator" coordinatorsKeyName = "coordinators" @@ -22,6 +33,7 @@ const ( var ( metadataResponseSchemaVersions = createMetadataResponseSchemaVersions() findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions() + apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions() ) func createMetadataResponseSchemaVersions() []Schema { @@ -243,7 +255,33 @@ func createMetadataResponseSchemaVersions() []Schema { &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12} + metadataResponseV13 := NewSchema("metadata_response_v13", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9}, + &Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + // Update apiKeyMetadataMaxVersion when adding new versions + return []Schema{ + metadataResponseV0, + metadataResponseV1, + metadataResponseV2, + metadataResponseV3, + metadataResponseV4, + metadataResponseV5, + metadataResponseV6, + metadataResponseV7, + metadataResponseV8, + metadataResponseV9, + metadataResponseV10, + metadataResponseV11, + metadataResponseV12, + metadataResponseV13, + } } func createFindCoordinatorResponseSchemaVersions() []Schema { @@ -296,9 +334,117 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { findCoordinatorResponseV5 := findCoordinatorResponseV4 findCoordinatorResponseV6 := findCoordinatorResponseV5 + // Update apiKeyFindCoordinatorMaxVersion when adding new versions return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6} } +func createApiVersionsResponseSchemaVersions() []Schema { + apiVersionKeyV0 := NewSchema("api_versions_key_v0", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + ) + + apiVersionSchemaV3 := NewSchema("api_versions_key_schema3", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + &SchemaTaggedFields{"api_versions_tagged_fields"}, + ) + + apiVersionsResponseV0 := NewSchema("api_versions_response_v0", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + ) + + // Version 1 adds throttle time to the response. + apiVersionsResponseV1 := NewSchema("api_versions_response_v1", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + ) + + // Starting in version 2, on quota violation, brokers send out responses before throttling. + apiVersionsResponseV2 := apiVersionsResponseV1 + + // Version 3 is the first flexible version. Tagged fields are only supported in the body but + // not in the header. The length of the header must not change in order to guarantee the + // backward compatibility. + // + // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported + // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. + apiVersionsResponseV3 := NewSchema("api_versions_response_v3", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0. + apiVersionsResponseV4 := apiVersionsResponseV3 + + // Update apiKeyApiVersionsMaxVersion when adding new versions + return []Schema{ + apiVersionsResponseV0, + apiVersionsResponseV1, + apiVersionsResponseV2, + apiVersionsResponseV3, + apiVersionsResponseV4, + } +} + +func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { + if decodedStruct == nil { + return errors.New("decoded struct must not be nil") + } + if fn == nil { + return errors.New("net address mapper must not be nil") + } + apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{}) + if !ok { + return errors.New("api versions not found") + } + for _, apiVersionElement := range apiVersionsArray { + apiVersion := apiVersionElement.(*Struct) + apiKey, ok := apiVersion.Get("api_key").(int16) + if !ok { + return errors.New("api_keys.api_key not found") + } + maxVersion, ok := apiVersion.Get("max_version").(int16) + if !ok { + return errors.New("api_keys.max_version not found") + } + + limitVersion := int16(math.MaxInt16) + switch apiKey { + case apiKeyProduce: + if maxVersion > apiKeyProduceMaxVersion { + limitVersion = apiKeyProduceMaxVersion + } + case apiKeyMetadata: + if maxVersion > apiKeyMetadataMaxVersion { + limitVersion = apiKeyMetadataMaxVersion + } + case apiKeyFindCoordinator: + if maxVersion > apiKeyFindCoordinatorMaxVersion { + limitVersion = apiKeyFindCoordinatorMaxVersion + } + case apiKeyApiVersions: + if maxVersion > apiKeyApiVersionsMaxVersion { + limitVersion = apiKeyApiVersionsMaxVersion + } + } + if maxVersion > limitVersion { + err := apiVersion.Replace("max_version", limitVersion) + if err != nil { + return err + } + } + } + + return nil +} + func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { if decodedStruct == nil { return errors.New("decoded struct must not be nil") @@ -437,6 +583,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse) case apiKeyFindCoordinator: return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse) + case apiKeyApiVersions: + return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse) default: return nil, nil }