diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 6bd90818e2..d9502b7f14 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -100,29 +100,24 @@ type partitionProducer struct { } type schemaCache struct { - lock sync.RWMutex - schemas map[uint64][]byte + schemas sync.Map } func newSchemaCache() *schemaCache { - return &schemaCache{ - schemas: make(map[uint64][]byte), - } + return &schemaCache{} } func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) { - s.lock.Lock() - defer s.lock.Unlock() - key := schema.hash() - s.schemas[key] = schemaVersion + s.schemas.Store(key, schemaVersion) } func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.schemas[schema.hash()] + val, ok := s.schemas.Load(schema.hash()) + if !ok { + return nil + } + return val.([]byte) } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, @@ -1102,7 +1097,7 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - //Register transaction operation to transaction and the transaction coordinator. + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) diff --git a/pulsar/schema.go b/pulsar/schema.go index 0b413d40a6..95e045de73 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -23,6 +23,7 @@ import ( "fmt" "hash/maphash" "reflect" + "sync/atomic" "unsafe" log "github.com/sirupsen/logrus" @@ -37,27 +38,27 @@ import ( type SchemaType int const ( - NONE SchemaType = iota //No schema defined - STRING //Simple String encoding with UTF-8 - JSON //JSON object encoding and validation - PROTOBUF //Protobuf message encoding and decoding - AVRO //Serialize and deserialize via Avro + NONE SchemaType = iota // No schema defined + STRING // Simple String encoding with UTF-8 + JSON // JSON object encoding and validation + PROTOBUF // Protobuf message encoding and decoding + AVRO // Serialize and deserialize via Avro BOOLEAN // - INT8 //A 8-byte integer. - INT16 //A 16-byte integer. - INT32 //A 32-byte integer. - INT64 //A 64-byte integer. - FLOAT //A float number. - DOUBLE //A double number + INT8 // A 8-byte integer. + INT16 // A 16-byte integer. + INT32 // A 32-byte integer. + INT64 // A 64-byte integer. + FLOAT // A float number. + DOUBLE // A double number _ // _ // _ // - KeyValue //A Schema that contains Key Schema and Value Schema. - BYTES = -1 //A bytes array. + KeyValue // A Schema that contains Key Schema and Value Schema. + BYTES = -1 // A bytes array. AUTO = -2 // - AutoConsume = -3 //Auto Consume Type. + AutoConsume = -3 // Auto Consume Type. AutoPublish = -4 // Auto Publish Type. - ProtoNative = 20 //Protobuf native message encoding and decoding + ProtoNative = 20 // Protobuf native message encoding and decoding ) // Encapsulates data around the schema definition @@ -66,13 +67,21 @@ type SchemaInfo struct { Schema string Type SchemaType Properties map[string]string + hashVal atomic.Uint64 } func (s SchemaInfo) hash() uint64 { + oldHash := s.hashVal.Load() + if oldHash != 0 { + return oldHash + } + h := maphash.Hash{} h.SetSeed(seed) h.Write([]byte(s.Schema)) - return h.Sum64() + newHash := h.Sum64() + s.hashVal.CompareAndSwap(oldHash, newHash) + return newHash } type Schema interface {