Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ coverage.html
perf/perf
pulsar-perf
bin

vendor/
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.32.0
go.uber.org/atomic v1.7.0
go.uber.org/atomic v1.11.0
golang.org/x/mod v0.20.0
golang.org/x/oauth2 v0.11.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lI
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
8 changes: 4 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (p *availablePermits) flowIfNeed() {
availablePermits := current
requestedPermits := current
// check if permits changed
if !p.permits.CAS(current, 0) {
if !p.permits.CompareAndSwap(current, 0) {
return
}

Expand Down Expand Up @@ -2084,13 +2084,13 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
if !pc.options.autoReceiverQueueSize {
return
}
if pc.scaleReceiverQueueHint.CAS(true, false) {
if pc.scaleReceiverQueueHint.CompareAndSwap(true, false) {
oldSize := pc.currentQueueSize.Load()
maxSize := int32(pc.options.receiverQueueSize)
newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
usagePercent := pc.client.memLimit.CurrentUsagePercent()
if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.currentQueueSize.CompareAndSwap(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
Expand All @@ -2116,7 +2116,7 @@ func (pc *partitionConsumer) shrinkReceiverQueueSize() {
minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
if newSize < oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.currentQueueSize.CompareAndSwap(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
Expand Down
11 changes: 0 additions & 11 deletions pulsar/message_chunking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ func TestInvalidChunkingConfig(t *testing.T) {
}

func TestLargeMessage(t *testing.T) {
rand.Seed(time.Now().Unix())

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down Expand Up @@ -208,7 +206,6 @@ func TestMaxPendingChunkMessages(t *testing.T) {
}

func TestExpireIncompleteChunks(t *testing.T) {
rand.Seed(time.Now().Unix())
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down Expand Up @@ -240,8 +237,6 @@ func TestExpireIncompleteChunks(t *testing.T) {
}

func TestChunksEnqueueFailed(t *testing.T) {
rand.Seed(time.Now().Unix())

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down Expand Up @@ -278,8 +273,6 @@ func TestChunksEnqueueFailed(t *testing.T) {
}

func TestSeekChunkMessages(t *testing.T) {
rand.Seed(time.Now().Unix())

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down Expand Up @@ -343,8 +336,6 @@ func TestSeekChunkMessages(t *testing.T) {
}

func TestChunkAckAndNAck(t *testing.T) {
rand.Seed(time.Now().Unix())

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down Expand Up @@ -400,8 +391,6 @@ func TestChunkAckAndNAck(t *testing.T) {
}

func TestChunkSize(t *testing.T) {
rand.Seed(time.Now().Unix())

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand Down
9 changes: 5 additions & 4 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
doneCh := make(chan struct{})

p.internalSendAsync(ctx, msg, func(ID MessageID, _ *ProducerMessage, e error) {
if isDone.CAS(false, true) {
if isDone.CompareAndSwap(false, true) {
err = e
msgID = ID
close(doneCh)
Expand Down Expand Up @@ -1394,7 +1394,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
)

if sr.totalChunks > 1 {
if sr.chunkID == 0 {
switch sr.chunkID {
case 0:
sr.chunkRecorder.setFirstChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
Expand All @@ -1403,7 +1404,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.partitionIdx,
0,
})
} else if sr.chunkID == sr.totalChunks-1 {
case sr.totalChunks - 1:
sr.chunkRecorder.setLastChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
Expand Down Expand Up @@ -1546,7 +1547,7 @@ func (p *partitionProducer) setProducerState(state producerState) {
// set a new producerState and return the last state
// returns bool if the new state has been set or not
func (p *partitionProducer) casProducerState(oldState, newState producerState) bool {
return p.state.CAS(int32(oldState), int32(newState))
return p.state.CompareAndSwap(int32(oldState), int32(newState))
}

func (p *partitionProducer) Close() {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1977,8 +1977,8 @@ func TestWaitForExclusiveProducer(t *testing.T) {
Topic: topicName,
ProducerAccessMode: ProducerAccessModeWaitForExclusive,
})
defer producer2.Close()
assert.NoError(t, err)
defer producer2.Close()
assert.NotNil(t, producer2)

id, err := producer2.Send(context.Background(), &ProducerMessage{
Expand Down
2 changes: 1 addition & 1 deletion pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (t *transactionHandler) endTxn(op *endTxnOp) {
}

func (t *transactionHandler) close() {
if !t.state.CAS(txnHandlerReady, txnHandlerClosed) {
if !t.state.CompareAndSwap(txnHandlerReady, txnHandlerClosed) {
return
}
close(t.closeCh)
Expand Down
3 changes: 3 additions & 0 deletions pulsaradmin/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Client interface {
ResourceQuotas() ResourceQuotas
FunctionsWorker() FunctionsWorker
Packages() Packages
Transactions() Transactions
Proxy() Proxy
LoadBalancer() LoadBalancer
}

type pulsarClient struct {
Expand Down
30 changes: 30 additions & 0 deletions pulsaradmin/pkg/admin/broker_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ type BrokerStats interface {

// GetAllocatorStats returns stats from broker
GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error)

// GetFunctionsMetrics returns Functions metrics from broker
GetFunctionsMetrics() ([]utils.Metrics, error)

// GetBrokerResourceAvailability returns broker resource availability
GetBrokerResourceAvailability(namespace string) (map[string]utils.ResourceUsage, error)

// GetPendingBookieOpsStats returns pending bookie operations stats
GetPendingBookieOpsStats() (map[string]interface{}, error)
}

type brokerStats struct {
Expand Down Expand Up @@ -103,3 +112,24 @@ func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.Allocator
}
return &allocatorStats, nil
}

func (bs *brokerStats) GetFunctionsMetrics() ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/functions-metrics")
var metrics []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &metrics)
return metrics, err
}

func (bs *brokerStats) GetBrokerResourceAvailability(namespace string) (map[string]utils.ResourceUsage, error) {
var resources map[string]utils.ResourceUsage
endpoint := bs.pulsar.endpoint(bs.basePath, "/broker-resource-availability", namespace)
err := bs.pulsar.Client.Get(endpoint, &resources)
return resources, err
}

func (bs *brokerStats) GetPendingBookieOpsStats() (map[string]interface{}, error) {
var stats map[string]interface{}
endpoint := bs.pulsar.endpoint(bs.basePath, "/pending-bookie-ops-stats")
err := bs.pulsar.Client.Get(endpoint, &stats)
return stats, err
}
76 changes: 76 additions & 0 deletions pulsaradmin/pkg/admin/load_balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package admin

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

// LoadBalancer is admin interface for load balancer management
type LoadBalancer interface {
// GetLoadBalancerBrokerRanking returns the broker ranking
GetLoadBalancerBrokerRanking() (map[string]interface{}, error)

// GetBundleUnloadingMetrics returns bundle unloading metrics
GetBundleUnloadingMetrics() (*utils.BundleUnloadingMetrics, error)

// GetLeaderBroker returns the leader broker for load balancing
GetLeaderBroker() (*utils.BrokerInfo, error)

// UpdateLoadManagerLeader updates the load manager leader
UpdateLoadManagerLeader() error
}

type loadBalancer struct {
pulsar *pulsarClient
basePath string
}

// LoadBalancer is used to access the load balancer endpoints
func (c *pulsarClient) LoadBalancer() LoadBalancer {
return &loadBalancer{
pulsar: c,
basePath: "/load-manager",
}
}

func (lb *loadBalancer) GetLoadBalancerBrokerRanking() (map[string]interface{}, error) {
var ranking map[string]interface{}
endpoint := lb.pulsar.endpoint(lb.basePath, "brokerRanking")
err := lb.pulsar.Client.Get(endpoint, &ranking)
return ranking, err
}

func (lb *loadBalancer) GetBundleUnloadingMetrics() (*utils.BundleUnloadingMetrics, error) {
var metrics utils.BundleUnloadingMetrics
endpoint := lb.pulsar.endpoint(lb.basePath, "bundle-unloading")
err := lb.pulsar.Client.Get(endpoint, &metrics)
return &metrics, err
}

func (lb *loadBalancer) GetLeaderBroker() (*utils.BrokerInfo, error) {
var broker utils.BrokerInfo
endpoint := lb.pulsar.endpoint(lb.basePath, "leader")
err := lb.pulsar.Client.Get(endpoint, &broker)
return &broker, err
}

func (lb *loadBalancer) UpdateLoadManagerLeader() error {
endpoint := lb.pulsar.endpoint(lb.basePath, "leader")
return lb.pulsar.Client.Post(endpoint, nil)
}
60 changes: 60 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ type Namespaces interface {
// When deduplication is enabled, the broker will prevent to store the same Message multiple times
SetDeduplicationStatus(namespace string, enableDeduplication bool) error

// GetDeduplicationStatus returns the deduplication status for all topics within a namespace
GetDeduplicationStatus(namespace string) (bool, error)

// SetPersistence sets the persistence configuration for all the topics on a namespace
SetPersistence(namespace string, persistence utils.PersistencePolicies) error

Expand Down Expand Up @@ -212,9 +215,15 @@ type Namespaces interface {
// SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace
SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error

// GetSubscriptionAuthMode returns the subscription auth mode for a namespace
GetSubscriptionAuthMode(namespace utils.NameSpaceName) (utils.SubscriptionAuthMode, error)

// SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace
SetEncryptionRequiredStatus(namespace utils.NameSpaceName, encrypt bool) error

// GetEncryptionRequiredStatus returns the encryption required status for all topics within a namespace
GetEncryptionRequiredStatus(namespace utils.NameSpaceName) (bool, error)

// UnsubscribeNamespace unsubscribe the given subscription on all topics on a namespace
UnsubscribeNamespace(namespace utils.NameSpaceName, sName string) error

Expand Down Expand Up @@ -295,6 +304,12 @@ type Namespaces interface {
// RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace,
// defaulting to broker settings
RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error

// GetBundles returns the bundles for a namespace
GetBundles(namespace utils.NameSpaceName) (*utils.BundlesData, error)

// GetNamespaceStats returns stats for a namespace
GetNamespaceStats(namespace utils.NameSpaceName) (map[string]interface{}, error)
}

type namespaces struct {
Expand Down Expand Up @@ -940,3 +955,48 @@ func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceN
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
return n.pulsar.Client.Delete(endpoint)
}

func (n *namespaces) GetDeduplicationStatus(namespace string) (bool, error) {
var result bool
nsName, err := utils.GetNamespaceName(namespace)
if err != nil {
return false, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "deduplication")
err = n.pulsar.Client.Get(endpoint, &result)
return result, err
}

func (n *namespaces) GetSubscriptionAuthMode(namespace utils.NameSpaceName) (utils.SubscriptionAuthMode, error) {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionAuthMode")
data, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return "", err
}
mode, err := utils.ParseSubscriptionAuthMode(strings.ReplaceAll(string(data), "\"", ""))
if err != nil {
return "", err
}
return mode, nil
}

func (n *namespaces) GetEncryptionRequiredStatus(namespace utils.NameSpaceName) (bool, error) {
var result bool
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "encryptionRequired")
err := n.pulsar.Client.Get(endpoint, &result)
return result, err
}

func (n *namespaces) GetBundles(namespace utils.NameSpaceName) (*utils.BundlesData, error) {
var result utils.BundlesData
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "bundles")
err := n.pulsar.Client.Get(endpoint, &result)
return &result, err
}

func (n *namespaces) GetNamespaceStats(namespace utils.NameSpaceName) (map[string]interface{}, error) {
var result map[string]interface{}
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "stats")
err := n.pulsar.Client.Get(endpoint, &result)
return result, err
}
Loading