From 163cd7e5a7666ed59796608ccf0cb20fed76511d Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Tue, 4 Jul 2023 16:42:11 +0800 Subject: [PATCH] [Improve] improve the perf of schema and schema cache (#1033) * [Improve] improve the perf of schema and schema cache * [Fix] fix lint error * [revert] revert comment format * [revert] revert comment format * use sync.Once instead of atomic.Uint64 * revert comment format --------- Co-authored-by: gunli --- pulsar/producer_partition.go | 21 ++++++++------------- pulsar/schema.go | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9d04427a8..012bfd96a 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -100,29 +100,24 @@ type partitionProducer struct { } type schemaCache struct { - lock sync.RWMutex - schemas map[uint64][]byte + schemas sync.Map } func newSchemaCache() *schemaCache { - return &schemaCache{ - schemas: make(map[uint64][]byte), - } + return &schemaCache{} } func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) { - s.lock.Lock() - defer s.lock.Unlock() - key := schema.hash() - s.schemas[key] = schemaVersion + s.schemas.Store(key, schemaVersion) } func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.schemas[schema.hash()] + val, ok := s.schemas.Load(schema.hash()) + if !ok { + return nil + } + return val.([]byte) } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, diff --git a/pulsar/schema.go b/pulsar/schema.go index 0b413d40a..5c063e32a 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -23,6 +23,7 @@ import ( "fmt" "hash/maphash" "reflect" + "sync" "unsafe" log "github.com/sirupsen/logrus" @@ -66,13 +67,19 @@ type SchemaInfo struct { Schema string Type SchemaType Properties map[string]string + hashVal uint64 + hashOnce sync.Once } -func (s SchemaInfo) hash() uint64 { - h := maphash.Hash{} - h.SetSeed(seed) - h.Write([]byte(s.Schema)) - return h.Sum64() +func (s *SchemaInfo) hash() uint64 { + s.hashOnce.Do(func() { + h := maphash.Hash{} + h.SetSeed(seed) + h.Write([]byte(s.Schema)) + s.hashVal = h.Sum64() + }) + + return s.hashVal } type Schema interface {