Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/compiler/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (c *Compiler) extractCode(outputDir string) ([]*extractionResult, error) {
fmt.Printf(" Generated client for %s\n", typeSpec.Name.Name)

// Generate the delegate for this service
delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg)
delegateData, err := lift.GetDelegateTemplateData(typeSpec.Name, currentLoadedPkg, p)
if err != nil {
fmt.Printf(" [ERROR] Failed to gather delegate template data for %s: %v\n", typeSpec.Name.Name, err)
continue
Expand Down Expand Up @@ -645,17 +645,25 @@ func generateDelegateBlockStmts(res *extractionResult, namespace string, monitor
},
}

// `decider := pragma.NewCPUDecider(monitor, 0.5)`
// `decider := pragma.New...Decider(...)`
var deciderArgs []ast.Expr
if res.Pragma.SignalType == pragma.IPSTrigger {
// For IPS, the "name" is the fully qualified interface name (e.g., "timelineservice.Service")
qualifiedName := res.PackageName + "." + res.InterfaceTypeName
deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.STRING, Value: strconv.Quote(qualifiedName)})
deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)})
} else {
deciderArgs = append(deciderArgs, monitorIdent)
deciderArgs = append(deciderArgs, &ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)})
}

deciderDecl := &ast.AssignStmt{
Lhs: []ast.Expr{deciderIdent},
Tok: token.DEFINE,
Rhs: []ast.Expr{
&ast.CallExpr{
Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))},
Args: []ast.Expr{
monitorIdent,
&ast.BasicLit{Kind: token.FLOAT, Value: strconv.FormatFloat(res.Pragma.Threshold, 'f', -1, 64)},
},
Fun: &ast.SelectorExpr{X: ast.NewIdent("pragma"), Sel: ast.NewIdent(fmt.Sprintf("New%sDecider", res.Pragma.SignalType))},
Args: deciderArgs,
},
},
}
Expand Down
121 changes: 121 additions & 0 deletions pkg/ips/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package ips

import (
"sync"
"time"
)

const (
// window is the duration over which to calculate IPS.
window = 5 * time.Second
// updateInterval is how often the background worker calculates the IPS values.
updateInterval = 1 * time.Second
)

// Monitor tracks invocations per second for different named entities.
// It uses a background goroutine to periodically calculate IPS values,
// making reads from GetIPS fast and non-blocking.
type Monitor struct {
// mu protects access to both invocations and ipsValues maps.
mu sync.RWMutex
// invocations stores counts per second (unix timestamp) for each named entity.
invocations map[string]map[int64]int
// ipsValues stores the latest calculated IPS value for each named entity.
ipsValues map[string]float64
// ticker triggers the periodic calculation.
ticker *time.Ticker
// done is used to signal the background goroutine to stop.
done chan struct{}
}

// NewMonitor creates a new IPS monitor and starts its background calculation worker.
func NewMonitor() *Monitor {
m := &Monitor{
invocations: make(map[string]map[int64]int),
ipsValues: make(map[string]float64),
ticker: time.NewTicker(updateInterval),
done: make(chan struct{}),
}
go m.runCalculator()
return m
}

// RecordInvocation records a single invocation for the given name.
// It increments a counter for the current second.
func (m *Monitor) RecordInvocation(name string) {
m.mu.Lock()
defer m.mu.Unlock()

now := time.Now().Unix()
if _, ok := m.invocations[name]; !ok {
m.invocations[name] = make(map[int64]int)
}
m.invocations[name][now]++
}

// GetIPS returns the most recently calculated invocations per second for the given name.
// This is a fast read operation.
func (m *Monitor) GetIPS(name string) float64 {
m.mu.RLock()
defer m.mu.RUnlock()
// Return the pre-calculated value.
return m.ipsValues[name]
}

// Close stops the background worker.
func (m *Monitor) Close() {
close(m.done)
}

// runCalculator is the background worker loop.
func (m *Monitor) runCalculator() {
for {
select {
case <-m.ticker.C:
m.calculateAll()
case <-m.done:
m.ticker.Stop()
return
}
}
}

// calculateAll iterates through all monitored entities to update their IPS
// and clean up old data.
func (m *Monitor) calculateAll() {
m.mu.Lock()
defer m.mu.Unlock()

now := time.Now().Unix()
// Cutoff for invocations to include in the window.
cutoff := now - int64(window.Seconds())

for name, buckets := range m.invocations {
var totalInvocationsInWindow int
// Sum invocations within the window and clean up old buckets.
for timestamp, count := range buckets {
if timestamp >= cutoff {
totalInvocationsInWindow += count
} else {
// This timestamp bucket is outside the window, so delete it.
delete(buckets, timestamp)
}
}
// Update the calculated IPS value for the current entity.
m.ipsValues[name] = float64(totalInvocationsInWindow) / window.Seconds()
}
}

// --- Global Monitor Instance ---

var globalMonitor = NewMonitor()

// Record records an invocation for a globally accessible monitor.
func Record(name string) {
globalMonitor.RecordInvocation(name)
}

// Get returns the IPS for a globally accessible monitor.
func Get(name string) float64 {
return globalMonitor.GetIPS(name)
}
12 changes: 11 additions & 1 deletion pkg/lift/delegategen.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

_ "embed"

"github.com/tgoodwin/monolift/pkg/pragma"
"github.com/tgoodwin/monolift/pkg/util"
"golang.org/x/tools/go/packages"
)
Expand All @@ -27,6 +28,8 @@ type DelegateTemplateData struct {
RemoteClientStructName string // e.g., "client"
Methods []MethodConfig
Imports map[string]string
IPSMonitoringEnabled bool
ServiceName string
}

// ExecuteDelegateTemplate generates the delegate client code for a service.
Expand Down Expand Up @@ -58,7 +61,7 @@ func ExecuteDelegateTemplate(entrypointDir string, data DelegateTemplateData) er
}

// GetDelegateTemplateData gathers all necessary information to generate a delegate for a given interface.
func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package) (*DelegateTemplateData, error) {
func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Package, p *pragma.Pragma) (*DelegateTemplateData, error) {
imports := make(map[string]string)

methodConfigs, err := GetMethodConfigsForInterface(ifaceNameIdent, definingPkg, imports)
Expand All @@ -69,6 +72,11 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa
// Add the pragma package to imports, as the delegate will use the Decider interface.
imports["github.com/tgoodwin/monolift/pkg/pragma"] = "pragma"

ipsEnabled := p != nil && p.SignalType == pragma.IPSTrigger
if ipsEnabled {
imports["github.com/tgoodwin/monolift/pkg/ips"] = "ips"
}

fmt.Println("Generating delegate for interface:", definingPkg.Name)

data := &DelegateTemplateData{
Expand All @@ -79,6 +87,8 @@ func GetDelegateTemplateData(ifaceNameIdent *ast.Ident, definingPkg *packages.Pa
InterfaceTypeName: ifaceNameIdent.Name,
Methods: methodConfigs,
Imports: imports,
IPSMonitoringEnabled: ipsEnabled,
ServiceName: definingPkg.Name + "." + ifaceNameIdent.Name,
}

return data, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/lift/templates/delegate.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ func New{{.DelegateStructName}}(local, remote {{.InterfacePackageAlias}}.{{.Inte
{{range .Methods}}
// {{.Name}} delegates the call to either the local or remote implementation.
func (d *{{$.DelegateStructName}}) {{.FullSignature}} {
{{if $.IPSMonitoringEnabled}}ips.Record("{{$.ServiceName}}"){{end}}
if d.decider.ShouldDelegate() {
return d.remote.{{.Name}}({{join .ParamNames ", "}})
}
return d.local.{{.Name}}({{join .ParamNames ", "}})
}
{{end}}
{{end}}
56 changes: 42 additions & 14 deletions pkg/pragma/pragma.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,47 @@ import (
"errors"
"fmt"
"strconv"
"strings"
)

// SignalType defines the type of signal for a pragma.
type SignalType string

const (
// CPUTrigger represents a CPU usage signal.
CPUTrigger SignalType = "CPU"
// MemTrigger represents a memory usage signal.
MemTrigger SignalType = "MEM"
// Add other signal types here in the future
// IPSTrigger represents an invocations per second signal.
IPSTrigger SignalType = "IPS"
)

var signalTypes = map[SignalType]struct{}{
CPUTrigger: {},
MemTrigger: {},
IPSTrigger: {},
}

func getSignalType(s string) (SignalType, bool) {
_, ok := signalTypes[SignalType(s)]
return SignalType(s), ok
st := SignalType(strings.ToUpper(s))
_, ok := signalTypes[st]
return st, ok
}

// Pragma represents a parsed monolift directive.
type Pragma struct {
SignalType SignalType
Threshold float64
// Add other pragma fields as needed
}

// ParsePragma parses a map of attributes into a Pragma struct.
// Example pragma: trigger=CPU threshold=0.8
// Example pragma: trigger=IPS threshold=100
func ParsePragma(attrs map[string]string) (*Pragma, error) {
// If there are no attributes, it's a simple extraction without a delegate.
if len(attrs) == 0 {
// Return a pragma with no signal type, indicating simple extraction.
return &Pragma{}, nil
}

Expand All @@ -44,17 +57,32 @@ func ParsePragma(attrs map[string]string) (*Pragma, error) {
return nil, fmt.Errorf("invalid or unsupported signal type for 'trigger': %q", triggerVal)
}

thresholdVal, ok := attrs["threshold"]
if !ok {
return nil, errors.New("pragma is missing required 'threshold' attribute for delegate")
}
threshold, err := strconv.ParseFloat(thresholdVal, 64)
if err != nil {
return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err)
}
// For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%).
if threshold <= 0 || threshold > 1.0 {
return nil, fmt.Errorf("invalid 'threshold' value: %f, must be between 0.0 and 1.0", threshold)
var threshold float64
var err error

switch signalType {
case IPSTrigger:
valueVal, ok := attrs["threshold"]
if !ok {
return nil, errors.New("pragma with trigger=IPS is missing required 'threshold' attribute")
}
threshold, err = strconv.ParseFloat(valueVal, 64)
if err != nil {
return nil, fmt.Errorf("invalid 'value' for IPS trigger: %q, must be a float: %w", valueVal, err)
}
case CPUTrigger, MemTrigger:
thresholdVal, ok := attrs["threshold"]
if !ok {
return nil, fmt.Errorf("pragma with trigger=%s is missing required 'threshold' attribute", signalType)
}
threshold, err = strconv.ParseFloat(thresholdVal, 64)
if err != nil {
return nil, fmt.Errorf("invalid 'threshold' value: %q, must be a float: %w", thresholdVal, err)
}
// For CPU/MEM, the threshold is a percentage (e.g., 0.5 for 50%).
if threshold <= 0 || threshold > 1.0 {
return nil, fmt.Errorf("invalid 'threshold' value for %s: %f, must be between 0.0 and 1.0", signalType, threshold)
}
}

return &Pragma{
Expand Down
Loading