From eed9464e3cbd30c4397850e704c628750a115710 Mon Sep 17 00:00:00 2001 From: Tim Goodwin Date: Tue, 15 Jul 2025 20:42:30 -0700 Subject: [PATCH 1/3] implement RPS signal --- pkg/compiler/compiler.go | 21 +++++--- pkg/ips/monitor.go | 83 +++++++++++++++++++++++++++++ pkg/lift/delegategen.go | 12 ++++- pkg/lift/templates/delegate.go.tmpl | 3 +- pkg/pragma/pragma.go | 56 ++++++++++++++----- pkg/pragma/resolver.go | 40 ++++++++++---- 6 files changed, 182 insertions(+), 33 deletions(-) create mode 100644 pkg/ips/monitor.go diff --git a/pkg/compiler/compiler.go b/pkg/compiler/compiler.go index e4b704d..55eaafb 100644 --- a/pkg/compiler/compiler.go +++ b/pkg/compiler/compiler.go @@ -445,7 +445,7 @@ func (c *Compiler) extractCode(outputDir string) ([]*extractionResult, error) { fmt.Printf(" Generated client for %s\n", typeSpec.Name.Name) // Generate the delegate for this service - delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg) + delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg, p) if err != nil { fmt.Printf(" [ERROR] Failed to gather delegate template data for %s: %v\n", typeSpec.Name.Name, err) continue @@ -645,17 +645,24 @@ func generateDelegateBlockStmts(res *extractionResult, namespace string, monitor }, } - // `decider := pragma.NewCPUDecider(monitor, 0.5)` + // `decider := pragma.New...Decider(...)` + var deciderArgs []ast.Expr + if res.Pragma.SignalType == pragma.IPSTrigger { + // For IPS, the "name" is the service name, which we'll use to record invocations. + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.STRING, Value: strconv.Quote(res.PackageName)}) + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}) + } else { + deciderArgs = append(deciderArgs, monitorIdent) + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}) + } + deciderDecl := &ast.AssignStmt{ Lhs: []ast.Expr{deciderIdent}, Tok: token.DEFINE, Rhs: []ast.Expr{ &ast.CallExpr{ - Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))}, - Args: []ast.Expr{ - monitorIdent, - &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}, - }, + Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))}, + Args: deciderArgs, }, }, } diff --git a/pkg/ips/monitor.go b/pkg/ips/monitor.go new file mode 100644 index 0000000..b3258de --- /dev/null +++ b/pkg/ips/monitor.go @@ -0,0 +1,83 @@ +package ips + +import ( + "sync" + "time" +) + +const ( + // window is the duration over which to calculate IPS. + window = 10 * time.Second +) + +// Monitor tracks invocations per second for different named entities (functions or services). +type Monitor struct { + mu sync.Mutex + invocations map[string][]time.Time +} + +// NewMonitor creates a new IPS monitor. +func NewMonitor() *Monitor { + return &Monitor{ + invocations: make(map[string][]time.Time), + } +} + +// RecordInvocation records a single invocation for the given name. +func (m *Monitor) RecordInvocation(name string) { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now() + m.invocations[name] = append(m.invocations[name], now) + m.cleanup(name, now) +} + +// GetIPS returns the invocations per second for the given name over the last `window` duration. +func (m *Monitor) GetIPS(name string) float64 { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now() + m.cleanup(name, now) + + invocationsInWindow := len(m.invocations[name]) + return float64(invocationsInWindow) / window.Seconds() +} + +// cleanup removes timestamps that are older than the window. +// This must be called with the mutex held. +func (m *Monitor) cleanup(name string, now time.Time) { + if invs, ok := m.invocations[name]; ok { + cutoff := now.Add(-window) + + // Find the first index that is not older than the window. + // This is faster than re-slicing in a loop for long slices. + firstValidIndex := 0 + for i, ts := range invs { + if !ts.Before(cutoff) { + firstValidIndex = i + break + } + // If the last element is still before the cutoff, all elements are old. + if i == len(invs)-1 { + m.invocations[name] = []time.Time{} + return + } + } + m.invocations[name] = invs[firstValidIndex:] + } +} + +// Global monitor instance +var globalMonitor = NewMonitor() + +// Record records an invocation for a globally accessible monitor. +func Record(name string) { + globalMonitor.RecordInvocation(name) +} + +// Get returns the IPS for a globally accessible monitor. +func Get(name string) float64 { + return globalMonitor.GetIPS(name) +} diff --git a/pkg/lift/delegategen.go b/pkg/lift/delegategen.go index 8b95352..6e8f517 100644 --- a/pkg/lift/delegategen.go +++ b/pkg/lift/delegategen.go @@ -10,6 +10,7 @@ import ( _ "embed" + "github.com/tgoodwin/monolift/pkg/pragma" "github.com/tgoodwin/monolift/pkg/util" "golang.org/x/tools/go/packages" ) @@ -27,6 +28,8 @@ type DelegateTemplateData struct { RemoteClientStructName string // e.g., "client" Methods []MethodConfig Imports map[string]string + IPSMonitoringEnabled bool + ServiceName string } // ExecuteDelegateTemplate generates the delegate client code for a service. @@ -58,7 +61,7 @@ func ExecuteDelegateTemplate(entrypointDir string, data DelegateTemplateData) er } // GetDelegateTemplateData gathers all necessary information to generate a delegate for a given interface. -func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package) (*DelegateTemplateData, error) { +func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package, p *pragma.Pragma) (*DelegateTemplateData, error) { imports := make(map[string]string) methodConfigs, err := GetMethodConfigsForInterface(ifaceNameIdent, definingPkg, imports) @@ -69,6 +72,11 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa // Add the pragma package to imports, as the delegate will use the Decider interface. imports["github.com/tgoodwin/monolift/pkg/pragma"] = "pragma" + ipsEnabled := p != nil && p.SignalType == pragma.IPSTrigger + if ipsEnabled { + imports["github.com/tgoodwin/monolift/pkg/ips"] = "ips" + } + fmt.Println("Generating delegate for interface:", definingPkg.Name) data := &DelegateTemplateData{ @@ -79,6 +87,8 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa InterfaceTypeName: ifaceNameIdent.Name, Methods: methodConfigs, Imports: imports, + IPSMonitoringEnabled: ipsEnabled, + ServiceName: definingPkg.Name, } return data, nil diff --git a/pkg/lift/templates/delegate.go.tmpl b/pkg/lift/templates/delegate.go.tmpl index f50e4dc..90d5523 100644 --- a/pkg/lift/templates/delegate.go.tmpl +++ b/pkg/lift/templates/delegate.go.tmpl @@ -22,9 +22,10 @@ func New{{.DelegateStructName}}(local, remote {{.InterfacePackageAlias}}.{{.Inte {{range .Methods}} // {{.Name}} delegates the call to either the local or remote implementation. func (d *{{$.DelegateStructName}}) {{.FullSignature}} { + {{if $.IPSMonitoringEnabled}}ips.Record("{{$.ServiceName}}"){{end}} if d.decider.ShouldDelegate() { return d.remote.{{.Name}}({{join .ParamNames ", "}}) } return d.local.{{.Name}}({{join .ParamNames ", "}}) } -{{end}} \ No newline at end of file +{{end}} diff --git a/pkg/pragma/pragma.go b/pkg/pragma/pragma.go index 46c4f16..4d7e2c9 100644 --- a/pkg/pragma/pragma.go +++ b/pkg/pragma/pragma.go @@ -4,34 +4,47 @@ import ( "errors" "fmt" "strconv" + "strings" ) +// SignalType defines the type of signal for a pragma. type SignalType string const ( + // CPUTrigger represents a CPU usage signal. CPUTrigger SignalType = "CPU" + // MemTrigger represents a memory usage signal. MemTrigger SignalType = "MEM" - // Add other signal types here in the future + // IPSTrigger represents an invocations per second signal. + IPSTrigger SignalType = "IPS" ) var signalTypes = map[SignalType]struct{}{ CPUTrigger: {}, MemTrigger: {}, + IPSTrigger: {}, } func getSignalType(s string) (SignalType, bool) { - _, ok := signalTypes[SignalType(s)] - return SignalType(s), ok + st := SignalType(strings.ToUpper(s)) + _, ok := signalTypes[st] + return st, ok } +// Pragma represents a parsed monolift directive. type Pragma struct { SignalType SignalType Threshold float64 + // Add other pragma fields as needed } +// ParsePragma parses a map of attributes into a Pragma struct. +// Example pragma: trigger=CPU threshold=0.8 +// Example pragma: trigger=IPS value=100 func ParsePragma(attrs map[string]string) (*Pragma, error) { // If there are no attributes, it's a simple extraction without a delegate. if len(attrs) == 0 { + // Return a pragma with no signal type, indicating simple extraction. return &Pragma{}, nil } @@ -44,17 +57,32 @@ func ParsePragma(attrs map[string]string) (*Pragma, error) { return nil, fmt.Errorf("invalid or unsupported signal type for 'trigger': %q", triggerVal) } - thresholdVal, ok := attrs["threshold"] - if !ok { - return nil, errors.New("pragma is missing required 'threshold' attribute for delegate") - } - threshold, err := strconv.ParseFloat(thresholdVal, 64) - if err != nil { - return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err) - } - // For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%). - if threshold <= 0 || threshold > 1.0 { - return nil, fmt.Errorf("invalid 'threshold' value: %f, must be between 0.0 and 1.0", threshold) + var threshold float64 + var err error + + switch signalType { + case IPSTrigger: + valueVal, ok := attrs["threshold"] + if !ok { + return nil, errors.New("pragma with trigger=IPS is missing required 'threshold' attribute") + } + threshold, err = strconv.ParseFloat(valueVal, 64) + if err != nil { + return nil, fmt.Errorf("invalid 'value' for IPS trigger: %q, must be a float: %w", valueVal, err) + } + case CPUTrigger, MemTrigger: + thresholdVal, ok := attrs["threshold"] + if !ok { + return nil, fmt.Errorf("pragma with trigger=%s is missing required 'threshold' attribute", signalType) + } + threshold, err = strconv.ParseFloat(thresholdVal, 64) + if err != nil { + return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err) + } + // For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%). + if threshold <= 0 || threshold > 1.0 { + return nil, fmt.Errorf("invalid 'threshold' value for %s: %f, must be between 0.0 and 1.0", signalType, threshold) + } } return &Pragma{ diff --git a/pkg/pragma/resolver.go b/pkg/pragma/resolver.go index bc064af..491aa00 100644 --- a/pkg/pragma/resolver.go +++ b/pkg/pragma/resolver.go @@ -4,33 +4,39 @@ import ( "fmt" "log" + "github.com/tgoodwin/monolift/pkg/ips" "github.com/tgoodwin/monolift/pkg/metrics" ) // NewDeciderFromPragma is a factory function that creates the appropriate // Decider implementation based on the pragma configuration. -func NewDeciderFromPragma(p *Pragma, monitor *metrics.Monitor) (Decider, error) { +func NewDeciderFromPragma(p *Pragma, monitor *metrics.Monitor, name string) (Decider, error) { if p == nil { return nil, fmt.Errorf("pragma cannot be nil") } - if monitor == nil { - return nil, fmt.Errorf("metrics monitor cannot be nil") - } switch p.SignalType { case CPUTrigger: + if monitor == nil { + return nil, fmt.Errorf("metrics monitor cannot be nil for CPU trigger") + } log.Printf("Creating CPUDecider with threshold %.2f%%", p.Threshold*100) return NewCPUDecider(monitor, p.Threshold), nil case MemTrigger: + if monitor == nil { + return nil, fmt.Errorf("metrics monitor cannot be nil for MEM trigger") + } log.Printf("Creating MemDecider with threshold %.2f%%", p.Threshold*100) return NewMemDecider(monitor, p.Threshold), nil + case IPSTrigger: + log.Printf("Creating IPSDecider for '%s' with threshold %.2f", name, p.Threshold) + return NewIPSDecider(name, p.Threshold), nil default: return nil, fmt.Errorf("unsupported signal type for decider: %s", p.SignalType) } } // CPUDecider decides whether to delegate based on CPU usage. -// It provides a simple, stateless check against a threshold. type CPUDecider struct { monitor *metrics.Monitor threshold float64 @@ -45,9 +51,7 @@ func NewCPUDecider(monitor *metrics.Monitor, threshold float64) *CPUDecider { } // ShouldDelegate returns true if the current CPU usage exceeds the threshold. -// This is a non-blocking read of the monitor's last known state. func (d *CPUDecider) ShouldDelegate() bool { - // The monitor provides CPU usage as a percentage (0.0 to 1.0). cpuUsage, ok := d.monitor.CPUUsagePercent() if !ok { return false @@ -56,7 +60,6 @@ func (d *CPUDecider) ShouldDelegate() bool { } // MemDecider decides whether to delegate based on memory usage. -// It provides a simple, stateless check against a threshold. type MemDecider struct { monitor *metrics.Monitor threshold float64 @@ -71,12 +74,29 @@ func NewMemDecider(monitor *metrics.Monitor, threshold float64) *MemDecider { } // ShouldDelegate returns true if the current memory usage exceeds the threshold. -// This is a non-blocking read of the monitor's last known state. func (d *MemDecider) ShouldDelegate() bool { - // The monitor provides memory usage as a percentage (0.0 to 1.0). memUsage, ok := d.monitor.MemoryUsagePercent() if !ok { return false } return memUsage > d.threshold } + +// IPSDecider decides whether to delegate based on invocations per second. +type IPSDecider struct { + name string + threshold float64 +} + +// NewIPSDecider creates a new IPS-based decider. +func NewIPSDecider(name string, threshold float64) *IPSDecider { + return &IPSDecider{ + name: name, + threshold: threshold, + } +} + +// ShouldDelegate returns true if the current IPS exceeds the threshold. +func (d *IPSDecider) ShouldDelegate() bool { + return ips.Get(d.name) > d.threshold +} From 83651bbdc5a73f1001c0bf6edcc8956d86a18d1f Mon Sep 17 00:00:00 2001 From: Tim Goodwin Date: Tue, 15 Jul 2025 20:54:09 -0700 Subject: [PATCH 2/3] move rate calculation off the critical path --- pkg/ips/monitor.go | 112 +++++++++++++++++++++++++++++-------------- pkg/pragma/pragma.go | 2 +- 2 files changed, 76 insertions(+), 38 deletions(-) diff --git a/pkg/ips/monitor.go b/pkg/ips/monitor.go index b3258de..da51051 100644 --- a/pkg/ips/monitor.go +++ b/pkg/ips/monitor.go @@ -7,69 +7,107 @@ import ( const ( // window is the duration over which to calculate IPS. - window = 10 * time.Second + window = 5 * time.Second + // updateInterval is how often the background worker calculates the IPS values. + updateInterval = 1 * time.Second ) -// Monitor tracks invocations per second for different named entities (functions or services). +// Monitor tracks invocations per second for different named entities. +// It uses a background goroutine to periodically calculate IPS values, +// making reads from GetIPS fast and non-blocking. type Monitor struct { - mu sync.Mutex - invocations map[string][]time.Time + // mu protects access to both invocations and ipsValues maps. + mu sync.RWMutex + // invocations stores counts per second (unix timestamp) for each named entity. + invocations map[string]map[int64]int + // ipsValues stores the latest calculated IPS value for each named entity. + ipsValues map[string]float64 + // ticker triggers the periodic calculation. + ticker *time.Ticker + // done is used to signal the background goroutine to stop. + done chan struct{} } -// NewMonitor creates a new IPS monitor. +// NewMonitor creates a new IPS monitor and starts its background calculation worker. func NewMonitor() *Monitor { - return &Monitor{ - invocations: make(map[string][]time.Time), + m := &Monitor{ + invocations: make(map[string]map[int64]int), + ipsValues: make(map[string]float64), + ticker: time.NewTicker(updateInterval), + done: make(chan struct{}), } + go m.runCalculator() + return m } // RecordInvocation records a single invocation for the given name. +// It increments a counter for the current second. func (m *Monitor) RecordInvocation(name string) { m.mu.Lock() defer m.mu.Unlock() - now := time.Now() - m.invocations[name] = append(m.invocations[name], now) - m.cleanup(name, now) + now := time.Now().Unix() + if _, ok := m.invocations[name]; !ok { + m.invocations[name] = make(map[int64]int) + } + m.invocations[name][now]++ } -// GetIPS returns the invocations per second for the given name over the last `window` duration. +// GetIPS returns the most recently calculated invocations per second for the given name. +// This is a fast read operation. func (m *Monitor) GetIPS(name string) float64 { - m.mu.Lock() - defer m.mu.Unlock() + m.mu.RLock() + defer m.mu.RUnlock() + // Return the pre-calculated value. + return m.ipsValues[name] +} - now := time.Now() - m.cleanup(name, now) +// Close stops the background worker. +func (m *Monitor) Close() { + close(m.done) +} - invocationsInWindow := len(m.invocations[name]) - return float64(invocationsInWindow) / window.Seconds() +// runCalculator is the background worker loop. +func (m *Monitor) runCalculator() { + for { + select { + case <-m.ticker.C: + m.calculateAll() + case <-m.done: + m.ticker.Stop() + return + } + } } -// cleanup removes timestamps that are older than the window. -// This must be called with the mutex held. -func (m *Monitor) cleanup(name string, now time.Time) { - if invs, ok := m.invocations[name]; ok { - cutoff := now.Add(-window) - - // Find the first index that is not older than the window. - // This is faster than re-slicing in a loop for long slices. - firstValidIndex := 0 - for i, ts := range invs { - if !ts.Before(cutoff) { - firstValidIndex = i - break - } - // If the last element is still before the cutoff, all elements are old. - if i == len(invs)-1 { - m.invocations[name] = []time.Time{} - return +// calculateAll iterates through all monitored entities to update their IPS +// and clean up old data. +func (m *Monitor) calculateAll() { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now().Unix() + // Cutoff for invocations to include in the window. + cutoff := now - int64(window.Seconds()) + + for name, buckets := range m.invocations { + var totalInvocationsInWindow int + // Sum invocations within the window and clean up old buckets. + for timestamp, count := range buckets { + if timestamp >= cutoff { + totalInvocationsInWindow += count + } else { + // This timestamp bucket is outside the window, so delete it. + delete(buckets, timestamp) } } - m.invocations[name] = invs[firstValidIndex:] + // Update the calculated IPS value for the current entity. + m.ipsValues[name] = float64(totalInvocationsInWindow) / window.Seconds() } } -// Global monitor instance +// --- Global Monitor Instance --- + var globalMonitor = NewMonitor() // Record records an invocation for a globally accessible monitor. diff --git a/pkg/pragma/pragma.go b/pkg/pragma/pragma.go index 4d7e2c9..a8ba063 100644 --- a/pkg/pragma/pragma.go +++ b/pkg/pragma/pragma.go @@ -40,7 +40,7 @@ type Pragma struct { // ParsePragma parses a map of attributes into a Pragma struct. // Example pragma: trigger=CPU threshold=0.8 -// Example pragma: trigger=IPS value=100 +// Example pragma: trigger=IPS threshold=100 func ParsePragma(attrs map[string]string) (*Pragma, error) { // If there are no attributes, it's a simple extraction without a delegate. if len(attrs) == 0 { From 87e4934ffd0cdd9a854a54cd6b191264f82e8a8a Mon Sep 17 00:00:00 2001 From: Tim Goodwin Date: Tue, 15 Jul 2025 20:59:05 -0700 Subject: [PATCH 3/3] use packagename+servicename instead of just package name for tracking IPS --- pkg/compiler/compiler.go | 5 +++-- pkg/lift/delegategen.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/compiler/compiler.go b/pkg/compiler/compiler.go index 55eaafb..7f8309b 100644 --- a/pkg/compiler/compiler.go +++ b/pkg/compiler/compiler.go @@ -648,8 +648,9 @@ func generateDelegateBlockStmts(res *extractionResult, namespace string, monitor // `decider := pragma.New...Decider(...)` var deciderArgs []ast.Expr if res.Pragma.SignalType == pragma.IPSTrigger { - // For IPS, the "name" is the service name, which we'll use to record invocations. - deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.STRING, Value: strconv.Quote(res.PackageName)}) + // For IPS, the "name" is the fully qualified interface name (e.g., "timelineservice.Service") + qualifiedName := res.PackageName + "." + res.InterfaceTypeName + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.STRING, Value: strconv.Quote(qualifiedName)}) deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}) } else { deciderArgs = append(deciderArgs, monitorIdent) diff --git a/pkg/lift/delegategen.go b/pkg/lift/delegategen.go index 6e8f517..1f762d9 100644 --- a/pkg/lift/delegategen.go +++ b/pkg/lift/delegategen.go @@ -88,7 +88,7 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa Methods: methodConfigs, Imports: imports, IPSMonitoringEnabled: ipsEnabled, - ServiceName: definingPkg.Name, + ServiceName: definingPkg.Name + "." + ifaceNameIdent.Name, } return data, nil