From 6cee36d98d224716bb1b45f083f1daf2f91595b0 Mon Sep 17 00:00:00 2001 From: gunli Date: Sun, 25 Jun 2023 11:48:56 +0800 Subject: [PATCH 1/6] [Improve] improve the perf of schema and schema cache --- pulsar/producer_partition.go | 23 ++++++++------------ pulsar/schema.go | 41 ++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 837d1d78e6..06bcc5a7d2 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -100,29 +100,24 @@ type partitionProducer struct { } type schemaCache struct { - lock sync.RWMutex - schemas map[uint64][]byte + schemas sync.Map } func newSchemaCache() *schemaCache { - return &schemaCache{ - schemas: make(map[uint64][]byte), - } + return &schemaCache{} } func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) { - s.lock.Lock() - defer s.lock.Unlock() - key := schema.hash() - s.schemas[key] = schemaVersion + s.schemas.Store(key, schemaVersion) } func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.schemas[schema.hash()] + val, ok := s.schemas.Load(schema.hash()) + if !ok { + return nil + } + return val.([]byte) } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, @@ -1106,7 +1101,7 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - //Register transaction operation to transaction and the transaction coordinator. + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) diff --git a/pulsar/schema.go b/pulsar/schema.go index 0b413d40a6..95e045de73 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -23,6 +23,7 @@ import ( "fmt" "hash/maphash" "reflect" + "sync/atomic" "unsafe" log "github.com/sirupsen/logrus" @@ -37,27 +38,27 @@ import ( type SchemaType int const ( - NONE SchemaType = iota //No schema defined - STRING //Simple String encoding with UTF-8 - JSON //JSON object encoding and validation - PROTOBUF //Protobuf message encoding and decoding - AVRO //Serialize and deserialize via Avro + NONE SchemaType = iota // No schema defined + STRING // Simple String encoding with UTF-8 + JSON // JSON object encoding and validation + PROTOBUF // Protobuf message encoding and decoding + AVRO // Serialize and deserialize via Avro BOOLEAN // - INT8 //A 8-byte integer. - INT16 //A 16-byte integer. - INT32 //A 32-byte integer. - INT64 //A 64-byte integer. - FLOAT //A float number. - DOUBLE //A double number + INT8 // A 8-byte integer. + INT16 // A 16-byte integer. + INT32 // A 32-byte integer. + INT64 // A 64-byte integer. + FLOAT // A float number. + DOUBLE // A double number _ // _ // _ // - KeyValue //A Schema that contains Key Schema and Value Schema. - BYTES = -1 //A bytes array. + KeyValue // A Schema that contains Key Schema and Value Schema. + BYTES = -1 // A bytes array. AUTO = -2 // - AutoConsume = -3 //Auto Consume Type. + AutoConsume = -3 // Auto Consume Type. AutoPublish = -4 // Auto Publish Type. - ProtoNative = 20 //Protobuf native message encoding and decoding + ProtoNative = 20 // Protobuf native message encoding and decoding ) // Encapsulates data around the schema definition @@ -66,13 +67,21 @@ type SchemaInfo struct { Schema string Type SchemaType Properties map[string]string + hashVal atomic.Uint64 } func (s SchemaInfo) hash() uint64 { + oldHash := s.hashVal.Load() + if oldHash != 0 { + return oldHash + } + h := maphash.Hash{} h.SetSeed(seed) h.Write([]byte(s.Schema)) - return h.Sum64() + newHash := h.Sum64() + s.hashVal.CompareAndSwap(oldHash, newHash) + return newHash } type Schema interface { From 8289336d0fde6bd667d8b54f88708f9bc5799b7b Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 26 Jun 2023 11:07:29 +0800 Subject: [PATCH 2/6] [Fix] fix lint error --- pulsar/schema.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index 95e045de73..6532429f5b 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -70,7 +70,7 @@ type SchemaInfo struct { hashVal atomic.Uint64 } -func (s SchemaInfo) hash() uint64 { +func (s *SchemaInfo) hash() uint64 { oldHash := s.hashVal.Load() if oldHash != 0 { return oldHash From c147b11f2819a4f06904039451cb8a27e2d59e9c Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 20:33:04 +0800 Subject: [PATCH 3/6] [revert] revert comment format --- pulsar/producer_partition.go | 2 +- pulsar/schema.go | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 06bcc5a7d2..ece53e78e4 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1101,7 +1101,7 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - // Register transaction operation to transaction and the transaction coordinator. + //Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) diff --git a/pulsar/schema.go b/pulsar/schema.go index 6532429f5b..1708fb956b 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -38,27 +38,27 @@ import ( type SchemaType int const ( - NONE SchemaType = iota // No schema defined - STRING // Simple String encoding with UTF-8 - JSON // JSON object encoding and validation - PROTOBUF // Protobuf message encoding and decoding - AVRO // Serialize and deserialize via Avro + NONE SchemaType = iota //No schema defined + STRING //Simple String encoding with UTF-8 + JSON //JSON object encoding and validation + PROTOBUF //Protobuf message encoding and decoding + AVRO //Serialize and deserialize via Avro BOOLEAN // - INT8 // A 8-byte integer. - INT16 // A 16-byte integer. - INT32 // A 32-byte integer. - INT64 // A 64-byte integer. - FLOAT // A float number. - DOUBLE // A double number + INT8 //A 8-byte integer. + INT16 //A 16-byte integer. + INT32 //A 32-byte integer. + INT64 //A 64-byte integer. + FLOAT //A float number. + DOUBLE //A double number _ // _ // _ // - KeyValue // A Schema that contains Key Schema and Value Schema. - BYTES = -1 // A bytes array. + KeyValue //A Schema that contains Key Schema and Value Schema. + BYTES = -1 //A bytes array. AUTO = -2 // - AutoConsume = -3 // Auto Consume Type. - AutoPublish = -4 // Auto Publish Type. - ProtoNative = 20 // Protobuf native message encoding and decoding + AutoConsume = -3 //Auto Consume Type. + AutoPublish = -4 //Auto Publish Type. + ProtoNative = 20 //Protobuf native message encoding and decoding ) // Encapsulates data around the schema definition From 8e12ffccb0fbcfa976df092e8e52e8e3696e7eaa Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 20:35:13 +0800 Subject: [PATCH 4/6] [revert] revert comment format --- pulsar/schema.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index 1708fb956b..b164a92892 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -57,7 +57,7 @@ const ( BYTES = -1 //A bytes array. AUTO = -2 // AutoConsume = -3 //Auto Consume Type. - AutoPublish = -4 //Auto Publish Type. + AutoPublish = -4 // Auto Publish Type. ProtoNative = 20 //Protobuf native message encoding and decoding ) From fb4b0234b57b93a3aad41ca47bb1207797c0276f Mon Sep 17 00:00:00 2001 From: gunli Date: Tue, 4 Jul 2023 15:08:14 +0800 Subject: [PATCH 5/6] use sync.Once instead of atomic.Uint64 --- pulsar/schema.go | 54 +++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index b164a92892..8148cd945e 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -23,7 +23,7 @@ import ( "fmt" "hash/maphash" "reflect" - "sync/atomic" + "sync" "unsafe" log "github.com/sirupsen/logrus" @@ -38,27 +38,27 @@ import ( type SchemaType int const ( - NONE SchemaType = iota //No schema defined - STRING //Simple String encoding with UTF-8 - JSON //JSON object encoding and validation - PROTOBUF //Protobuf message encoding and decoding - AVRO //Serialize and deserialize via Avro + NONE SchemaType = iota // No schema defined + STRING // Simple String encoding with UTF-8 + JSON // JSON object encoding and validation + PROTOBUF // Protobuf message encoding and decoding + AVRO // Serialize and deserialize via Avro BOOLEAN // - INT8 //A 8-byte integer. - INT16 //A 16-byte integer. - INT32 //A 32-byte integer. - INT64 //A 64-byte integer. - FLOAT //A float number. - DOUBLE //A double number + INT8 // A 8-byte integer. + INT16 // A 16-byte integer. + INT32 // A 32-byte integer. + INT64 // A 64-byte integer. + FLOAT // A float number. + DOUBLE // A double number _ // _ // _ // - KeyValue //A Schema that contains Key Schema and Value Schema. - BYTES = -1 //A bytes array. + KeyValue // A Schema that contains Key Schema and Value Schema. + BYTES = -1 // A bytes array. AUTO = -2 // - AutoConsume = -3 //Auto Consume Type. + AutoConsume = -3 // Auto Consume Type. AutoPublish = -4 // Auto Publish Type. - ProtoNative = 20 //Protobuf native message encoding and decoding + ProtoNative = 20 // Protobuf native message encoding and decoding ) // Encapsulates data around the schema definition @@ -67,21 +67,19 @@ type SchemaInfo struct { Schema string Type SchemaType Properties map[string]string - hashVal atomic.Uint64 + hashVal uint64 + hashOnce sync.Once } func (s *SchemaInfo) hash() uint64 { - oldHash := s.hashVal.Load() - if oldHash != 0 { - return oldHash - } - - h := maphash.Hash{} - h.SetSeed(seed) - h.Write([]byte(s.Schema)) - newHash := h.Sum64() - s.hashVal.CompareAndSwap(oldHash, newHash) - return newHash + s.hashOnce.Do(func() { + h := maphash.Hash{} + h.SetSeed(seed) + h.Write([]byte(s.Schema)) + s.hashVal = h.Sum64() + }) + + return s.hashVal } type Schema interface { From 78e9b1694e986da1df02f7886d9db930276fded4 Mon Sep 17 00:00:00 2001 From: gunli Date: Tue, 4 Jul 2023 15:12:59 +0800 Subject: [PATCH 6/6] revert comment format --- pulsar/schema.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index 8148cd945e..5c063e32a3 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -38,27 +38,27 @@ import ( type SchemaType int const ( - NONE SchemaType = iota // No schema defined - STRING // Simple String encoding with UTF-8 - JSON // JSON object encoding and validation - PROTOBUF // Protobuf message encoding and decoding - AVRO // Serialize and deserialize via Avro + NONE SchemaType = iota //No schema defined + STRING //Simple String encoding with UTF-8 + JSON //JSON object encoding and validation + PROTOBUF //Protobuf message encoding and decoding + AVRO //Serialize and deserialize via Avro BOOLEAN // - INT8 // A 8-byte integer. - INT16 // A 16-byte integer. - INT32 // A 32-byte integer. - INT64 // A 64-byte integer. - FLOAT // A float number. - DOUBLE // A double number + INT8 //A 8-byte integer. + INT16 //A 16-byte integer. + INT32 //A 32-byte integer. + INT64 //A 64-byte integer. + FLOAT //A float number. + DOUBLE //A double number _ // _ // _ // - KeyValue // A Schema that contains Key Schema and Value Schema. - BYTES = -1 // A bytes array. + KeyValue //A Schema that contains Key Schema and Value Schema. + BYTES = -1 //A bytes array. AUTO = -2 // - AutoConsume = -3 // Auto Consume Type. + AutoConsume = -3 //Auto Consume Type. AutoPublish = -4 // Auto Publish Type. - ProtoNative = 20 // Protobuf native message encoding and decoding + ProtoNative = 20 //Protobuf native message encoding and decoding ) // Encapsulates data around the schema definition