diff --git a/pkg/compiler/compiler.go b/pkg/compiler/compiler.go index 3929078..cf8f46e 100644 --- a/pkg/compiler/compiler.go +++ b/pkg/compiler/compiler.go @@ -445,7 +445,7 @@ func (c *Compiler) extractCode(outputDir string) ([]*extractionResult, error) { fmt.Printf(" Generated client for %s\n", typeSpec.Name.Name) // Generate the delegate for this service - delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg) + delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg, p) if err != nil { fmt.Printf(" [ERROR] Failed to gather delegate template data for %s: %v\n", typeSpec.Name.Name, err) continue @@ -645,17 +645,25 @@ func generateDelegateBlockStmts(res *extractionResult, namespace string, monitor }, } - // `decider := pragma.NewCPUDecider(monitor, 0.5)` + // `decider := pragma.New...Decider(...)` + var deciderArgs []ast.Expr + if res.Pragma.SignalType == pragma.IPSTrigger { + // For IPS, the "name" is the fully qualified interface name (e.g., "timelineservice.Service") + qualifiedName := res.PackageName + "." + res.InterfaceTypeName + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.STRING, Value: strconv.Quote(qualifiedName)}) + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}) + } else { + deciderArgs = append(deciderArgs, monitorIdent) + deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}) + } + deciderDecl := &ast.AssignStmt{ Lhs: []ast.Expr{deciderIdent}, Tok: token.DEFINE, Rhs: []ast.Expr{ &ast.CallExpr{ - Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))}, - Args: []ast.Expr{ - monitorIdent, - &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)}, - }, + Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))}, + Args: deciderArgs, }, }, } diff --git a/pkg/ips/monitor.go b/pkg/ips/monitor.go new file mode 100644 index 0000000..da51051 --- /dev/null +++ b/pkg/ips/monitor.go @@ -0,0 +1,121 @@ +package ips + +import ( + "sync" + "time" +) + +const ( + // window is the duration over which to calculate IPS. + window = 5 * time.Second + // updateInterval is how often the background worker calculates the IPS values. + updateInterval = 1 * time.Second +) + +// Monitor tracks invocations per second for different named entities. +// It uses a background goroutine to periodically calculate IPS values, +// making reads from GetIPS fast and non-blocking. +type Monitor struct { + // mu protects access to both invocations and ipsValues maps. + mu sync.RWMutex + // invocations stores counts per second (unix timestamp) for each named entity. + invocations map[string]map[int64]int + // ipsValues stores the latest calculated IPS value for each named entity. + ipsValues map[string]float64 + // ticker triggers the periodic calculation. + ticker *time.Ticker + // done is used to signal the background goroutine to stop. + done chan struct{} +} + +// NewMonitor creates a new IPS monitor and starts its background calculation worker. +func NewMonitor() *Monitor { + m := &Monitor{ + invocations: make(map[string]map[int64]int), + ipsValues: make(map[string]float64), + ticker: time.NewTicker(updateInterval), + done: make(chan struct{}), + } + go m.runCalculator() + return m +} + +// RecordInvocation records a single invocation for the given name. +// It increments a counter for the current second. +func (m *Monitor) RecordInvocation(name string) { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now().Unix() + if _, ok := m.invocations[name]; !ok { + m.invocations[name] = make(map[int64]int) + } + m.invocations[name][now]++ +} + +// GetIPS returns the most recently calculated invocations per second for the given name. +// This is a fast read operation. +func (m *Monitor) GetIPS(name string) float64 { + m.mu.RLock() + defer m.mu.RUnlock() + // Return the pre-calculated value. + return m.ipsValues[name] +} + +// Close stops the background worker. +func (m *Monitor) Close() { + close(m.done) +} + +// runCalculator is the background worker loop. +func (m *Monitor) runCalculator() { + for { + select { + case <-m.ticker.C: + m.calculateAll() + case <-m.done: + m.ticker.Stop() + return + } + } +} + +// calculateAll iterates through all monitored entities to update their IPS +// and clean up old data. +func (m *Monitor) calculateAll() { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now().Unix() + // Cutoff for invocations to include in the window. + cutoff := now - int64(window.Seconds()) + + for name, buckets := range m.invocations { + var totalInvocationsInWindow int + // Sum invocations within the window and clean up old buckets. + for timestamp, count := range buckets { + if timestamp >= cutoff { + totalInvocationsInWindow += count + } else { + // This timestamp bucket is outside the window, so delete it. + delete(buckets, timestamp) + } + } + // Update the calculated IPS value for the current entity. + m.ipsValues[name] = float64(totalInvocationsInWindow) / window.Seconds() + } +} + +// --- Global Monitor Instance --- + +var globalMonitor = NewMonitor() + +// Record records an invocation for a globally accessible monitor. +func Record(name string) { + globalMonitor.RecordInvocation(name) +} + +// Get returns the IPS for a globally accessible monitor. +func Get(name string) float64 { + return globalMonitor.GetIPS(name) +} diff --git a/pkg/lift/delegategen.go b/pkg/lift/delegategen.go index 8b95352..1f762d9 100644 --- a/pkg/lift/delegategen.go +++ b/pkg/lift/delegategen.go @@ -10,6 +10,7 @@ import ( _ "embed" + "github.com/tgoodwin/monolift/pkg/pragma" "github.com/tgoodwin/monolift/pkg/util" "golang.org/x/tools/go/packages" ) @@ -27,6 +28,8 @@ type DelegateTemplateData struct { RemoteClientStructName string // e.g., "client" Methods []MethodConfig Imports map[string]string + IPSMonitoringEnabled bool + ServiceName string } // ExecuteDelegateTemplate generates the delegate client code for a service. @@ -58,7 +61,7 @@ func ExecuteDelegateTemplate(entrypointDir string, data DelegateTemplateData) er } // GetDelegateTemplateData gathers all necessary information to generate a delegate for a given interface. -func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package) (*DelegateTemplateData, error) { +func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package, p *pragma.Pragma) (*DelegateTemplateData, error) { imports := make(map[string]string) methodConfigs, err := GetMethodConfigsForInterface(ifaceNameIdent, definingPkg, imports) @@ -69,6 +72,11 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa // Add the pragma package to imports, as the delegate will use the Decider interface. imports["github.com/tgoodwin/monolift/pkg/pragma"] = "pragma" + ipsEnabled := p != nil && p.SignalType == pragma.IPSTrigger + if ipsEnabled { + imports["github.com/tgoodwin/monolift/pkg/ips"] = "ips" + } + fmt.Println("Generating delegate for interface:", definingPkg.Name) data := &DelegateTemplateData{ @@ -79,6 +87,8 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa InterfaceTypeName: ifaceNameIdent.Name, Methods: methodConfigs, Imports: imports, + IPSMonitoringEnabled: ipsEnabled, + ServiceName: definingPkg.Name + "." + ifaceNameIdent.Name, } return data, nil diff --git a/pkg/lift/templates/delegate.go.tmpl b/pkg/lift/templates/delegate.go.tmpl index f50e4dc..90d5523 100644 --- a/pkg/lift/templates/delegate.go.tmpl +++ b/pkg/lift/templates/delegate.go.tmpl @@ -22,9 +22,10 @@ func New{{.DelegateStructName}}(local, remote {{.InterfacePackageAlias}}.{{.Inte {{range .Methods}} // {{.Name}} delegates the call to either the local or remote implementation. func (d *{{$.DelegateStructName}}) {{.FullSignature}} { + {{if $.IPSMonitoringEnabled}}ips.Record("{{$.ServiceName}}"){{end}} if d.decider.ShouldDelegate() { return d.remote.{{.Name}}({{join .ParamNames ", "}}) } return d.local.{{.Name}}({{join .ParamNames ", "}}) } -{{end}} \ No newline at end of file +{{end}} diff --git a/pkg/pragma/pragma.go b/pkg/pragma/pragma.go index 46c4f16..a8ba063 100644 --- a/pkg/pragma/pragma.go +++ b/pkg/pragma/pragma.go @@ -4,34 +4,47 @@ import ( "errors" "fmt" "strconv" + "strings" ) +// SignalType defines the type of signal for a pragma. type SignalType string const ( + // CPUTrigger represents a CPU usage signal. CPUTrigger SignalType = "CPU" + // MemTrigger represents a memory usage signal. MemTrigger SignalType = "MEM" - // Add other signal types here in the future + // IPSTrigger represents an invocations per second signal. + IPSTrigger SignalType = "IPS" ) var signalTypes = map[SignalType]struct{}{ CPUTrigger: {}, MemTrigger: {}, + IPSTrigger: {}, } func getSignalType(s string) (SignalType, bool) { - _, ok := signalTypes[SignalType(s)] - return SignalType(s), ok + st := SignalType(strings.ToUpper(s)) + _, ok := signalTypes[st] + return st, ok } +// Pragma represents a parsed monolift directive. type Pragma struct { SignalType SignalType Threshold float64 + // Add other pragma fields as needed } +// ParsePragma parses a map of attributes into a Pragma struct. +// Example pragma: trigger=CPU threshold=0.8 +// Example pragma: trigger=IPS threshold=100 func ParsePragma(attrs map[string]string) (*Pragma, error) { // If there are no attributes, it's a simple extraction without a delegate. if len(attrs) == 0 { + // Return a pragma with no signal type, indicating simple extraction. return &Pragma{}, nil } @@ -44,17 +57,32 @@ func ParsePragma(attrs map[string]string) (*Pragma, error) { return nil, fmt.Errorf("invalid or unsupported signal type for 'trigger': %q", triggerVal) } - thresholdVal, ok := attrs["threshold"] - if !ok { - return nil, errors.New("pragma is missing required 'threshold' attribute for delegate") - } - threshold, err := strconv.ParseFloat(thresholdVal, 64) - if err != nil { - return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err) - } - // For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%). - if threshold <= 0 || threshold > 1.0 { - return nil, fmt.Errorf("invalid 'threshold' value: %f, must be between 0.0 and 1.0", threshold) + var threshold float64 + var err error + + switch signalType { + case IPSTrigger: + valueVal, ok := attrs["threshold"] + if !ok { + return nil, errors.New("pragma with trigger=IPS is missing required 'threshold' attribute") + } + threshold, err = strconv.ParseFloat(valueVal, 64) + if err != nil { + return nil, fmt.Errorf("invalid 'value' for IPS trigger: %q, must be a float: %w", valueVal, err) + } + case CPUTrigger, MemTrigger: + thresholdVal, ok := attrs["threshold"] + if !ok { + return nil, fmt.Errorf("pragma with trigger=%s is missing required 'threshold' attribute", signalType) + } + threshold, err = strconv.ParseFloat(thresholdVal, 64) + if err != nil { + return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err) + } + // For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%). + if threshold <= 0 || threshold > 1.0 { + return nil, fmt.Errorf("invalid 'threshold' value for %s: %f, must be between 0.0 and 1.0", signalType, threshold) + } } return &Pragma{ diff --git a/pkg/pragma/resolver.go b/pkg/pragma/resolver.go index bc064af..491aa00 100644 --- a/pkg/pragma/resolver.go +++ b/pkg/pragma/resolver.go @@ -4,33 +4,39 @@ import ( "fmt" "log" + "github.com/tgoodwin/monolift/pkg/ips" "github.com/tgoodwin/monolift/pkg/metrics" ) // NewDeciderFromPragma is a factory function that creates the appropriate // Decider implementation based on the pragma configuration. -func NewDeciderFromPragma(p *Pragma, monitor *metrics.Monitor) (Decider, error) { +func NewDeciderFromPragma(p *Pragma, monitor *metrics.Monitor, name string) (Decider, error) { if p == nil { return nil, fmt.Errorf("pragma cannot be nil") } - if monitor == nil { - return nil, fmt.Errorf("metrics monitor cannot be nil") - } switch p.SignalType { case CPUTrigger: + if monitor == nil { + return nil, fmt.Errorf("metrics monitor cannot be nil for CPU trigger") + } log.Printf("Creating CPUDecider with threshold %.2f%%", p.Threshold*100) return NewCPUDecider(monitor, p.Threshold), nil case MemTrigger: + if monitor == nil { + return nil, fmt.Errorf("metrics monitor cannot be nil for MEM trigger") + } log.Printf("Creating MemDecider with threshold %.2f%%", p.Threshold*100) return NewMemDecider(monitor, p.Threshold), nil + case IPSTrigger: + log.Printf("Creating IPSDecider for '%s' with threshold %.2f", name, p.Threshold) + return NewIPSDecider(name, p.Threshold), nil default: return nil, fmt.Errorf("unsupported signal type for decider: %s", p.SignalType) } } // CPUDecider decides whether to delegate based on CPU usage. -// It provides a simple, stateless check against a threshold. type CPUDecider struct { monitor *metrics.Monitor threshold float64 @@ -45,9 +51,7 @@ func NewCPUDecider(monitor *metrics.Monitor, threshold float64) *CPUDecider { } // ShouldDelegate returns true if the current CPU usage exceeds the threshold. -// This is a non-blocking read of the monitor's last known state. func (d *CPUDecider) ShouldDelegate() bool { - // The monitor provides CPU usage as a percentage (0.0 to 1.0). cpuUsage, ok := d.monitor.CPUUsagePercent() if !ok { return false @@ -56,7 +60,6 @@ func (d *CPUDecider) ShouldDelegate() bool { } // MemDecider decides whether to delegate based on memory usage. -// It provides a simple, stateless check against a threshold. type MemDecider struct { monitor *metrics.Monitor threshold float64 @@ -71,12 +74,29 @@ func NewMemDecider(monitor *metrics.Monitor, threshold float64) *MemDecider { } // ShouldDelegate returns true if the current memory usage exceeds the threshold. -// This is a non-blocking read of the monitor's last known state. func (d *MemDecider) ShouldDelegate() bool { - // The monitor provides memory usage as a percentage (0.0 to 1.0). memUsage, ok := d.monitor.MemoryUsagePercent() if !ok { return false } return memUsage > d.threshold } + +// IPSDecider decides whether to delegate based on invocations per second. +type IPSDecider struct { + name string + threshold float64 +} + +// NewIPSDecider creates a new IPS-based decider. +func NewIPSDecider(name string, threshold float64) *IPSDecider { + return &IPSDecider{ + name: name, + threshold: threshold, + } +} + +// ShouldDelegate returns true if the current IPS exceeds the threshold. +func (d *IPSDecider) ShouldDelegate() bool { + return ips.Get(d.name) > d.threshold +}