
feat: add leadership package #661

Open
wants to merge 7 commits into main

Conversation

gfyrag
Contributor

@gfyrag gfyrag commented Jan 23, 2025

No description provided.

@gfyrag gfyrag requested a review from a team as a code owner January 23, 2025 10:30

coderabbitai bot commented Jan 23, 2025

Walkthrough

This pull request introduces a leadership package for managing distributed leadership across multiple instances of the service. The package provides lock acquisition against the database, broadcasting of leadership changes, and integration with the application's dependency injection framework. The implementation supports leader election with a configurable retry period, context-based leadership tracking, and thread-safe broadcasting of leadership status.

Changes

File Change Summary
cmd/serve.go Added import and integration of leadership.NewFXModule() in service configuration
internal/leadership/... Created new package with multiple files implementing leadership management:
- leadership.go: Defines core Leadership struct
- locker.go: Implements database locking mechanism
- module.go: Provides FX module integration
- broadcaster.go: Implements type-safe message broadcasting
- context.go: Adds leadership context management
- manager.go: Manages leadership state and transitions
pkg/testserver/server.go Added IsLeader() method and leadership context setup
test/e2e/app_multiple_instance_test.go Updated to support leadership testing across multiple server instances

Sequence Diagram

sequenceDiagram
    participant Manager
    participant Locker
    participant Broadcaster
    participant Context

    Manager->>Locker: Attempt to Take Lock
    alt Lock Acquired
        Locker-->>Manager: Return DB Handle
        Manager->>Broadcaster: Broadcast Leadership
        Manager->>Context: Set Leadership Status
    else Lock Not Acquired
        Locker-->>Manager: Return Error
        Manager->>Manager: Retry After Interval
    end
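
To make the flow above concrete, here is a minimal sketch of the acquire/retry loop the diagram describes. It is illustrative only: the Locker and Broadcaster shapes and the release callback are assumptions based on the walkthrough, not the PR's actual code.

package leadership

import (
	"context"
	"time"
)

// Locker and Broadcaster are assumed shapes for illustration.
type Locker interface {
	// Take returns whether the lock was acquired, a release callback, and an error.
	Take(ctx context.Context) (bool, func(), error)
}

type Broadcaster interface {
	Signal(isLeader bool)
}

// runElection loops until the context is canceled: try to take the lock,
// broadcast leadership while holding it, and otherwise retry after retryPeriod.
func runElection(ctx context.Context, locker Locker, b Broadcaster, retryPeriod time.Duration) {
	for {
		acquired, release, err := locker.Take(ctx)
		if err == nil && acquired {
			b.Signal(true) // leadership acquired
			<-ctx.Done()   // hold leadership until shutdown
			release()
			b.Signal(false) // leadership lost
			return
		}
		// Lock not acquired (or a transient error): wait before retrying.
		select {
		case <-ctx.Done():
			return
		case <-time.After(retryPeriod):
		}
	}
}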

Possibly related PRs

Suggested labels

build-images

Suggested reviewers

  • flemzord
  • paul-nicolas

Poem

🐰 Leadership's Dance, a Rabbit's Glance

In clusters of code, where servers compete,
A bunny hops in with a leadership feat
Locks acquired, broadcasters sing
One leader emerges, making systems swing
Distributed magic, with mutex so bright! 🌟


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (6)
internal/leadership/leadership.go (2)

46-52: Implement backoff strategy to prevent tight looping on errors

In the Run method, if acquire continually returns errors (other than context.Canceled), the loop could consume excessive CPU resources due to tight looping.

Introduce a backoff or delay before retrying to acquire leadership after an error:

	func (l *Leadership) Run(ctx context.Context) {
		for {
			if err := l.acquire(ctx); err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				l.logger.Errorf("error acquiring leadership: %s", err)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(l.retryPeriod):
+				}
			}
		}
	}

76-80: Validate input in WithRetryPeriod to prevent invalid configurations

The WithRetryPeriod function does not validate the duration input. A non-positive duration could lead to unintended behavior, such as immediate retries without delay.

Add validation to ensure duration is positive:

	func WithRetryPeriod(duration time.Duration) Option {
+		if duration <= 0 {
+			panic("retryPeriod must be positive")
+		}
		return func(leadership *Leadership) {
			leadership.retryPeriod = duration
		}
	}

Alternatively, set a default value instead of panicking:

	func WithRetryPeriod(duration time.Duration) Option {
		return func(leadership *Leadership) {
+			if duration <= 0 {
+				leadership.logger.Warn("Invalid retryPeriod; using default of 2s")
+				duration = 2 * time.Second
+			}
			leadership.retryPeriod = duration
		}
	}
internal/leadership/locker.go (2)

43-45: Handle errors when closing the connection in the release function

Errors returned by conn.Close() are currently ignored. While closing a database connection typically doesn't fail, it's good practice to handle any potential errors.

Modify the release function to handle errors:

	return true, func() {
-		_ = conn.Close()
+		if err := conn.Close(); err != nil {
+			// Handle or log the error appropriately
+			fmt.Printf("Error closing DB connection: %v\n", err)
+		}
	}, nil

48-50: Consider returning an error if db is nil in NewDefaultLocker

If NewDefaultLocker is called with a nil db, it could lead to a runtime error later when Take is called.

Add a check for nil and return an error if necessary:

	func NewDefaultLocker(db *bun.DB) Locker {
+		if db == nil {
+			return nil, fmt.Errorf("db cannot be nil")
+		}
		return &defaultLocker{db: db}
	}
internal/leadership/signal.go (1)

74-83: Consider non-blocking signal dispatch to prevent slow listeners from blocking others.

The current implementation might block if a listener's channel is full. Consider using a non-blocking send or a separate goroutine for each listener.

Apply this diff to implement non-blocking signal dispatch:

 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	h.t = &t
 
 	for _, inner := range h.inner {
-		inner.channel <- t
+		select {
+		case inner.channel <- t:
+		default:
+			// Skip slow listeners
+		}
 	}
 }
internal/leadership/leadership_test.go (1)

60-61: Consider making the timeout configurable to prevent flaky tests.

The hard-coded 100ms timeout might be too short on slower systems or under high load, leading to flaky tests.

Apply this diff to make the timeout configurable:

+const testTimeout = 500 * time.Millisecond
+
 func TestLeaderShip(t *testing.T) {
     // ... existing code ...
-    case <-time.After(100 * time.Millisecond):
+    case <-time.After(testTimeout):
         t.Fatal("signal should have been received")
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c10b32b and 61adcea.

📒 Files selected for processing (8)
  • cmd/serve.go (3 hunks)
  • internal/README.md (99 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/leadership_test.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/locker_generated_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/leadership/signal.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • internal/README.md
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (5)
internal/leadership/leadership.go (1)

19-20: ⚠️ Potential issue

Ensure locker.Take(ctx) handles context cancellation promptly

In the acquire method, if locker.Take(ctx) does not respect context cancellation, it may block indefinitely even if ctx is canceled. To prevent this, ensure that the Locker implementation properly handles context cancellation and returns immediately when ctx is done.

Verify that locker.Take(ctx) is implemented to handle context cancellation appropriately. If not, consider modifying the Locker interface or its implementations to support this behavior.
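
For reference, a hedged sketch of a context-aware Take, written against database/sql directly (the PR uses bun; the types, the (acquired, release, err) signature, and the $1 placeholder are assumptions). Both Conn and QueryRowContext honor ctx, so a canceled context aborts the attempt instead of blocking indefinitely.

package leadership

import (
	"context"
	"database/sql"
	"fmt"
)

const leadershipAdvisoryLockKey = 123456789

// take attempts the Postgres advisory lock on a dedicated connection.
func take(ctx context.Context, db *sql.DB) (bool, func(), error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return false, nil, fmt.Errorf("acquiring connection: %w", err)
	}

	var acquired bool
	row := conn.QueryRowContext(ctx, "select pg_try_advisory_lock($1)", leadershipAdvisoryLockKey)
	if err := row.Scan(&acquired); err != nil {
		_ = conn.Close()
		return false, nil, fmt.Errorf("trying advisory lock: %w", err)
	}
	if !acquired {
		_ = conn.Close()
		return false, nil, nil
	}
	return true, func() {
		// Explicitly release the session-level lock before Close returns the
		// connection to the pool.
		_, _ = conn.ExecContext(context.Background(), "select pg_advisory_unlock($1)", leadershipAdvisoryLockKey)
		_ = conn.Close()
	}, nil
}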

internal/leadership/module.go (1)

21-23: Avoid potential goroutine leaks by ensuring Run exits on context cancellation

The goroutine running runner.Run(ctx) should exit when the context is canceled. Ensure that Leadership.Run properly handles context cancellation to prevent goroutine leaks.

Verify that Leadership.Run exits when ctx is canceled. If not, adjust Leadership.Run to respect context cancellation.
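
One way to make that guarantee explicit is an fx hook that cancels a dedicated run context and waits for the goroutine to finish in OnStop. This is a sketch, not the PR's module.go: the runner interface is assumed, and context.Background() stands in for the context.WithoutCancel call used in the PR.

package leadership

import (
	"context"

	"go.uber.org/fx"
)

// runner is an assumed interface standing in for the leadership runner.
type runner interface {
	Run(ctx context.Context)
}

func registerRunner(lc fx.Lifecycle, r runner) {
	var cancel context.CancelFunc
	done := make(chan struct{})

	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			// Detach from the short-lived OnStart context (the PR uses
			// context.WithoutCancel here), keeping a cancel handle so OnStop
			// can terminate the goroutine.
			runCtx, c := context.WithCancel(context.Background())
			cancel = c
			go func() {
				defer close(done)
				r.Run(runCtx)
			}()
			return nil
		},
		OnStop: func(ctx context.Context) error {
			cancel()
			select {
			case <-done: // Run returned: no goroutine leak
				return nil
			case <-ctx.Done(): // shutdown deadline hit before Run exited
				return ctx.Err()
			}
		},
	})
}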

internal/leadership/locker_generated_test.go (1)

1-55: LGTM! Well-structured mock implementation.

The mock implementation is correctly generated and provides the necessary testing capabilities for the Locker interface.

internal/leadership/leadership_test.go (1)

53-73: Add test cases for edge scenarios.

The test covers basic leadership transfer but misses important edge cases:

  • Concurrent leadership attempts
  • Network failures (error cases)
  • Leadership loss during operation

Would you like me to generate additional test cases for these scenarios?

cmd/serve.go (1)

112-112: LGTM! Clean integration of the leadership module.

The leadership module is properly integrated into the service's dependency injection setup.

Comment on lines 25 to 33
if acquired {
l.changes.Signal(true)
l.logger.Info("leadership acquired")
<-ctx.Done()
l.logger.Info("leadership lost")
release()
l.changes.Signal(false)
return ctx.Err()
} else {

🛠️ Refactor suggestion

Release lock immediately upon context cancellation

There is a potential delay in releasing the lock when the context is canceled. If ctx.Done() is signaled, the lock should be released promptly to avoid holding onto leadership longer than necessary.

Consider restructuring the code to ensure the lock is released immediately upon context cancellation:

	if acquired {
		l.changes.Signal(true)
		l.logger.Info("leadership acquired")
-		<-ctx.Done()
+		select {
+		case <-ctx.Done():
+			l.logger.Info("leadership lost")
+			release()
+			l.changes.Signal(false)
+			return ctx.Err()
+		}
	} else {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(l.retryPeriod):
		}
	}

Committable suggestion skipped: line range outside the PR's diff.

)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))

⚠️ Potential issue

context.WithoutCancel requires Go 1.21 or later

At line 19, context.WithoutCancel(ctx) is used. This function was added to the standard library in Go 1.21, so the code will not compile with older toolchains; confirm that the module targets Go 1.21+ before relying on it.

If the intention is to create a child context that isn't canceled when the parent is, context.WithoutCancel is the right tool on Go 1.21+ (it also preserves the parent's values). On older versions, context.Background() is the usual substitute, but be cautious, as it drops context values and has implications for context propagation.

	func(ctx context.Context) error {
-		ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
+		ctx, cancel = context.WithCancel(context.Background())

Alternatively, if you want the child context to inherit cancellation, simply use:

	func(ctx context.Context) error {
-		ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
+		ctx, cancel = context.WithCancel(ctx)

Ensure that the choice aligns with the desired behavior of the application.


Comment on lines 33 to 35
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
panic(err)

⚠️ Potential issue

Avoid using panic in library code

Using panic in a library function can cause the entire application to crash unexpectedly. It's better to return an error to the caller for proper handling.

Replace the panic with an error return:

	if err := ret.Scan(&acquired); err != nil {
		_ = conn.Close()
-		panic(err)
+		return false, nil, fmt.Errorf("error scanning result: %w", err)
	}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 27 to 29
if ret.Err() != nil {
_ = conn.Close()
return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())

⚠️ Potential issue

Redundant error handling after QueryRowContext

*sql.Row does expose an Err() method (since Go 1.15), but checking it before Scan is redundant here: Scan returns the same deferred error, and the current code then panics on it. Capture the error from Scan and return it instead.

Modify the code to check for errors correctly:

	ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
-	if ret.Err() != nil {
-		_ = conn.Close()
-		return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
-	}
	
	var acquired bool
	if err := ret.Scan(&acquired); err != nil {
		_ = conn.Close()
-		panic(err)
+		return false, nil, fmt.Errorf("error scanning result: %w", err)
	}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 31 to 29
func (h *Signal) copyValue() *bool {
if h.t == nil {
return nil
}

data, err := json.Marshal(*h.t)
if err != nil {
panic(err)
}

var ret bool
if err := json.Unmarshal(data, &ret); err != nil {
panic(err)
}

return &ret
}

🛠️ Refactor suggestion

Consider using a simpler value copying mechanism.

The current implementation uses JSON marshaling/unmarshaling for copying a boolean value, which is unnecessarily complex and can panic.

Apply this diff for a simpler solution:

 func (h *Signal) copyValue() *bool {
 	if h.t == nil {
 		return nil
 	}
-
-	data, err := json.Marshal(*h.t)
-	if err != nil {
-		panic(err)
-	}
-
-	var ret bool
-	if err := json.Unmarshal(data, &ret); err != nil {
-		panic(err)
-	}
-
+	ret := *h.t
 	return &ret
 }

Comment on lines 49 to 54
func (h *Signal) Listen() (<-chan bool, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan bool, 1)
index := len(h.inner)
h.inner = append(h.inner, listener{
channel: newChannel,
})
if h.t != nil {
newChannel <- *h.copyValue()
}

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
}
}

⚠️ Potential issue

Fix incorrect listener removal when indices shift.

The removal closure captures the listener's index at registration time. If a listener with a lower index is removed first, the stored index no longer points at the intended element, so the wrong listener can be removed or the slice truncated incorrectly. Removing by channel identity avoids the stale-index problem.

Apply this diff to implement safer listener removal:

 func (h *Signal) Listen() (<-chan bool, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	newChannel := make(chan bool, 1)
-	index := len(h.inner)
+	listener := listener{
+		channel: newChannel,
+	}
 	h.inner = append(h.inner, listener{
 		channel: newChannel,
 	})
 	if h.t != nil {
 		newChannel <- *h.copyValue()
 	}
 
 	return newChannel, func() {
 		h.mu.Lock()
 		defer h.mu.Unlock()
 
-		if index < len(h.inner)-1 {
-			h.inner = append(h.inner[:index], h.inner[index+1:]...)
-		} else {
-			h.inner = h.inner[:index]
+		for i, l := range h.inner {
+			if l.channel == newChannel {
+				h.inner = append(h.inner[:i], h.inner[i+1:]...)
+				break
+			}
 		}
 	}
 }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
internal/leadership/signal.go (1)

44-53: ⚠️ Potential issue

Fix potential race condition in listener removal

Using the index variable to remove listeners assumes that the inner slice hasn't been modified between appending the listener and removing it. This can lead to incorrect listener removal if other operations modify the slice concurrently.

Apply this diff to safely remove the listener based on the channel reference:

 return newChannel, func() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	if index < len(h.inner)-1 {
-		h.inner = append(h.inner[:index], h.inner[index+1:]...)
-	} else {
-		h.inner = h.inner[:index]
-	}
+	for i, l := range h.inner {
+		if l.channel == newChannel {
+			h.inner = append(h.inner[:i], h.inner[i+1:]...)
+			break
+		}
+	}
 }

This modification searches for the listener by comparing channel references, ensuring the correct listener is removed even if the slice has been altered.

🧹 Nitpick comments (1)
internal/leadership/signal.go (1)

13-13: Simplify the Mutex field by embedding sync.Mutex directly

Currently, the mu field is a pointer to a sync.Mutex, which introduces unnecessary pointer indirection. Embedding sync.Mutex directly is the idiomatic Go approach and simplifies the code.

Apply this diff to simplify the Mutex field:

 type Signal struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
 }

Update the mutex usage in your methods accordingly (the usage remains the same).

Additionally, in the NewSignal function, you no longer need to initialize mu:

 func NewSignal() *Signal {
 	return &Signal{
-		mu:    &sync.Mutex{},
 	}
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 61adcea and 6ef70af.

📒 Files selected for processing (8)
  • cmd/serve.go (3 hunks)
  • internal/README.md (99 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/leadership_test.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/locker_generated_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/leadership/signal.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • internal/leadership/leadership_test.go
  • internal/leadership/module.go
  • cmd/serve.go
  • internal/leadership/leadership.go
  • internal/leadership/locker_generated_test.go
  • internal/leadership/locker.go
  • internal/README.md

t *bool

inner []listener
outer chan bool

🛠️ Refactor suggestion

Remove the unused outer channel field

The outer field in the Signal struct is declared but never used within the code. Removing it will clean up the struct and prevent any confusion about its purpose.

Apply this diff to remove the unused field:

 type Signal struct {
 	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
 }

Also, remove the initialization of outer in the NewSignal function:

 func NewSignal() *Signal {
 	return &Signal{
-		outer: make(chan bool),
 	}
 }

Comment on lines 62 to 64
for _, inner := range h.inner {
inner.channel <- t
}

⚠️ Potential issue

Prevent potential blocking in Signal() when sending to listeners

If a listener's channel buffer is full (which can happen if the listener is slow to consume messages), the send operation inner.channel <- t will block indefinitely, possibly causing a deadlock.

Consider modifying the send operation to a non-blocking send using a select statement:

 for _, inner := range h.inner {
-	inner.channel <- t
+	select {
+	case inner.channel <- t:
+		// Sent successfully
+	default:
+		// Listener's channel is full; skip to prevent blocking
+	}
 }

Alternatively, you could increase the buffer size of the channels to accommodate more messages, but be cautious as this only mitigates the issue rather than fully resolving it.


Comment on lines 67 to 74
func (h *Signal) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}
}

⚠️ Potential issue

Prevent sending on closed channels after Close() is called

After Close() is invoked, the listener channels are closed. If Signal() is subsequently called, it will attempt to send on these closed channels, resulting in a panic.

Introduce an isClosed flag to track the closed state of the Signal and prevent signaling after closure:

 type Signal struct {
 	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
+	isClosed bool
 }

 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	if h.isClosed {
+		// Signal has been closed; ignore further signals
+		return
+	}

 	h.t = &t

 	for _, inner := range h.inner {
 		select {
 		case inner.channel <- t:
 			// Sent successfully
 		default:
 			// Listener's channel is full; skip to prevent blocking
 		}
 	}
 }

 func (h *Signal) Close() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	h.isClosed = true

 	for _, inner := range h.inner {
 		close(inner.channel)
 	}
 }

This ensures that no signals are sent after closure, preventing panics from sending on closed channels.



codecov bot commented Jan 23, 2025

Codecov Report

Attention: Patch coverage is 90.62500% with 21 lines in your changes missing coverage. Please review.

Project coverage is 81.84%. Comparing base (c10b32b) to head (294efc4).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
internal/leadership/locker.go 50.00% 8 Missing and 3 partials ⚠️
internal/leadership/manager.go 94.31% 4 Missing and 1 partial ⚠️
internal/leadership/context.go 86.36% 2 Missing and 1 partial ⚠️
internal/leadership/module.go 90.47% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #661      +/-   ##
==========================================
+ Coverage   81.71%   81.84%   +0.13%     
==========================================
  Files         131      137       +6     
  Lines        7059     7278     +219     
==========================================
+ Hits         5768     5957     +189     
- Misses        990     1012      +22     
- Partials      301      309       +8     


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

♻️ Duplicate comments (2)
internal/leadership/locker.go (2)

32-36: ⚠️ Potential issue

Remove the redundant error check after QueryRowContext.

*sql.Row does have an Err() method, but checking it before Scan is redundant: Scan returns the same deferred error. Capture and handle the error from Scan instead.

 	ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
-	if ret.Err() != nil {
-		_ = conn.Close()
-		return nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
-	}

39-42: ⚠️ Potential issue

Replace panic with proper error handling.

Using panic in a library function can cause the entire application to crash unexpectedly.

 	if err := ret.Scan(&acquired); err != nil {
 		_ = conn.Close()
-		panic(err)
+		return nil, fmt.Errorf("error scanning result: %w", err)
 	}
🧹 Nitpick comments (12)
internal/leadership/manager.go (4)

9-15: Clarify concurrency assumptions for stopChannel.
When multiple goroutines call Stop concurrently, the code closes m.stopChannel after the first stop request. Subsequent receives from that closed channel can cause readability concerns or panic in some edge cases. Consider documenting or enforcing single-use semantics more explicitly.
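
A minimal sketch of enforcing single-use Stop semantics with sync.Once (the Manager fields shown are assumptions for illustration, not the PR's actual struct):

package leadership

import "sync"

type Manager struct {
	stopOnce    sync.Once
	stopChannel chan chan struct{}
}

// Stop is safe to call from multiple goroutines: only the first call sends
// the stop request and waits for Run to acknowledge; later calls are no-ops.
func (m *Manager) Stop() {
	m.stopOnce.Do(func() {
		done := make(chan struct{})
		m.stopChannel <- done
		<-done
	})
}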


17-54: Consider improved error handling on db.Close().
Currently, errors from db.Close() are ignored. If the code has any concurrency or data corruption implications, consider at least logging the error to aid debugging.


17-54: Validate loop exit strategy and resource cleanup.
After stopping, the database connection and leadership signals are cleaned up. This design is valid for a one-time usage pattern. If the manager needs to be re-runnable, closing m.stopChannel can hamper subsequent restarts.


77-91: Add usage documentation for optional parameters.
The pattern with Option functions is great for flexibility. However, consider a brief doc comment or usage example for WithRetryPeriod, so consumers know the default is 2 seconds.
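
For example, a doc comment along these lines would make the default discoverable; the option body mirrors the one shown earlier in this review, and the surrounding types are assumptions:

package leadership

import "time"

type Manager struct {
	retryPeriod time.Duration
}

type Option func(*Manager)

// WithRetryPeriod overrides how long the manager waits between leadership
// acquisition attempts before retrying. The default is 2 seconds.
//
// Example:
//
//	manager := NewManager(NewDefaultLocker(db), logger,
//		WithRetryPeriod(500*time.Millisecond))
func WithRetryPeriod(duration time.Duration) Option {
	return func(m *Manager) {
		m.retryPeriod = duration
	}
}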

test/e2e/app_multiple_instance_test.go (2)

Line range hint 25-54: Avoid channel usage when a slice suffices.
You’re collecting servers in a buffered channel, draining them into a slice, and closing the channel. Since you’re using wg to coordinate, you could directly append to the slice instead of using a channel. This might simplify the concurrency flow.
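
A sketch of the slice-based variant; the server type and constructor are placeholders, not the testserver API:

package main

import (
	"fmt"
	"sync"
)

type server struct{ id int }

func startServer(id int) *server { return &server{id: id} }

func main() {
	const nbServer = 3

	var (
		mu      sync.Mutex
		servers []*server
		wg      sync.WaitGroup
	)
	for i := 0; i < nbServer; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			srv := startServer(i)
			// Append under the mutex instead of sending through a buffered channel.
			mu.Lock()
			servers = append(servers, srv)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println("started", len(servers), "servers")
}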


80-110: Use smaller-scoped tests to isolate leadership transitions.
This set of nested test contexts covers starting multiple instances, ensuring only one leader, then intentionally stopping the leader so a new leader emerges. This approach is valid as an E2E scenario, but consider adding smaller integration tests to confirm each step in isolation.

internal/leadership/broadcaster.go (1)

29-52: Prevent goroutine leaks and consider channel buffer size.

The Subscribe method has potential issues:

  1. Goroutine leaks if clients don't call the unsubscribe function.
  2. Buffer size of 1 might be insufficient for high-frequency updates, leading to blocked goroutines.

Consider these improvements:

  1. Add context support for automatic cleanup:
-func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
+func (h *Broadcaster) Subscribe(ctx context.Context) (<-chan Leadership, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	newChannel := make(chan Leadership, 1)
+	newChannel := make(chan Leadership, 10) // Increased buffer
 	index := len(h.inner)
 	h.inner = append(h.inner, listener{
 		channel: newChannel,
 	})
 	if h.t != nil {
 		newChannel <- *h.t
 	}
 
+	// Auto cleanup on context cancellation
+	go func() {
+		<-ctx.Done()
+		h.mu.Lock()
+		defer h.mu.Unlock()
+		if index < len(h.inner) {
+			close(h.inner[index].channel)
+			h.inner = append(h.inner[:index], h.inner[index+1:]...)
+		}
+	}()
+
 	return newChannel, func() {
 		h.mu.Lock()
 		defer h.mu.Unlock()
internal/leadership/manager_test.go (2)

13-13: Fix typo in test function name.

The function name has incorrect capitalization.

-func TestLeaderShip(t *testing.T) {
+func TestLeadership(t *testing.T) {

24-33: Define constants for magic numbers and improve test setup.

The test uses magic numbers without explanation and could be structured better.

Apply this diff to improve the test setup:

-	const count = 10
+	const (
+		instanceCount   = 10
+		retryPeriod    = 10 * time.Millisecond
+		leaderTimeout  = 2 * time.Second
+		pollInterval   = 10 * time.Millisecond
+	)
 
-	instances := make([]*Manager, count)
-	for i := range count {
+	instances := make([]*Manager, instanceCount)
+	for i := range instanceCount {
 		m := NewDefaultLocker(db)
-		manager := NewManager(m, logging.Testing(), WithRetryPeriod(10*time.Millisecond))
+		manager := NewManager(m, logging.Testing(), WithRetryPeriod(retryPeriod))
internal/leadership/locker.go (3)

9-10: Document the advisory lock key choice.

The hardcoded advisory lock key should be documented to explain:

  1. Why this specific number was chosen
  2. How to avoid conflicts with other advisory locks in the system
  3. Whether this value should be configurable
-const leadershipAdvisoryLockKey = 123456789
+// leadershipAdvisoryLockKey is a unique identifier for the leadership advisory lock.
+// This value must not conflict with other advisory locks used in the system.
+// TODO: Consider making this configurable via environment variables.
+const leadershipAdvisoryLockKey = 123456789

16-20: Enhance interface documentation.

While the current documentation is good, it could be more specific about the return values:

-// Locker take a lock at process level
-// It returns a bun.IDB which MUST be invalidated when the lock is lost
+// Locker attempts to acquire a process-level lock.
+// Take returns:
+// - (DBHandle, nil) when the lock is successfully acquired
+// - (nil, nil) when the lock is already held by another process
+// - (nil, error) when an error occurs during the attempt
+// The returned DBHandle MUST be closed when the lock is no longer needed or lost

26-50: Consider adding a timeout for lock acquisition.

The lock acquisition might hang indefinitely if the context doesn't have a timeout. Consider adding a timeout to prevent resource exhaustion.

+func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) {
+	// Add timeout to prevent indefinite hanging
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
 	conn, err := p.db.Conn(ctx)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6ef70af and 8ed6d5d.

📒 Files selected for processing (28)
  • cmd/serve.go (3 hunks)
  • internal/api/bulking/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/common/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/common/mocks_system_controller_test.go (0 hunks)
  • internal/api/v1/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/v1/mocks_system_controller_test.go (0 hunks)
  • internal/api/v2/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/v2/mocks_system_controller_test.go (0 hunks)
  • internal/controller/ledger/controller_generated_test.go (0 hunks)
  • internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go (0 hunks)
  • internal/controller/ledger/listener_generated_test.go (0 hunks)
  • internal/controller/ledger/numscript_parser_generated_test.go (0 hunks)
  • internal/controller/ledger/numscript_runtime_generated_test.go (0 hunks)
  • internal/controller/ledger/store_generated_test.go (0 hunks)
  • internal/doc.go (1 hunks)
  • internal/leadership/broadcaster.go (1 hunks)
  • internal/leadership/context.go (1 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/main_test.go (1 hunks)
  • internal/leadership/manager.go (1 hunks)
  • internal/leadership/manager_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/storage/driver/buckets_generated_test.go (0 hunks)
  • internal/storage/driver/ledger_generated_test.go (0 hunks)
  • internal/storage/driver/system_generated_test.go (0 hunks)
  • pkg/testserver/server.go (4 hunks)
  • test/e2e/app_multiple_instance_test.go (4 hunks)
💤 Files with no reviewable changes (16)
  • internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go
  • internal/storage/driver/ledger_generated_test.go
  • internal/api/v2/mocks_system_controller_test.go
  • internal/api/bulking/mocks_ledger_controller_test.go
  • internal/api/common/mocks_ledger_controller_test.go
  • internal/api/v2/mocks_ledger_controller_test.go
  • internal/api/v1/mocks_ledger_controller_test.go
  • internal/controller/ledger/numscript_parser_generated_test.go
  • internal/storage/driver/buckets_generated_test.go
  • internal/controller/ledger/numscript_runtime_generated_test.go
  • internal/storage/driver/system_generated_test.go
  • internal/api/v1/mocks_system_controller_test.go
  • internal/api/common/mocks_system_controller_test.go
  • internal/controller/ledger/listener_generated_test.go
  • internal/controller/ledger/controller_generated_test.go
  • internal/controller/ledger/store_generated_test.go
✅ Files skipped from review due to trivial changes (1)
  • internal/doc.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/leadership/leadership.go
  • internal/leadership/module.go
  • cmd/serve.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (9)
internal/leadership/manager.go (1)

56-71: Confirm channel closure behavior.
In Stop, you perform a non-blocking check on stopChannel followed by sending a new chan struct{}. This works for single-run usage. If calls to Stop happen simultaneously, you could have duplicated or partial stop signals. Make sure the calling code never triggers multiple stops at once.

test/e2e/app_multiple_instance_test.go (1)

65-77: Check for flakiness in leader detection.
Tests with Eventually(...).WithTimeout(5 * time.Second) can be prone to occasional flakiness in busy CI environments. If you see sporadic failures, consider extending the timeout or polling at dynamic intervals.

internal/leadership/main_test.go (1)

18-24: Validate resource teardown.
TestMain uses WithTestMain to initialize a PostgreSQL server. Ensure that any leftover resources from srv are properly closed or cleaned up upon completion of the tests to avoid resource leaks in CI.

internal/leadership/context.go (2)

11-13: LGTM!

The function correctly initializes and stores the leadership holder in the context.


31-33: LGTM!

The holder struct is well-defined with a clear and descriptive field name.

pkg/testserver/server.go (2)

206-206: LGTM!

The leadership context is correctly initialized during server startup.


328-330: LGTM!

The IsLeader method correctly delegates to the leadership package.

internal/leadership/locker.go (2)

1-8: LGTM! Clean package structure with minimal imports.


52-54: LGTM! Clean constructor implementation.

The constructor function is well-implemented, returning the interface type instead of the concrete type.

Comment on lines 7 to 9
type contextKey string

var holderContextKey contextKey = "holder"

🛠️ Refactor suggestion

Use a private type for context key to prevent collisions.

Using a string type for context keys is not recommended as it can lead to key collisions. Instead, use a private unexported type.

Apply this diff to make the context key more robust:

-type contextKey string
+type contextKey struct{}
 
-var holderContextKey contextKey = "holder"
+var holderContextKey = &contextKey{}

Comment on lines 15 to 29
func IsLeader(ctx context.Context) bool {
h := ctx.Value(holderContextKey)
if h == nil {
return false
}
return h.(*holder).isLeader
}

func setIsLeader(ctx context.Context, isLeader bool) {
h := ctx.Value(holderContextKey)
if h == nil {
return
}
h.(*holder).isLeader = isLeader
}

🛠️ Refactor suggestion

Add type assertion safety checks.

The type assertions h.(*holder) could panic if the context value is of an unexpected type. Add type assertion safety checks.

Apply this diff to make the type assertions safer:

 func IsLeader(ctx context.Context) bool {
 	h := ctx.Value(holderContextKey)
 	if h == nil {
 		return false
 	}
-	return h.(*holder).isLeader
+	if h, ok := h.(*holder); ok {
+		return h.isLeader
+	}
+	return false
 }
 
 func setIsLeader(ctx context.Context, isLeader bool) {
 	h := ctx.Value(holderContextKey)
 	if h == nil {
 		return
 	}
-	h.(*holder).isLeader = isLeader
+	if h, ok := h.(*holder); ok {
+		h.isLeader = isLeader
+	}
 }

Comment on lines 11 to 17
type Broadcaster struct {
mu *sync.Mutex
t *Leadership

inner []listener
outer chan Leadership
}

🛠️ Refactor suggestion

Use mutex by value and remove unused channel.

The Broadcaster struct has two issues:

  1. Using a pointer to mutex is unnecessary and can lead to nil pointer dereference if not properly initialized.
  2. The outer channel is declared but never used in the implementation.

Apply this diff to fix these issues:

 type Broadcaster struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *Leadership
 
 	inner []listener
-	outer chan Leadership
 }

Comment on lines 81 to 86
func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
mu: &sync.Mutex{},
}
}

🛠️ Refactor suggestion

Improve constructor naming and initialization.

The NewSignal constructor has several issues:

  1. The name doesn't clearly convey its purpose.
  2. It initializes an unused channel.
  3. It creates a pointer to mutex unnecessarily.

Apply this diff to improve the constructor:

-func NewSignal() *Broadcaster {
+func NewBroadcaster() *Broadcaster {
 	return &Broadcaster{
-		outer: make(chan Leadership),
-		mu:    &sync.Mutex{},
+		mu:    sync.Mutex{},
 	}
 }

Comment on lines 35 to 56
selectedLeader := -1
require.Eventually(t, func() bool {
for index, manager := range instances {
actual := manager.GetSignal().Actual()
if actual.Acquired {
selectedLeader = index
return true
}
}
return false
}, 2*time.Second, 10*time.Millisecond)
require.GreaterOrEqual(t, selectedLeader, 0)

// ensure the provided db connection is still functional
require.NoError(t, instances[selectedLeader].
GetSignal().
Actual().DB.
NewSelect().
Model(&map[string]any{}).
ColumnExpr("1 as v").
Scan(ctx),
)

🛠️ Refactor suggestion

Add verification for leadership transfer timing and handle race conditions.

The test doesn't verify the timing of leadership transfer and has potential race conditions.

Consider these improvements:

  1. Add timing verification:
+	start := time.Now()
 	selectedLeader := -1
 	require.Eventually(t, func() bool {
 		for index, manager := range instances {
 			actual := manager.GetSignal().Actual()
 			if actual.Acquired {
 				selectedLeader = index
+				// Verify leadership was acquired within expected time
+				require.Less(t, time.Since(start), 500*time.Millisecond)
 				return true
 			}
 		}
 		return false
-	}, 2*time.Second, 10*time.Millisecond)
+	}, leaderTimeout, pollInterval)
  2. Handle race conditions by ensuring only one leader exists:
+	// Verify only one leader exists
+	leaderCount := 0
+	for _, manager := range instances {
+		if manager.GetSignal().Actual().Acquired {
+			leaderCount++
+		}
+	}
+	require.Equal(t, 1, leaderCount, "Expected exactly one leader")

Committable suggestion skipped: line range outside the PR's diff.


ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
if ret.Err() != nil {
_ = conn.Close()

⚠️ Potential issue

Handle connection close errors.

The error from conn.Close() is consistently ignored. These errors should be wrapped and returned.

-		_ = conn.Close()
+		if closeErr := conn.Close(); closeErr != nil {
+			return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+		}

Also applies to: 40-40, 45-45

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
internal/leadership/context.go (2)

8-10: ⚠️ Potential issue

Use a private type for context key to prevent collisions.

Using a string type for context keys is not recommended as it can lead to key collisions. Instead, use a private unexported type.

Apply this diff to make the context key more robust:

-type contextKey string
+type contextKey struct{}
 
-var holderContextKey contextKey = "holder"
+var holderContextKey = &contextKey{}

16-38: ⚠️ Potential issue

Add type assertion safety checks.

The type assertions h.(*holder) could panic if the context value is of an unexpected type. Add type assertion safety checks.

Apply this diff to make the type assertions safer:

 func IsLeader(ctx context.Context) bool {
 	h := ctx.Value(holderContextKey)
 	if h == nil {
 		return false
 	}
-	holder := h.(*holder)
+	holder, ok := h.(*holder)
+	if !ok {
+		return false
+	}
 	holder.Lock()
 	defer holder.Unlock()
 
 	return holder.isLeader
 }
 
 func setIsLeader(ctx context.Context, isLeader bool) {
 	h := ctx.Value(holderContextKey)
 	if h == nil {
 		return
 	}
-	holder := h.(*holder)
+	holder, ok := h.(*holder)
+	if !ok {
+		return
+	}
 	holder.Lock()
 	defer holder.Unlock()
 
 	holder.isLeader = isLeader
 }
🧹 Nitpick comments (1)
internal/leadership/context.go (1)

12-14: Consider accepting initial leadership state.

The function could be more flexible by accepting an initial leadership state parameter.

-func ContextWithLeadershipInfo(ctx context.Context) context.Context {
-	return context.WithValue(ctx, holderContextKey, &holder{})
+func ContextWithLeadershipInfo(ctx context.Context, initialState bool) context.Context {
+	return context.WithValue(ctx, holderContextKey, &holder{isLeader: initialState})
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8ed6d5d and 0da77e0.

📒 Files selected for processing (1)
  • internal/leadership/context.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (4)
internal/leadership/context.go (4)

1-6: LGTM!

Package structure and imports are appropriate for the leadership management functionality.


22-24: LGTM! Good use of mutex for thread safety.

The implementation correctly uses mutex to ensure thread-safe access to the leadership state.

Also applies to: 34-36


40-43: LGTM! Clean type definition.

The holder struct is well-designed with proper mutex embedding for synchronization.


1-43: Verify leadership package integration.

Let's verify how this package is being used across the codebase to ensure consistent usage patterns.

✅ Verification successful

Leadership package integration verified - limited to test server usage

The package is properly encapsulated and consistently used, with its only integration point being the test server component. No issues found in the implementation or usage patterns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usage of leadership package functions
echo "=== Searching for ContextWithLeadershipInfo usage ==="
rg "ContextWithLeadershipInfo" -A 2

echo -e "\n=== Searching for IsLeader usage ==="
rg "leadership\.IsLeader" -A 2

echo -e "\n=== Searching for potential direct access to holderContextKey ==="
rg "holderContextKey"

Length of output: 1244

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
internal/leadership/context.go (2)

12-14: Add documentation and consider renaming for clarity.

The function lacks documentation about its purpose and usage. Additionally, the name implies read-only information, but it creates a mutable holder.

Apply this diff to improve documentation and naming:

+// ContextWithLeadershipState creates a new context with leadership state tracking capability.
+// The returned context can be used to track and modify leadership status through IsLeader and setIsLeader functions.
-func ContextWithLeadershipInfo(ctx context.Context) context.Context {
+func ContextWithLeadershipState(ctx context.Context) context.Context {
 	return context.WithValue(ctx, holderContextKey, &holder{})
 }

40-43: Document the holder struct and consider explicit mutex.

The holder struct lacks documentation. Additionally, embedding sync.Mutex makes the mutex methods directly accessible, which might not be desirable.

Apply this diff to improve the implementation:

+// holder maintains thread-safe leadership state within a context.
 type holder struct {
-	sync.Mutex
+	mu       sync.Mutex // protects isLeader
 	isLeader bool
 }

Then update the mutex usage in IsLeader and setIsLeader:

-	holder.Lock()
-	defer holder.Unlock()
+	holder.mu.Lock()
+	defer holder.mu.Unlock()
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0da77e0 and b195cc0.

📒 Files selected for processing (20)
  • internal/api/bulking/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/common/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/common/mocks_system_controller_test.go (0 hunks)
  • internal/api/v1/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/v1/mocks_system_controller_test.go (0 hunks)
  • internal/api/v2/mocks_ledger_controller_test.go (0 hunks)
  • internal/api/v2/mocks_system_controller_test.go (0 hunks)
  • internal/controller/ledger/controller_generated_test.go (0 hunks)
  • internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go (0 hunks)
  • internal/controller/ledger/listener_generated_test.go (0 hunks)
  • internal/controller/ledger/numscript_parser_generated_test.go (0 hunks)
  • internal/controller/ledger/numscript_runtime_generated_test.go (0 hunks)
  • internal/controller/ledger/store_generated_test.go (0 hunks)
  • internal/leadership/broadcaster.go (1 hunks)
  • internal/leadership/context.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/manager_test.go (1 hunks)
  • internal/storage/driver/buckets_generated_test.go (0 hunks)
  • internal/storage/driver/ledger_generated_test.go (0 hunks)
  • internal/storage/driver/system_generated_test.go (0 hunks)
💤 Files with no reviewable changes (16)
  • internal/controller/ledger/numscript_runtime_generated_test.go
  • internal/api/v2/mocks_system_controller_test.go
  • internal/api/v1/mocks_ledger_controller_test.go
  • internal/controller/ledger/numscript_parser_generated_test.go
  • internal/storage/driver/ledger_generated_test.go
  • internal/api/bulking/mocks_ledger_controller_test.go
  • internal/api/v2/mocks_ledger_controller_test.go
  • internal/api/common/mocks_system_controller_test.go
  • internal/api/v1/mocks_system_controller_test.go
  • internal/controller/ledger/listener_generated_test.go
  • internal/controller/ledger/controller_with_too_many_client_handling_generated_test.go
  • internal/storage/driver/system_generated_test.go
  • internal/api/common/mocks_ledger_controller_test.go
  • internal/storage/driver/buckets_generated_test.go
  • internal/controller/ledger/controller_generated_test.go
  • internal/controller/ledger/store_generated_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/leadership/manager_test.go
  • internal/leadership/locker.go
  • internal/leadership/broadcaster.go
🔇 Additional comments (3)
internal/leadership/context.go (3)

8-10: LGTM! Good use of struct{} for context key.

The implementation follows best practices by using a private struct type for the context key, which prevents key collisions.


16-38: Add type assertion safety checks.

The type assertions could panic if the context value is of an unexpected type.
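
A comma-ok assertion avoids that panic. A minimal sketch, assuming the key and holder definitions quoted in this review (they are reconstructed here, not copied from the file) and the explicit mu field suggested above:

package leadership

import (
	"context"
	"sync"
)

// holderContextKeyType is assumed to be the private struct type backing
// the context key used by ContextWithLeadershipInfo.
type holderContextKeyType struct{}

var holderContextKey = holderContextKeyType{}

// holder maintains thread-safe leadership state within a context.
type holder struct {
	mu       sync.Mutex // protects isLeader
	isLeader bool
}

// IsLeader reports whether the context marks this instance as leader.
// The comma-ok assertion keeps it from panicking when the context holds
// no leadership information or a value of an unexpected type.
func IsLeader(ctx context.Context) bool {
	h, ok := ctx.Value(holderContextKey).(*holder)
	if !ok || h == nil {
		return false
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.isLeader
}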

Add documentation and clarify setIsLeader usage.

Both functions lack documentation. Additionally, it's unclear where and how setIsLeader should be used, as it's unexported but crucial for functionality.

Apply this diff to improve documentation:

+// IsLeader returns true if the context indicates current instance is the leader.
+// Returns false if the context doesn't contain leadership information.
 func IsLeader(ctx context.Context) bool {
   // ... existing implementation ...
 }

+// setIsLeader updates leadership status in the context.
+// This function is intended to be called only by the leadership manager during leader election.
+// It's unexported to ensure leadership state can only be modified through proper channels.
 func setIsLeader(ctx context.Context, isLeader bool) {
   // ... existing implementation ...
 }

1-43: Verify concurrent access patterns.

The implementation is thread-safe for individual context instances. However, ensure that the leadership manager properly handles concurrent leadership changes across different contexts.

Run this script to analyze concurrent access patterns:

✅ Verification successful

Implementation is thread-safe and properly handles concurrent access

The leadership management system is well-designed for concurrent operations:

  • Single goroutine handles leadership state changes
  • Thread-safe context access via mutex
  • Clean coordination using channels for shutdown
  • Safe broadcasting of leadership changes
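
For illustration, the single-goroutine pattern described above can be sketched as follows. This is not the actual manager.go: the acquire/broadcast callbacks and the retry period are assumptions standing in for the real Locker and Broadcaster.

package leadership

import (
	"context"
	"time"
)

// runLeadershipLoop retries the lock on a fixed period, broadcasts
// leadership transitions, and exits when the context is cancelled.
func runLeadershipLoop(
	ctx context.Context,
	acquire func(context.Context) (bool, error),
	broadcast func(bool),
	retryPeriod time.Duration,
) {
	isLeader := false
	ticker := time.NewTicker(retryPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			if isLeader {
				broadcast(false) // announce step-down on shutdown
			}
			return
		case <-ticker.C:
			acquired, err := acquire(ctx)
			if err != nil {
				continue // transient error: retry on the next tick
			}
			if acquired == isLeader {
				continue // no transition
			}
			isLeader = acquired
			broadcast(isLeader) // notify subscribers of the change
		}
	}
}
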
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for potential concurrent access issues in leadership management

# Look for other files that might modify leadership state
echo "Files that might modify leadership state:"
rg -l "setIsLeader|IsLeader|ContextWithLeadership" --type go

# Look for goroutine usage in leadership package
echo -e "\nGoroutine usage in leadership package:"
rg "go\s+func|go\s+\w+" --type go

Length of output: 1972


Script:

#!/bin/bash
# Examine leadership manager implementation
echo "Leadership manager implementation:"
cat internal/leadership/manager.go

echo -e "\nLeadership module implementation:"
cat internal/leadership/module.go

Length of output: 2694

func (m *Manager) Run(ctx context.Context) {
var (
db DBHandle
nextRetry = time.After(time.Duration(0))
Contributor

It spams ?

Contributor

Sry just at init

Contributor Author

Yes, just the first time

Contributor

@Dav-14 Dav-14 left a comment

LGTM, looks cool to learn

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
internal/leadership/mutex.go (3)

5-8: Add documentation and consider encapsulation.

The Mutex[T] type would benefit from:

  1. GoDoc comments explaining its purpose and usage
  2. Consider making the t field unexported to prevent external modifications
  3. Consider not embedding sync.Mutex to avoid exposing internal locking methods
+// Mutex provides a generic mutex that protects a value of type T.
+// It ensures thread-safe access to the protected value.
 type Mutex[T any] struct {
-	*sync.Mutex
-	t T
+	mu sync.Mutex
+	value T
 }

15-20: Add documentation and nil checks.

The constructor would benefit from:

  1. GoDoc comments explaining the function's purpose
  2. Validation for nil values when T is a pointer type
+// NewMutex creates a new Mutex instance that protects the given value.
+// The value must not be modified externally after being passed to NewMutex.
 func NewMutex[T any](t T) *Mutex[T] {
+	// Add nil check if T is a pointer type
+	if any(t) == nil {
+		panic("value cannot be nil")
+	}
 	return &Mutex[T]{
-		Mutex: &sync.Mutex{},
-		t:     t,
+		mu:    sync.Mutex{},
+		value: t,
 	}
 }

1-20: Consider distributed system requirements.

Since this mutex is part of a leadership management system, consider:

  1. Integration with distributed locking (e.g., etcd, consul) for cluster-wide leadership
  2. Persistence of leadership state across restarts
  3. Handling of network partitions and split-brain scenarios
  4. Leadership lease/TTL mechanisms

Would you like me to provide examples of integrating with distributed locking systems?
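
As one illustration, a session-level Postgres advisory lock can serve as a cluster-wide leadership lock. The sketch below uses database/sql and pg_try_advisory_lock rather than the project's actual locker; the driver import and the lock key are arbitrary choices for the example.

package leadership

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // any Postgres driver would do here
)

// tryAcquireLeadership attempts to take a cluster-wide advisory lock.
// The lock is bound to the connection's session, so it is released
// automatically if the connection (and therefore the leader) goes away,
// which gives a crude lease without an explicit TTL.
func tryAcquireLeadership(ctx context.Context, conn *sql.Conn, lockKey int64) (bool, error) {
	var acquired bool
	err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockKey).Scan(&acquired)
	if err != nil {
		return false, fmt.Errorf("acquiring advisory lock: %w", err)
	}
	return acquired, nil
}

A TTL-style lease can be layered on top by periodically re-checking the lock and stepping down when the underlying connection is lost.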

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b195cc0 and f39ef3f.

📒 Files selected for processing (5)
  • internal/leadership/broadcaster.go (1 hunks)
  • internal/leadership/context.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/manager_test.go (1 hunks)
  • internal/leadership/mutex.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • internal/leadership/locker.go
  • internal/leadership/manager_test.go
  • internal/leadership/context.go
  • internal/leadership/broadcaster.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (1)
internal/leadership/mutex.go (1)

1-3: LGTM! Clean package structure.

Package name and import align well with the functionality.

Comment on lines 10 to 13
func (m *Mutex[T]) Lock() (T, func()) {
m.Mutex.Lock()
return m.t, m.Unlock
}

🛠️ Refactor suggestion

Enhance safety with context support and unlock protection.

The current implementation has several potential issues:

  1. No context support for cancellation
  2. No protection against multiple unlock calls
  3. Risk of deadlock if unlock function is not called

Consider this safer implementation:

+// Lock acquires the mutex and returns the protected value along with an unlock function.
+// The caller must ensure the unlock function is called exactly once.
-func (m *Mutex[T]) Lock() (T, func()) {
+func (m *Mutex[T]) Lock(ctx context.Context) (T, func(), error) {
+	// Support cancellation
+	done := make(chan struct{})
+	go func() {
+		m.mu.Lock()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Create a one-time unlock function
+		var once sync.Once
+		unlock := func() {
+			once.Do(m.mu.Unlock)
+		}
+		return m.value, unlock, nil
+	case <-ctx.Done():
+		// The goroutine above will still acquire the lock eventually;
+		// release it as soon as that happens so the mutex is not leaked.
+		go func() {
+			<-done
+			m.mu.Unlock()
+		}()
+		return *new(T), func() {}, ctx.Err()
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
internal/leadership/mutex.go (1)

13-18: 🛠️ Refactor suggestion

Enhance safety with context support and error handling.

The current implementation could be improved:

  1. No context support for cancellation (as mentioned in previous review)
  2. No error handling for the provided function
  3. Missing documentation for the method and its parameter
+// Exec executes the provided function with synchronized access to the database.
+// The function is guaranteed to run with exclusive access to the database.
+// The context is checked for cancellation before the lock is taken.
+func (m *Mutex) Exec(ctx context.Context, fn func(db bun.IDB) error) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		m.Mutex.Lock()
+		defer m.Mutex.Unlock()
+
+		return fn(m.db)
+	}
 }
🧹 Nitpick comments (1)
internal/leadership/mutex.go (1)

20-25: Add documentation and nil check for the constructor.

The constructor should validate its input and have proper documentation.

+// NewMutex creates a new Mutex instance with the provided database handle.
+// It panics if db is nil.
 func NewMutex(db DBHandle) *Mutex {
+	if db == nil {
+		panic("db cannot be nil")
+	}
 	return &Mutex{
 		Mutex: &sync.Mutex{},
 		db:    db,
 	}
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f39ef3f and 935699b.

📒 Files selected for processing (4)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/manager.go (1 hunks)
  • internal/leadership/manager_test.go (1 hunks)
  • internal/leadership/mutex.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/leadership/leadership.go
  • internal/leadership/manager_test.go
  • internal/leadership/manager.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (2)
internal/leadership/mutex.go (2)

1-7: LGTM!

Package name and imports are appropriate for the module's purpose.


8-11: Add documentation for the Mutex type and verify DBHandle interface.

The type needs documentation explaining its purpose and thread-safety guarantees. Also, let's verify the DBHandle interface definition.

+// Mutex provides synchronized access to database operations.
+// It ensures that only one goroutine can execute database operations at a time.
 type Mutex struct {
 	*sync.Mutex
 	db DBHandle
 }

Let's verify the DBHandle interface:

✅ Verification successful

Add documentation for the Mutex type

The DBHandle interface is well-defined, embedding bun.IDB and adding a Close() method. Let's focus on documenting the Mutex type:

+// Mutex provides synchronized access to database operations through a DBHandle.
+// It embeds sync.Mutex to ensure thread-safe access to the underlying database connection.
 type Mutex struct {
 	*sync.Mutex
 	db DBHandle
 }
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for DBHandle interface definition
ast-grep --pattern 'type DBHandle interface {
  $$$
}'

Length of output: 240


Script:

#!/bin/bash
# Search for bun imports
rg "^import.*bun" -A 5

# Search for any usage of bun.IDB
ast-grep --pattern 'bun.IDB'

Length of output: 42564

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (5)
internal/leadership/database_handle.go (2)

20-25: Add parameter validation and documentation to the constructor.

The constructor lacks input validation and documentation. Also, if the mutex becomes a regular field (as suggested earlier), the constructor needs to be updated.

+// NewDatabaseHandle creates a new DatabaseHandle with the provided database connection.
+// It returns an error if the database handle is nil.
-func NewDatabaseHandle(db DBHandle) *DatabaseHandle {
+func NewDatabaseHandle(db DBHandle) (*DatabaseHandle, error) {
+    if db == nil {
+        return nil, errors.New("database handle cannot be nil")
+    }
     return &DatabaseHandle{
-        Mutex: &sync.Mutex{},
+        mu: &sync.Mutex{},
         db:    db,
-    }
+    }, nil
 }

1-25: Consider distributed systems requirements.

Since this package is part of a distributed leadership implementation, consider adding:

  1. Timeouts for database operations
  2. Connection health checks
  3. Deadlock prevention mechanisms
  4. Metrics for monitoring lock acquisition and hold times
  5. Recovery mechanisms for connection failures

These additions would make the implementation more robust for distributed systems use cases. Would you like me to provide specific suggestions for any of these improvements?
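
For the timeout point, one lightweight option is to bound each callback with context.WithTimeout before handing it to the handle. The sketch assumes the context-aware Exec(ctx, fn) signature proposed in this review; the callback must pass ctx to its queries for the deadline to take effect.

package leadership

import (
	"context"
	"time"

	"github.com/uptrace/bun"
)

// execWithTimeout runs fn through the handle's Exec with an upper bound
// on how long the database operation may take.
func execWithTimeout(ctx context.Context, h *DatabaseHandle, timeout time.Duration, fn func(db bun.IDB) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	return h.Exec(ctx, fn)
}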

internal/leadership/broadcaster.go (2)

7-9: Consider using buffered channels for better performance.

The listener struct uses an unbuffered channel which could cause blocking during broadcasts. While the Subscribe method creates buffered channels, the struct definition should enforce this requirement.

 type listener[T any] struct {
-	channel chan T
+	channel chan T // Must be buffered with size 1
 }

71-78: Enhance cleanup in Close method.

Consider clearing internal state and preventing further broadcasts after closing.

 func (h *Broadcaster[T]) Close() {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	for _, inner := range h.inner {
 		close(inner.channel)
 	}
+	h.inner = nil
+	h.t = nil
 }
internal/leadership/broadcaster_test.go (1)

28-29: Use configurable timeouts in tests.

Hardcoded timeouts can make tests flaky on slower systems or under heavy load. Consider making the timeout configurable or using testing.Short() to adjust timeouts in different environments.

+	testTimeout := 5 * time.Second
+	if testing.Short() {
+		testTimeout = time.Second
+	}
 	select {
 	case <-subscription:
-	case <-time.After(time.Second):
+	case <-time.After(testTimeout):
 		t.Fatal("timeout waiting for broadcast")
 	}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 935699b and 294efc4.

📒 Files selected for processing (6)
  • internal/leadership/broadcaster.go (1 hunks)
  • internal/leadership/broadcaster_test.go (1 hunks)
  • internal/leadership/database_handle.go (1 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/manager.go (1 hunks)
  • internal/leadership/manager_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/leadership/leadership.go
  • internal/leadership/manager_test.go
  • internal/leadership/manager.go
🔇 Additional comments (3)
internal/leadership/broadcaster.go (3)

11-17: Remove unused outer channel.

The outer channel is declared but never used in the implementation. This was previously flagged in past reviews but hasn't been addressed.

 type Broadcaster[T any] struct {
 	mu sync.Mutex
 	t  *T
 
 	inner []listener[T]
-	outer chan T
 }

19-28: LGTM! Thread-safe implementation of current value retrieval.

The method correctly handles nil cases and ensures thread safety.


80-84: Fix constructor initialization.

The constructor still initializes the unused outer channel. This was previously flagged but hasn't been addressed.

 func NewBroadcaster[T any]() *Broadcaster[T] {
-	return &Broadcaster[T]{
-		outer: make(chan T),
-	}
+	return &Broadcaster[T]{}
 }

Comment on lines +8 to +11
type DatabaseHandle struct {
*sync.Mutex
db DBHandle
}

⚠️ Potential issue

Define or import the DBHandle type.

The DBHandle type is used in the struct but is not defined or imported. This could lead to compilation errors.

Please either:

  1. Define the DBHandle type interface in this package, or
  2. Import it from the appropriate package
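
For reference, the verification earlier in this thread found the interface embedding bun.IDB with an added Close method. A minimal sketch of such a definition (the error return type of Close is an assumption):

package leadership

import "github.com/uptrace/bun"

// DBHandle is the locked database handle used by the leadership package.
type DBHandle interface {
	bun.IDB
	Close() error
}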

🛠️ Refactor suggestion

Avoid embedding sync.Mutex to prevent exposing Lock/Unlock methods.

Embedding *sync.Mutex exposes the mutex methods directly, which breaks encapsulation and could lead to incorrect usage. Consider making it a regular field instead.

 type DatabaseHandle struct {
-    *sync.Mutex
+    mu *sync.Mutex
     db DBHandle
 }

Comment on lines +13 to +18
func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
m.Mutex.Lock()
defer m.Mutex.Unlock()

fn(m.db)
}

🛠️ Refactor suggestion

Add context and error handling support to the Exec method.

The current implementation has several limitations:

  1. No context support for cancellation
  2. Panics in the callback function are not handled
  3. No way to return errors from the callback

Consider this improved implementation:

-func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
+func (m *DatabaseHandle) Exec(ctx context.Context, fn func(db bun.IDB) error) error {
+    // Honor cancellation before taking the lock.
+    if err := ctx.Err(); err != nil {
+        return err
+    }
     m.Mutex.Lock()
     defer m.Mutex.Unlock()
 
-    fn(m.db)
+    if err := fn(m.db); err != nil {
+        return fmt.Errorf("database operation failed: %w", err)
+    }
+    return nil
 }

Comment on lines +60 to +69
func (h *Broadcaster[T]) Broadcast(t T) {
h.mu.Lock()
defer h.mu.Unlock()

h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}
}

🛠️ Refactor suggestion

Prevent blocking on slow consumers.

The current implementation could block if any subscriber is slow to consume messages. Consider using non-blocking sends to prevent this.

 func (h *Broadcaster[T]) Broadcast(t T) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	h.t = &t
 
 	for _, inner := range h.inner {
-		inner.channel <- t
+		select {
+		case inner.channel <- t:
+		default:
+			// Skip slow consumers
+		}
 	}
 }

Comment on lines +30 to +58
func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan T, 1)
l := listener[T]{
channel: newChannel,
}
h.inner = append(h.inner, l)
if h.t != nil {
newChannel <- *h.t
}

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

for index, listener := range h.inner {
if listener == l {
if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
break
}
}
}
}

🛠️ Refactor suggestion

Optimize subscription cleanup.

The unsubscribe function has several areas for improvement:

  1. Linear search could be slow with many subscribers
  2. No protection against double unsubscribe
  3. Slice modification could be simplified

Consider this improved implementation:

 func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	newChannel := make(chan T, 1)
 	l := listener[T]{
 		channel: newChannel,
 	}
 	h.inner = append(h.inner, l)
 	if h.t != nil {
 		newChannel <- *h.t
 	}
 
+	var once sync.Once
 	return newChannel, func() {
+		once.Do(func() {
 			h.mu.Lock()
 			defer h.mu.Unlock()
 
 			for index, listener := range h.inner {
 				if listener == l {
-					if index < len(h.inner)-1 {
-						h.inner = append(h.inner[:index], h.inner[index+1:]...)
-					} else {
-						h.inner = h.inner[:index]
-					}
+					// Use copy for better performance
+					copy(h.inner[index:], h.inner[index+1:])
+					h.inner = h.inner[:len(h.inner)-1]
 					break
 				}
 			}
+		})
 	}
 }

Comment on lines +8 to +73
func TestBroadcaster(t *testing.T) {
t.Parallel()

broadcaster := NewBroadcaster[struct{}]()
t.Cleanup(broadcaster.Close)

const nbSubscriptions = 5

subscriptions := make([]<-chan struct{}, nbSubscriptions)
releases := make([]func(), nbSubscriptions)

for i := 0; i < nbSubscriptions; i++ {
subscriptions[i], releases[i] = broadcaster.Subscribe()
}

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = append(subscriptions[:2], subscriptions[3:]...)
releases = append(releases[:2], releases[3:]...)

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[0]()
subscriptions = subscriptions[1:]
releases = releases[1:]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}

releases[2]()
subscriptions = subscriptions[:2]

go broadcaster.Broadcast(struct{}{})

for _, subscription := range subscriptions {
select {
case <-subscription:
case <-time.After(time.Second):
t.Fatal("timeout waiting for broadcast")
}
}
}

🛠️ Refactor suggestion

Enhance test coverage.

While the current test covers basic functionality, consider adding tests for:

  1. Concurrent access to ensure thread safety
  2. Slow consumers to verify non-blocking behavior
  3. Close method to ensure proper cleanup
  4. Actual method to verify current value retrieval

Here's an example of additional test cases:

func TestBroadcasterConcurrent(t *testing.T) {
	t.Parallel()
	
	broadcaster := NewBroadcaster[int]()
	t.Cleanup(broadcaster.Close)
	
	const nbGoroutines = 10
	done := make(chan struct{})
	
	// Start multiple goroutines that subscribe and unsubscribe
	for i := 0; i < nbGoroutines; i++ {
		go func() {
			defer func() { done <- struct{}{} }()
			ch, cancel := broadcaster.Subscribe()
			defer cancel()
			
			// Receive some messages
			for j := 0; j < 5; j++ {
				select {
				case <-ch:
				case <-time.After(time.Second):
					t.Error("timeout waiting for broadcast")
				}
			}
		}()
	}
	
	// Broadcast messages while goroutines are subscribing/unsubscribing
	for i := 0; i < 10; i++ {
		broadcaster.Broadcast(i)
	}
	
	// Wait for all goroutines to finish
	for i := 0; i < nbGoroutines; i++ {
		<-done
	}
}

func TestBroadcasterSlowConsumer(t *testing.T) {
	t.Parallel()
	
	broadcaster := NewBroadcaster[int]()
	t.Cleanup(broadcaster.Close)
	
	// Create a slow consumer
	ch, cancel := broadcaster.Subscribe()
	defer cancel()
	
	// Broadcast should not block even if consumer is slow
	for i := 0; i < 100; i++ {
		broadcaster.Broadcast(i)
	}
	
	// With non-blocking sends and a buffer of one, only the first
	// broadcast value is retained; later values are dropped for this
	// slow consumer.
	select {
	case v := <-ch:
		if v != 0 {
			t.Errorf("expected 0 (first buffered value), got %d", v)
		}
	case <-time.After(time.Second):
		t.Error("timeout waiting for broadcast")
	}
}


3 participants