Skip to content

Commit

Permalink
Refactor to make publish by device
Browse files Browse the repository at this point in the history
  • Loading branch information
elct9620 authored and tjjh89017 committed Sep 11, 2024
1 parent bdaaec1 commit 1c72a57
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 36 deletions.
2 changes: 1 addition & 1 deletion internal/ctrl/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (ctrl *BootstrapController) Execute(ctx context.Context) {

deviceEntity := entity.NewDevice(
entity.DeviceId(device.Name),
device.ListenPort,
device.PrivateKey[:],
)

Expand All @@ -43,7 +44,6 @@ func (ctrl *BootstrapController) Execute(ctx context.Context) {
peer := entity.NewPeer(
entity.NewPeerId(device.PublicKey[:], p.PublicKey[:]),
device.Name,
device.ListenPort,
p.PublicKey,
)

Expand Down
53 changes: 29 additions & 24 deletions internal/ctrl/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"log"

"github.com/tjjh89017/stunmesh-go/internal/entity"
"github.com/tjjh89017/stunmesh-go/plugin"
)

Expand All @@ -26,36 +25,42 @@ func NewPublishController(devices DeviceRepository, peers PeerRepository, store
}
}

func (c *PublishController) Execute(ctx context.Context, peerId entity.PeerId) {
peer, err := c.peers.Find(ctx, peerId)
func (c *PublishController) Execute(ctx context.Context) {
devices, err := c.devices.List(ctx)
if err != nil {
log.Print(err)
return
}

device, err := c.devices.Find(ctx, entity.DeviceId(peer.DeviceName()))
if err != nil {
log.Print(err)
return
}
for _, device := range devices {
host, port, err := c.resolver.Resolve(ctx, uint16(device.ListenPort()))
if err != nil {
log.Panic(err)
}

host, port, err := c.resolver.Resolve(ctx, uint16(peer.ListenPort()))
if err != nil {
log.Panic(err)
}
peers, err := c.peers.ListByDevice(ctx, device.Name())
if err != nil {
log.Print(err)
continue
}

res, err := c.encryptor.Encrypt(ctx, &EndpointEncryptRequest{
PeerPublicKey: peer.PublicKey(),
PrivateKey: device.PrivateKey(),
Host: host,
Port: port,
})
if err != nil {
log.Panic(err)
}
for _, peer := range peers {
res, err := c.encryptor.Encrypt(ctx, &EndpointEncryptRequest{
PeerPublicKey: peer.PublicKey(),
PrivateKey: device.PrivateKey(),
Host: host,
Port: port,
})
if err != nil {
log.Print(err)
continue
}

err = c.store.Set(ctx, peer.LocalId(), res.Data)
if err != nil {
log.Panic(err)
err = c.store.Set(ctx, peer.LocalId(), res.Data)
if err != nil {
log.Print(err)
continue
}
}
}
}
2 changes: 2 additions & 0 deletions internal/ctrl/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
)

type DeviceRepository interface {
List(ctx context.Context) ([]*entity.Device, error)
Find(ctx context.Context, name entity.DeviceId) (*entity.Device, error)
Save(ctx context.Context, device *entity.Device)
}

type PeerRepository interface {
List(ctx context.Context) ([]*entity.Peer, error)
ListByDevice(ctx context.Context, deviceName entity.DeviceId) ([]*entity.Peer, error)
Find(ctx context.Context, id entity.PeerId) (*entity.Peer, error)
Save(ctx context.Context, peer *entity.Peer)
}
2 changes: 1 addition & 1 deletion internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func (d *Daemon) Run(ctx context.Context) {
case peerId := <-d.queue.Dequeue():
log.Printf("Processing peer %s", peerId)

go d.publishCtrl.Execute(daemonCtx, peerId)
go d.establishCtrl.Execute(daemonCtx, peerId)
case <-ticker.C:
log.Println("Refreshing peers")

go d.publishCtrl.Execute(daemonCtx)
go d.refreshCtrl.Execute(daemonCtx)
}
}
Expand Down
8 changes: 7 additions & 1 deletion internal/entity/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ type DeviceId string

type Device struct {
name DeviceId
listenPort int
privateKey []byte
}

func NewDevice(name DeviceId, privateKey []byte) *Device {
func NewDevice(name DeviceId, listenPort int, privateKey []byte) *Device {
return &Device{
name: name,
listenPort: listenPort,
privateKey: privateKey,
}
}
Expand All @@ -29,3 +31,7 @@ func (d *Device) PrivateKey() [32]byte {
copy(key[:], d.privateKey)
return key
}

func (d *Device) ListenPort() int {
return d.listenPort
}
8 changes: 1 addition & 7 deletions internal/entity/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ type Peer struct {
id PeerId
deviceName string
publicKey [32]byte
listenPort int
}

func NewPeer(id PeerId, deviceName string, listenPort int, publicKey [32]byte) *Peer {
func NewPeer(id PeerId, deviceName string, publicKey [32]byte) *Peer {
return &Peer{
id: id,
deviceName: deviceName,
listenPort: listenPort,
publicKey: publicKey,
}
}
Expand All @@ -41,7 +39,3 @@ func (p *Peer) DeviceName() string {
func (p *Peer) PublicKey() [32]byte {
return p.publicKey
}

func (p *Peer) ListenPort() int {
return p.listenPort
}
12 changes: 12 additions & 0 deletions internal/repo/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ func NewDevices() *Devices {
}
}

func (r *Devices) List(ctx context.Context) ([]*entity.Device, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()

devices := make([]*entity.Device, 0, len(r.items))
for _, device := range r.items {
devices = append(devices, device)
}

return devices, nil
}

func (r *Devices) Find(ctx context.Context, name entity.DeviceId) (*entity.Device, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
Expand Down
51 changes: 50 additions & 1 deletion internal/repo/devices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func Test_DeviceFind(t *testing.T) {
deviceName := entity.DeviceId("wg0")
device := entity.NewDevice(deviceName, []byte{})
device := entity.NewDevice(deviceName, 6379, []byte{})

devices := repo.NewDevices()
devices.Save(context.TODO(), device)
Expand Down Expand Up @@ -44,3 +44,52 @@ func Test_DeviceFind(t *testing.T) {
})
}
}

func Test_DeviceList(t *testing.T) {
tests := []struct {
name string
devices []*entity.Device
}{
{
name: "no devices",
devices: []*entity.Device{},
},
{
name: "single device",
devices: []*entity.Device{
entity.NewDevice(entity.DeviceId("wg0"), 6379, []byte{}),
},
},
{
name: "multiple devices",
devices: []*entity.Device{
entity.NewDevice(entity.DeviceId("wg0"), 6379, []byte{}),
entity.NewDevice(entity.DeviceId("wg1"), 6380, []byte{}),
entity.NewDevice(entity.DeviceId("wg2"), 6381, []byte{}),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

devices := repo.NewDevices()
for _, device := range tt.devices {
devices.Save(context.TODO(), device)
}

entities, err := devices.List(context.TODO())
if err != nil {
t.Errorf("DeviceRepository.List() error = %v", err)
return
}

expectedSize := len(tt.devices)
actualSize := len(entities)
if actualSize != expectedSize {
t.Errorf("DeviceRepository.List() size = %v, want %v", actualSize, expectedSize)
}
})
}
}
14 changes: 14 additions & 0 deletions internal/repo/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ func (r *Peers) List(ctx context.Context) ([]*entity.Peer, error) {
return peers, nil
}

func (r *Peers) ListByDevice(ctx context.Context, deviceName entity.DeviceId) ([]*entity.Peer, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()

peers := make([]*entity.Peer, 0)
for _, peer := range r.entities {
if peer.DeviceName() == string(deviceName) {
peers = append(peers, peer)
}
}

return peers, nil
}

func (r *Peers) Find(ctx context.Context, id entity.PeerId) (*entity.Peer, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
Expand Down
Loading

0 comments on commit 1c72a57

Please sign in to comment.