diff --git a/controllers/network.go b/controllers/network.go index 0898824a0..ec667077e 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -601,7 +601,6 @@ func createNetwork(w http.ResponseWriter, r *http.Request) { if !featureFlags.EnableDeviceApproval { network.AutoJoin = "true" } - if len(network.NetID) > 32 { err := errors.New("network name shouldn't exceed 32 characters") logger.Log(0, r.Header.Get("user"), "failed to create network: ", @@ -656,7 +655,14 @@ func createNetwork(w http.ResponseWriter, r *http.Request) { } } } - + if network.AutoRemove == "true" { + if network.AutoRemoveThreshold == 0 { + network.AutoRemoveThreshold = 60 + } + } + if network.AutoRemoveTags == nil { + network.AutoRemoveTags = []string{} + } network, err = logic.CreateNetwork(network) if err != nil { logger.Log(0, r.Header.Get("user"), "failed to create network: ", diff --git a/controllers/network_test.go b/controllers/network_test.go index 258f4883f..1183dd46d 100644 --- a/controllers/network_test.go +++ b/controllers/network_test.go @@ -2,11 +2,12 @@ package controller import ( "context" - "github.com/gravitl/netmaker/db" - "github.com/gravitl/netmaker/schema" "os" "testing" + "github.com/gravitl/netmaker/db" + "github.com/gravitl/netmaker/schema" + "github.com/google/uuid" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" @@ -36,7 +37,7 @@ func TestMain(m *testing.M) { PlatformRoleID: models.SuperAdminRole, }) peerUpdate := make(chan *models.Node) - go logic.ManageZombies(context.Background(), peerUpdate) + go logic.ManageZombies(context.Background()) go func() { for update := range peerUpdate { //do nothing diff --git a/controllers/server.go b/controllers/server.go index 17e452901..12a133cc6 100644 --- a/controllers/server.go +++ b/controllers/server.go @@ -301,6 +301,9 @@ func reInit(curr, new models.ServerSettings, force bool) { } } } + if new.CleanUpInterval != curr.CleanUpInterval { + logic.RestartHook("network-hook", time.Duration(new.CleanUpInterval)*time.Minute) + } go mq.PublishPeerUpdate(false) } diff --git a/functions/helpers_test.go b/functions/helpers_test.go index 7c5dd1aab..fdf472f6b 100644 --- a/functions/helpers_test.go +++ b/functions/helpers_test.go @@ -3,11 +3,12 @@ package functions import ( "context" "encoding/json" - "github.com/gravitl/netmaker/db" - "github.com/gravitl/netmaker/schema" "os" "testing" + "github.com/gravitl/netmaker/db" + "github.com/gravitl/netmaker/schema" + "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" @@ -36,7 +37,7 @@ func TestMain(m *testing.M) { PlatformRoleID: models.SuperAdminRole, }) peerUpdate := make(chan *models.Node) - go logic.ManageZombies(context.Background(), peerUpdate) + go logic.ManageZombies(context.Background()) go func() { for update := range peerUpdate { //do nothing @@ -44,7 +45,6 @@ func TestMain(m *testing.M) { } }() os.Exit(m.Run()) - } func TestNetworkExists(t *testing.T) { diff --git a/logic/host_test.go b/logic/host_test.go index 46401a459..80070bb94 100644 --- a/logic/host_test.go +++ b/logic/host_test.go @@ -3,12 +3,13 @@ package logic import ( "context" "fmt" - "github.com/gravitl/netmaker/db" - "github.com/gravitl/netmaker/schema" "net" "os" "testing" + "github.com/gravitl/netmaker/db" + "github.com/gravitl/netmaker/schema" + "github.com/google/uuid" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/models" @@ -22,7 +23,7 @@ func TestMain(m *testing.M) { database.InitializeDatabase() defer database.CloseDB() peerUpdate := make(chan *models.Node) - go ManageZombies(context.Background(), peerUpdate) + go ManageZombies(context.Background()) go func() { for y := range peerUpdate { fmt.Printf("Pointless %v\n", y) diff --git a/logic/networks.go b/logic/networks.go index bd9c1c658..dcdab45fe 100644 --- a/logic/networks.go +++ b/logic/networks.go @@ -659,7 +659,9 @@ func UpdateNetwork(currentNetwork *models.Network, newNetwork *models.Network) e } else { currentNetwork.AutoJoin = "true" } - + currentNetwork.AutoRemove = newNetwork.AutoRemove + currentNetwork.AutoRemoveThreshold = newNetwork.AutoRemoveThreshold + currentNetwork.AutoRemoveTags = newNetwork.AutoRemoveTags currentNetwork.DefaultACL = newNetwork.DefaultACL currentNetwork.NameServers = newNetwork.NameServers data, err := json.Marshal(currentNetwork) @@ -778,6 +780,63 @@ func SortNetworks(unsortedNetworks []models.Network) { }) } +var NetworkHook models.HookFunc = func(params ...interface{}) error { + networks, err := GetNetworks() + if err != nil { + return err + } + allNodes, err := GetAllNodes() + if err != nil { + return err + } + for _, network := range networks { + if network.AutoRemove == "false" || network.AutoRemoveThreshold == 0 { + continue + } + nodes := GetNetworkNodesMemory(allNodes, network.NetID) + for _, node := range nodes { + if !node.Connected { + continue + } + exists := false + for _, tagI := range network.AutoRemoveTags { + if tagI == "*" { + exists = true + break + } + if _, ok := node.Tags[models.TagID(tagI)]; ok { + exists = true + break + } + } + if !exists { + continue + } + if time.Since(node.LastCheckIn) > time.Duration(network.AutoRemoveThreshold)*time.Minute { + if err := DeleteNode(&node, true); err != nil { + continue + } + node.PendingDelete = true + node.Action = models.NODE_DELETE + DeleteNodesCh <- &node + host, err := GetHost(node.HostID.String()) + if err == nil && len(host.Nodes) == 0 { + RemoveHostByID(host.ID.String()) + } + } + } + } + return nil +} + +func InitNetworkHooks() { + HookManagerCh <- models.HookDetails{ + ID: "network-hook", + Hook: NetworkHook, + Interval: time.Duration(GetServerSettings().CleanUpInterval) * time.Minute, + } +} + // == Private == var addressLock = &sync.Mutex{} diff --git a/logic/nodes.go b/logic/nodes.go index eab42d1dd..b7af9c75f 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -32,6 +32,7 @@ var ( nodeNetworkCacheMutex = &sync.RWMutex{} nodesCacheMap = make(map[string]models.Node) nodesNetworkCacheMap = make(map[string]map[string]models.Node) + DeleteNodesCh = make(chan *models.Node, 100) ) func getNodeFromCache(nodeID string) (node models.Node, ok bool) { @@ -654,7 +655,7 @@ func GetNodesStatusAPI(nodes []models.Node) map[string]models.ApiNodeStatus { } // DeleteExpiredNodes - goroutine which deletes nodes which are expired -func DeleteExpiredNodes(ctx context.Context, peerUpdate chan *models.Node) { +func DeleteExpiredNodes(ctx context.Context) { // Delete Expired Nodes Every Hour ticker := time.NewTicker(time.Hour) for { @@ -671,7 +672,7 @@ func DeleteExpiredNodes(ctx context.Context, peerUpdate chan *models.Node) { for _, node := range allnodes { node := node if time.Now().After(node.ExpirationDateTime) { - peerUpdate <- &node + DeleteNodesCh <- &node slog.Info("deleting expired node", "nodeid", node.ID.String()) } } diff --git a/logic/timer.go b/logic/timer.go index db36f5792..ed0a7a3a8 100644 --- a/logic/timer.go +++ b/logic/timer.go @@ -9,6 +9,7 @@ import ( "github.com/gravitl/netmaker/logger" "golang.org/x/exp/slog" + "github.com/google/uuid" "github.com/gravitl/netmaker/models" ) @@ -20,6 +21,22 @@ const timer_hours_between_runs = 24 // HookManagerCh - channel to add any new hooks var HookManagerCh = make(chan models.HookDetails, 3) +// HookCommandCh - channel to send commands to hooks (reset/stop) +var HookCommandCh = make(chan models.HookCommand, 10) + +// hookInfo - tracks running hooks +type hookInfo struct { + cancelFunc context.CancelFunc + resetCh chan struct{} + interval time.Duration + hook models.HookFunc + params []interface{} +} + +// runningHooks - map of hook ID to hook info +var runningHooks = make(map[string]*hookInfo) +var hooksMutex sync.RWMutex + // == Public == // TimerCheckpoint - Checks if 24 hours has passed since telemetry was last sent. If so, sends telemetry data to posthog @@ -48,36 +65,188 @@ func AddHook(ifaceToAdd interface{}) { timeHooks = append(timeHooks, ifaceToAdd) } -// StartHookManager - listens on `HookManagerCh` to run any hook +// ResetHook - resets the timer for a hook with the given ID +func ResetHook(hookID string) { + HookCommandCh <- models.HookCommand{ + ID: hookID, + Command: models.HookCommandReset, + } +} + +// StopHook - stops a hook with the given ID +func StopHook(hookID string) { + HookCommandCh <- models.HookCommand{ + ID: hookID, + Command: models.HookCommandStop, + } +} + +// RestartHook - restarts a hook with the given ID (stops and starts again with same configuration) +// If newInterval is 0, uses the existing interval. Otherwise, uses the new interval. +func RestartHook(hookID string, newInterval time.Duration) { + HookCommandCh <- models.HookCommand{ + ID: hookID, + Command: models.HookCommandRestart, + Interval: newInterval, + } +} + +// GetRunningHooks - returns a list of currently running hook IDs +func GetRunningHooks() []string { + hooksMutex.RLock() + defer hooksMutex.RUnlock() + + ids := make([]string, 0, len(runningHooks)) + for id := range runningHooks { + ids = append(ids, id) + } + return ids +} + +// StartHookManager - listens on `HookManagerCh` to run any hook and `HookCommandCh` for commands func StartHookManager(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-ctx.Done(): - slog.Error("## Stopping Hook Manager") + slog.Info("## Stopping Hook Manager") + // Cancel all running hooks + hooksMutex.Lock() + for _, info := range runningHooks { + info.cancelFunc() + } + runningHooks = make(map[string]*hookInfo) + hooksMutex.Unlock() return case newhook := <-HookManagerCh: + hookID := newhook.ID + if hookID == "" { + hookID = uuid.New().String() + } + + // Check if hook with this ID already exists + hooksMutex.Lock() + if existingHook, exists := runningHooks[hookID]; exists { + // Stop existing hook before replacing + existingHook.cancelFunc() + delete(runningHooks, hookID) + } + + // Create context for this hook + hookCtx, cancelFunc := context.WithCancel(ctx) + resetCh := make(chan struct{}, 1) + + info := &hookInfo{ + cancelFunc: cancelFunc, + resetCh: resetCh, + interval: newhook.Interval, + hook: newhook.Hook, + params: newhook.Params, + } + runningHooks[hookID] = info + hooksMutex.Unlock() + wg.Add(1) - go addHookWithInterval(ctx, wg, newhook.Hook, newhook.Interval) + go addHookWithInterval(hookCtx, wg, hookID, newhook.Hook, newhook.Params, newhook.Interval, resetCh) + case cmd := <-HookCommandCh: + hooksMutex.Lock() + info, exists := runningHooks[cmd.ID] + hooksMutex.Unlock() + + if !exists { + slog.Warn("hook not found", "hook_id", cmd.ID) + continue + } + + switch cmd.Command { + case models.HookCommandReset: + // Send reset signal + select { + case info.resetCh <- struct{}{}: + slog.Info("reset signal sent to hook", "hook_id", cmd.ID) + default: + // Channel is full, skip + } + case models.HookCommandStop: + // Stop the hook + info.cancelFunc() + hooksMutex.Lock() + delete(runningHooks, cmd.ID) + hooksMutex.Unlock() + slog.Info("hook stopped", "hook_id", cmd.ID) + case models.HookCommandRestart: + // Restart the hook: stop and start again with same configuration + hookID := cmd.ID + hook := info.hook + params := info.params + interval := info.interval + + // Use new interval if provided, otherwise keep existing + if cmd.Interval > 0 { + interval = cmd.Interval + } + + // Stop the existing hook + info.cancelFunc() + hooksMutex.Lock() + delete(runningHooks, hookID) + + // Create new context and restart + hookCtx, cancelFunc := context.WithCancel(ctx) + resetCh := make(chan struct{}, 1) + + newInfo := &hookInfo{ + cancelFunc: cancelFunc, + resetCh: resetCh, + interval: interval, + hook: hook, + params: params, + } + runningHooks[hookID] = newInfo + hooksMutex.Unlock() + + wg.Add(1) + go addHookWithInterval(hookCtx, wg, hookID, hook, params, interval, resetCh) + slog.Info("hook restarted", "hook_id", hookID, "interval", interval) + } } } } -func addHookWithInterval(ctx context.Context, wg *sync.WaitGroup, hook func() error, interval time.Duration) { +func addHookWithInterval(ctx context.Context, wg *sync.WaitGroup, hookID string, hook models.HookFunc, params []interface{}, interval time.Duration, resetCh chan struct{}) { defer wg.Done() + defer func() { + hooksMutex.Lock() + delete(runningHooks, hookID) + hooksMutex.Unlock() + }() + ticker := time.NewTicker(interval) defer ticker.Stop() + for { select { case <-ctx.Done(): return + case <-resetCh: + // Reset the timer by stopping the old ticker and creating a new one + ticker.Stop() + ticker = time.NewTicker(interval) + slog.Info("hook timer reset", "hook_id", hookID) case <-ticker.C: - if err := hook(); err != nil { - slog.Error(err.Error()) + if err := hook(params...); err != nil { + slog.Error("error running hook", "hook_id", hookID, "error", err.Error()) } } } +} +// WrapHook - wraps a parameterless hook function to be compatible with HookFunc +// This allows backward compatibility with existing hooks that don't accept parameters +func WrapHook(hook func() error) models.HookFunc { + return func(...interface{}) error { + return hook() + } } // == private == diff --git a/logic/zombie.go b/logic/zombie.go index 862e732a4..4678e7556 100644 --- a/logic/zombie.go +++ b/logic/zombie.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/servercfg" ) const ( @@ -75,10 +74,10 @@ func checkForZombieHosts(h *models.Host) { } // ManageZombies - goroutine which adds/removes/deletes nodes from the zombie node quarantine list -func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { +func ManageZombies(ctx context.Context) { logger.Log(2, "Zombie management started") go InitializeZombies() - go checkPendingRemovalNodes(peerUpdate) + go checkPendingRemovalNodes() // Zombie Nodes Cleanup Four Times a Day ticker := time.NewTicker(time.Hour * ZOMBIE_TIMEOUT) @@ -86,7 +85,7 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { select { case <-ctx.Done(): ticker.Stop() - close(peerUpdate) + close(DeleteNodesCh) return case id := <-newZombie: zombies = append(zombies, id) @@ -110,7 +109,7 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { } node.PendingDelete = true node.Action = models.NODE_DELETE - peerUpdate <- &node + DeleteNodesCh <- &node logger.Log(1, "deleting zombie node", node.ID.String()) zombies = append(zombies[:i], zombies[i+1:]...) } @@ -136,57 +135,20 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { } } } - if servercfg.IsAutoCleanUpEnabled() { - nodes, _ := GetAllNodes() - for _, node := range nodes { - if !node.Connected { - continue - } - if time.Since(node.LastCheckIn) > time.Hour*2 { - if err := DeleteNode(&node, true); err != nil { - continue - } - node.PendingDelete = true - node.Action = models.NODE_DELETE - peerUpdate <- &node - host, err := GetHost(node.HostID.String()) - if err == nil && len(host.Nodes) == 0 { - RemoveHostByID(host.ID.String()) - } - - } - } - } } } } -func checkPendingRemovalNodes(peerUpdate chan *models.Node) { +func checkPendingRemovalNodes() { nodes, _ := GetAllNodes() for _, node := range nodes { node := node pendingDelete := node.PendingDelete || node.Action == models.NODE_DELETE if pendingDelete { DeleteNode(&node, true) - peerUpdate <- &node + DeleteNodesCh <- &node continue } - if servercfg.IsAutoCleanUpEnabled() && node.Connected { - if time.Since(node.LastCheckIn) > time.Hour*2 { - if err := DeleteNode(&node, true); err != nil { - continue - } - node.PendingDelete = true - node.Action = models.NODE_DELETE - peerUpdate <- &node - host, err := GetHost(node.HostID.String()) - if err == nil && len(host.Nodes) == 0 { - RemoveHostByID(host.ID.String()) - } - - } - - } } } diff --git a/main.go b/main.go index 1deb1a8c0..11f68fcd8 100644 --- a/main.go +++ b/main.go @@ -135,7 +135,6 @@ func initialize() { // Client Mode Prereq Check if err != nil { logger.FatalLog("error setting defaults: ", err.Error()) } - if servercfg.IsDNSMode() { err := functions.SetDNSDir() if err != nil { @@ -179,6 +178,7 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) { wg.Add(1) go logic.StartHookManager(ctx, wg) + logic.InitNetworkHooks() } // Should we be using a context vice a waitgroup???????????? @@ -195,10 +195,9 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) { defer mq.CloseClient() go mq.Keepalive(ctx) go func() { - peerUpdate := make(chan *models.Node, 100) - go logic.ManageZombies(ctx, peerUpdate) - go logic.DeleteExpiredNodes(ctx, peerUpdate) - for nodeUpdate := range peerUpdate { + go logic.ManageZombies(ctx) + go logic.DeleteExpiredNodes(ctx) + for nodeUpdate := range logic.DeleteNodesCh { if nodeUpdate == nil { continue } diff --git a/migrate/migrate.go b/migrate/migrate.go index 0616b9753..f206fc8ed 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -84,6 +84,10 @@ func updateNetworks() { netI.AutoJoin = "true" logic.UpsertNetwork(netI) } + if netI.AutoRemove == "" { + netI.AutoRemove = "false" + logic.UpsertNetwork(netI) + } } } @@ -887,6 +891,9 @@ func migrateSettings() { if settings.PeerConnectionCheckInterval == "" { settings.PeerConnectionCheckInterval = "15" } + if settings.CleanUpInterval == 0 { + settings.CleanUpInterval = 60 + } if settings.AuditLogsRetentionPeriodInDays == 0 { settings.AuditLogsRetentionPeriodInDays = 7 } diff --git a/models/network.go b/models/network.go index a96d9587d..4d74467e0 100644 --- a/models/network.go +++ b/models/network.go @@ -26,6 +26,9 @@ type Network struct { DefaultACL string `json:"defaultacl" bson:"defaultacl" yaml:"defaultacl" validate:"checkyesorno"` NameServers []string `json:"dns_nameservers"` AutoJoin string `json:"auto_join"` + AutoRemove string `json:"auto_remove"` + AutoRemoveTags []string `json:"auto_remove_tags"` + AutoRemoveThreshold int `json:"auto_remove_threshold_mins"` } // SaveData - sensitive fields of a network that should be kept the same diff --git a/models/settings.go b/models/settings.go index cacc76a42..d24404956 100644 --- a/models/settings.go +++ b/models/settings.go @@ -50,6 +50,7 @@ type ServerSettings struct { AuditLogsRetentionPeriodInDays int `json:"audit_logs_retention_period"` OldAClsSupport bool `json:"old_acl_support"` PeerConnectionCheckInterval string `json:"peer_connection_check_interval"` + CleanUpInterval int `json:"clean_up_interval_in_mins"` } type UserSettings struct { diff --git a/models/structs.go b/models/structs.go index c1fcbbca4..2f761a987 100644 --- a/models/structs.go +++ b/models/structs.go @@ -343,12 +343,33 @@ type JoinData struct { Key string `json:"key" yaml:"key"` } +// HookFunc - function type for hooks that can accept optional parameters +type HookFunc func(...interface{}) error + // HookDetails - struct to hold hook info type HookDetails struct { - Hook func() error + ID string // Unique identifier for the hook (optional, auto-generated if empty) + Hook HookFunc // Hook function that accepts optional variadic parameters + Params []interface{} // Optional parameters to pass to the hook function Interval time.Duration } +// HookCommandType - type of command for hook management +type HookCommandType int + +const ( + HookCommandReset HookCommandType = iota + HookCommandStop + HookCommandRestart +) + +// HookCommand - command to control a hook +type HookCommand struct { + ID string // Hook ID to target + Command HookCommandType + Interval time.Duration // Optional: new interval for restart command (0 means use existing) +} + // LicenseLimits - struct license limits type LicenseLimits struct { Servers int `json:"servers"` diff --git a/pro/license.go b/pro/license.go index 8677dff16..4f3af837a 100644 --- a/pro/license.go +++ b/pro/license.go @@ -38,7 +38,8 @@ type apiServerConf struct { // AddLicenseHooks - adds the validation and cache clear hooks func AddLicenseHooks() { logic.HookManagerCh <- models.HookDetails{ - Hook: ValidateLicense, + ID: "license-validation-hook", + Hook: logic.WrapHook(ValidateLicense), Interval: time.Hour, } // logic.HookManagerCh <- models.HookDetails{ diff --git a/pro/logic/events.go b/pro/logic/events.go index 374cdebac..5e987ae6a 100644 --- a/pro/logic/events.go +++ b/pro/logic/events.go @@ -36,7 +36,8 @@ func EventRententionHook() error { func EventWatcher() { logic.HookManagerCh <- models.HookDetails{ - Hook: EventRententionHook, + ID: "events-retention-hook", + Hook: logic.WrapHook(EventRententionHook), Interval: time.Hour * 24, } for e := range EventActivityCh { diff --git a/pro/remote_access_client.go b/pro/remote_access_client.go index 41efdb6af..0bcad4610 100644 --- a/pro/remote_access_client.go +++ b/pro/remote_access_client.go @@ -19,7 +19,8 @@ const unauthorisedUserNodeCheckInterval = 3 * time.Minute func AddUnauthorisedUserNodeHooks() { slog.Debug("adding unauthorisedUserNode hook") logic.HookManagerCh <- models.HookDetails{ - Hook: unauthorisedUserNodeHook, + ID: "unauthorised-users-hook", + Hook: logic.WrapHook(unauthorisedUserNodeHook), Interval: unauthorisedUserNodeCheckInterval, } } diff --git a/pro/trial.go b/pro/trial.go index 4c1b77ab9..5b711b993 100644 --- a/pro/trial.go +++ b/pro/trial.go @@ -26,7 +26,7 @@ type TrialInfo struct { func addTrialLicenseHook() { logic.HookManagerCh <- models.HookDetails{ - Hook: TrialLicenseHook, + Hook: logic.WrapHook(TrialLicenseHook), Interval: time.Hour, } } diff --git a/scripts/netmaker.default.env b/scripts/netmaker.default.env index 8fbe56428..29664c222 100644 --- a/scripts/netmaker.default.env +++ b/scripts/netmaker.default.env @@ -100,7 +100,6 @@ STUN=true METRICS_PORT=51821 # Metrics Collection interval in minutes PUBLISH_METRIC_INTERVAL=15 -# auto delete offline nodes -AUTO_DELETE_OFFLINE_NODES=false + diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index cf1bbd5d2..72af99261 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -802,7 +802,3 @@ func GetAllowedEmailDomains() string { func GetNmBaseDomain() string { return os.Getenv("NM_DOMAIN") } - -func IsAutoCleanUpEnabled() bool { - return os.Getenv("AUTO_DELETE_OFFLINE_NODES") == "true" -}