Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leadership package #661

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
"github.com/formancehq/ledger/internal/leadership"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewServeCommand() *cobra.Command {
}),
bus.NewFxModule(),
ballast.Module(serveConfiguration.ballastSize),
leadership.NewFXModule(),
api.Module(api.Config{
Version: Version,
Debug: service.IsDebug(cmd),
Expand All @@ -122,15 +124,15 @@ func NewServeCommand() *cobra.Command {
}),
fx.Decorate(func(
params struct {
fx.In
fx.In

Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger
Handler chi.Router
HealthController *health.HealthController
Logger logging.Logger

MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
MeterProvider *metric.MeterProvider `optional:"true"`
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
},
) chi.Router {
return assembleFinalRouter(
service.IsDebug(cmd),
Expand Down
3 changes: 0 additions & 3 deletions internal/api/bulking/mocks_ledger_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/common/mocks_ledger_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/common/mocks_system_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/v1/mocks_ledger_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/v1/mocks_system_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/v2/mocks_ledger_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/api/v2/mocks_system_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/controller/ledger/controller_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/controller/ledger/listener_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions internal/controller/ledger/numscript_parser_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions internal/controller/ledger/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//go:generate gomarkdoc -o README.md --repository.default-branch main
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger
package ledger
86 changes: 86 additions & 0 deletions internal/leadership/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package leadership

import (
"sync"
)

type listener struct {
channel chan Leadership
}

type Broadcaster struct {
mu *sync.Mutex
t *Leadership

inner []listener
outer chan Leadership
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use mutex by value and remove unused channel.

The Broadcaster struct has two issues:

  1. Using a pointer to mutex is unnecessary and can lead to nil pointer dereference if not properly initialized.
  2. The outer channel is declared but never used in the implementation.

Apply this diff to fix these issues:

 type Broadcaster struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *Leadership
 
 	inner []listener
-	outer chan Leadership
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type Broadcaster struct {
mu *sync.Mutex
t *Leadership
inner []listener
outer chan Leadership
}
type Broadcaster struct {
mu sync.Mutex
t *Leadership
inner []listener
}


func (h *Broadcaster) Actual() Leadership {
h.mu.Lock()
defer h.mu.Unlock()

if h.t == nil {
return Leadership{}
}
return *h.t
}

func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan Leadership, 1)
index := len(h.inner)
h.inner = append(h.inner, listener{
channel: newChannel,
})
if h.t != nil {
newChannel <- *h.t
paul-nicolas marked this conversation as resolved.
Show resolved Hide resolved
}

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
} else {
h.inner = h.inner[:index]
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (h *Broadcaster) Broadcast(t Leadership) {
h.mu.Lock()
defer h.mu.Unlock()

h.t = &t

for _, inner := range h.inner {
inner.channel <- t
}
}

func (h *Broadcaster) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}
}

func (h *Broadcaster) CountListeners() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.inner)
}

func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
mu: &sync.Mutex{},
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve constructor naming and initialization.

The NewSignal constructor has several issues:

  1. The name doesn't clearly convey its purpose.
  2. It initializes an unused channel.
  3. It creates a pointer to mutex unnecessarily.

Apply this diff to improve the constructor:

-func NewSignal() *Broadcaster {
+func NewBroadcaster() *Broadcaster {
 	return &Broadcaster{
-		outer: make(chan Leadership),
-		mu:    &sync.Mutex{},
+		mu:    sync.Mutex{},
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewSignal() *Broadcaster {
return &Broadcaster{
outer: make(chan Leadership),
mu: &sync.Mutex{},
}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
mu: sync.Mutex{},
}
}

Loading
Loading