Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support selectable image #44

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 133 additions & 125 deletions pool-agent/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@ import (
slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api"
)

// Agent is an agent for pool mode.
type Agent struct {
type AgentConfig struct {
ImageAlias string
InstanceSource api.InstanceSource

ResourceTypesMap []ResourceTypesMap
ResourceTypesCounts ResourceTypesCounts
Client lxd.InstanceServer

CheckInterval time.Duration
WaitIdleTime time.Duration
ZombieAllowTime time.Duration

creatingInstances map[string]instances
deletingInstances instances
currentImage struct {
Hash string
CreatedAt time.Time
}
registry *prometheus.Registry
}

// Agent is an agent for pool mode.
type Agent struct {
Config map[string]*AgentConfig
CheckInterval time.Duration
WaitIdleTime time.Duration
ZombieAllowTime time.Duration
registry *prometheus.Registry
ResourceTypesMap ResourceTypesMap
Client lxd.InstanceServer
}

var (
Expand All @@ -44,76 +47,78 @@ var (
type instances map[string]struct{}

func newAgent(ctx context.Context) (*Agent, error) {
conf, err := LoadConfig()
confmap, err := LoadConfig()
if err != nil {
return nil, fmt.Errorf("load config: %w", err)
}
source, err := slm.ParseAlias(conf.ImageAlias)
if err != nil {
return nil, err
}
c, err := lxd.ConnectLXDUnixWithContext(ctx, "", &lxd.ConnectionArgs{})
if err != nil {
return nil, fmt.Errorf("connect lxd: %w", err)
fmt.Printf("config: %+v", confmap)
ac := make(map[string]*AgentConfig, len(confmap.Config))
for version, conf := range confmap.Config {
s, err := slm.ParseAlias(conf.ImageAlias)
if err != nil {
return nil, err
}
creatingInstances := make(map[string]instances)
for k, v := range conf.ResourceTypesCounts {
configuredInstancesCount.WithLabelValues(k).Set(float64(v))
}
for name, _ := range confmap.ResourceTypesMap {
creatingInstances[name] = make(instances)
}
ac[version] = &AgentConfig{
ImageAlias: conf.ImageAlias,
InstanceSource: *s,

ResourceTypesCounts: conf.ResourceTypesCounts,

currentImage: struct {
Hash string
CreatedAt time.Time
}{Hash: "", CreatedAt: time.Time{}},

creatingInstances: creatingInstances,
deletingInstances: make(instances),
}
}
checkInterval, waitIdleTime, zombieAllowTime, err := LoadParams()
if err != nil {
return nil, fmt.Errorf("load params: %w", err)
}
creatingInstances := make(map[string]instances)
for _, rt := range conf.ResourceTypesMap {
creatingInstances[rt.Name] = make(instances)
}

registry := prometheus.NewRegistry()
registry.Register(configuredInstancesCount)
registry.Register(lxdInstances)
for k, v := range conf.ResourceTypesCounts {
configuredInstancesCount.WithLabelValues(k).Set(float64(v))
c, err := lxd.ConnectLXDUnixWithContext(ctx, "", &lxd.ConnectionArgs{})
if err != nil {
return nil, fmt.Errorf("connect lxd: %w", err)
}
agent := &Agent{
ImageAlias: conf.ImageAlias,
InstanceSource: *source,

ResourceTypesMap: conf.ResourceTypesMap,
ResourceTypesCounts: conf.ResourceTypesCounts,
Client: c,

CheckInterval: checkInterval,
WaitIdleTime: waitIdleTime,
ZombieAllowTime: zombieAllowTime,
currentImage: struct {
Hash string
CreatedAt time.Time
}{Hash: "", CreatedAt: time.Time{}},

creatingInstances: creatingInstances,
deletingInstances: make(instances),
registry: registry,
agent := &Agent{
Config: ac,
Client: c,
CheckInterval: checkInterval,
WaitIdleTime: waitIdleTime,
ZombieAllowTime: zombieAllowTime,
registry: registry,
ResourceTypesMap: confmap.ResourceTypesMap,
}

return agent, nil
}

func (a *Agent) reloadConfig() error {
conf, err := LoadConfig()
confmap, err := LoadConfig()
if err != nil {
return fmt.Errorf("reload config: %w", err)
}

for k, v := range conf.ResourceTypesCounts {
configuredInstancesCount.WithLabelValues(k).Set(float64(v))
}

if conf.ImageAlias != a.ImageAlias {
source, err := slm.ParseAlias(conf.ImageAlias)
if err != nil {
return fmt.Errorf("parse image alias: %w", err)
for version, conf := range confmap.Config {
for k, v := range conf.ResourceTypesCounts {
configuredInstancesCount.WithLabelValues(k).Set(float64(v))
}
a.InstanceSource = *source
a.ImageAlias = conf.ImageAlias
a.Config[version].ImageAlias = conf.ImageAlias
a.Config[version].ResourceTypesCounts = conf.ResourceTypesCounts
}
a.ResourceTypesMap = conf.ResourceTypesMap
a.ResourceTypesCounts = conf.ResourceTypesCounts
a.ResourceTypesMap = confmap.ResourceTypesMap
return nil
}

Expand Down Expand Up @@ -141,16 +146,16 @@ func (a *Agent) Run(ctx context.Context, sigHupCh chan os.Signal) error {
}
}

func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int {
func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName, version string) int {
count := 0
for _, i := range instances {
if i.StatusCode != api.Frozen {
continue
}
if i.Config[configKeyImageAlias] != a.ImageAlias {
if i.Config[configKeyResourceType] != resourceTypeName {
continue
}
if i.Config[configKeyResourceType] != resourceTypeName {
if i.Config[configKeyImageAlias] != a.Config[version].ImageAlias {
continue
}
if _, ok := i.Config[configKeyRunnerName]; ok {
Expand Down Expand Up @@ -179,70 +184,70 @@ func (a *Agent) adjustInstancePool() error {
}

toDelete := []string{}

for _, rt := range a.ResourceTypesMap {
current := a.countPooledInstances(s, rt.Name)
creating := len(a.creatingInstances[rt.Name])
rtCount, ok := a.ResourceTypesCounts[rt.Name]
if !ok {
toDelete = append(toDelete, rt.Name)
continue
} else if rtCount == 0 {
toDelete = append(toDelete, rt.Name)
continue
}
createCount := rtCount - current - creating
if createCount < 1 {
continue
}
slog.Info("Create instances", "count", createCount, "flavor", rt.Name)
for i := 0; i < createCount; i++ {
name, err := generateInstanceName()
if err != nil {
return fmt.Errorf("generate instance name: %w", err)
for version, _ := range a.Config {
for rtName, rt := range a.ResourceTypesMap {
current := a.countPooledInstances(s, rtName, version)
creating := len(a.Config[version].creatingInstances[rtName])
rtCount, ok := a.Config[version].ResourceTypesCounts[rtName]
if !ok {
toDelete = append(toDelete, rtName)
continue
} else if rtCount == 0 {
toDelete = append(toDelete, rtName)
continue
}
l := slog.With("instance", name, "flavor", rt.Name)
a.creatingInstances[rt.Name][name] = struct{}{}
createCount := rtCount - current - creating
if createCount < 1 {
continue
}
slog.Info("Create instances", "count", createCount, "flavor", rtName)
for i := 0; i < createCount; i++ {
iname, err := generateInstanceName()
if err != nil {
return fmt.Errorf("generate instance name: %w", err)
}
l := slog.With("instance", iname, "flavor", rtName, "version", version)
a.Config[version].creatingInstances[rtName][iname] = struct{}{}

defer delete(a.creatingInstances[rt.Name], name)
defer delete(a.Config[version].creatingInstances[rtName], iname)

if err := a.createInstance(name, rt, l); err != nil {
l.Error("failed to create instance", "err", err.Error())
if err := a.createInstance(iname, rtName, rt, version, l); err != nil {
l.Error("failed to create instance", "err", err.Error())
}
}
}
}

for _, i := range s {
l := slog.With("instance", i.Name)
if _, ok := a.ResourceTypesCounts[i.Config[configKeyResourceType]]; !ok {
toDelete = append(toDelete, i.Config[configKeyResourceType])
}
for _, rt := range toDelete {
if i.Config[configKeyResourceType] == rt {
l := l.With("flavor", rt)
l.Info("Deleting disabled flavor instance")
if err := a.deleteInstance(i); err != nil {
l.Error("failed to delete instance", "err", err.Error())
continue
for _, i := range s {
l := slog.With("instance", i.Name, "version", version)
if _, ok := a.Config[version].ResourceTypesCounts[i.Config[configKeyResourceType]]; !ok {
toDelete = append(toDelete, i.Config[configKeyResourceType])
}
for _, rt := range toDelete {
if i.Config[configKeyResourceType] == rt {
l := l.With("flavor", rt)
l.Info("Deleting disabled flavor instance")
if err := a.deleteInstance(i, version); err != nil {
l.Error("failed to delete instance", "err", err.Error())
continue
}
l.Info("Deleted disabled flavor instance")
}
l.Info("Deleted disabled flavor instance")
}
}
if a.isZombieInstance(i) {
l.Info("Deleting zombie instance")
if err := a.deleteInstance(i); err != nil {
l.Error("failed to delete zombie instance", "err", err.Error())
if a.isZombieInstance(i, version) {
l.Info("Deleting zombie instance")
if err := a.deleteInstance(i, version); err != nil {
l.Error("failed to delete zombie instance", "err", err.Error())
}
l.Info("Deleted zombie instance")
}
l.Info("Deleted zombie instance")
}
if isOld, err := a.isOldImageInstance(i); err != nil {
l.Error("failed to check old image instance", "err", err.Error())
} else if isOld {
l.Info("Deleting old image instance")
if err := a.deleteInstance(i); err != nil {
l.Error("failed to delete old image instance", "err", err.Error())
if isOld, err := a.isOldImageInstance(i, version); err != nil {
l.Error("failed to check old image instance", "err", err.Error())
} else if isOld {
l.Info("Deleting old image instance")
if err := a.deleteInstance(i, version); err != nil {
l.Error("failed to delete old image instance", "err", err.Error())
}
l.Info("Deleted old image instance")
}
l.Info("Deleted old image instance")
}
}

Expand Down Expand Up @@ -282,52 +287,55 @@ func (a *Agent) collectMetrics() error {
return nil
}

func (a *Agent) isZombieInstance(i api.Instance) bool {
func (a *Agent) isZombieInstance(i api.Instance, version string) bool {
if i.StatusCode == api.Frozen {
return false
}
if _, ok := i.Config[configKeyRunnerName]; ok {
return false
}
if i.Config[configKeyImageAlias] != a.ImageAlias {
if i.Config[configKeyImageAlias] != a.Config[version].ImageAlias {
return false
}
if i.CreatedAt.Add(a.ZombieAllowTime).After(time.Now()) {
return false
}
if rt, ok := i.Config[configKeyResourceType]; !ok {
return false
} else if _, ok := a.creatingInstances[rt][i.Name]; ok {
} else if _, ok := a.Config[version].creatingInstances[rt][i.Name]; ok {
return false
}
return true
}

func (a *Agent) isOldImageInstance(i api.Instance) (bool, error) {
func (a *Agent) isOldImageInstance(i api.Instance, version string) (bool, error) {
baseImage, ok := i.Config["volatile.base_image"]
if !ok {
return false, errors.New("Failed to get volatile.base_image")
}
if baseImage != a.currentImage.Hash {
if i.CreatedAt.Before(a.currentImage.CreatedAt) {
if i.Config[configKeyImageAlias] != a.Config[version].ImageAlias {
return false, nil
}
if baseImage != a.Config[version].currentImage.Hash {
if i.CreatedAt.Before(a.Config[version].currentImage.CreatedAt) {
if i.StatusCode == api.Frozen {
return true, nil
}
return false, nil
}
a.currentImage.Hash = baseImage
a.currentImage.CreatedAt = i.CreatedAt
a.Config[version].currentImage.Hash = baseImage
a.Config[version].currentImage.CreatedAt = i.CreatedAt
return false, nil
}
return false, nil
}

func (a *Agent) deleteInstance(i api.Instance) error {
if _, ok := a.deletingInstances[i.Name]; ok {
func (a *Agent) deleteInstance(i api.Instance, version string) error {
if _, ok := a.Config[version].deletingInstances[i.Name]; ok {
return nil
}
a.deletingInstances[i.Name] = struct{}{}
defer delete(a.deletingInstances, i.Name)
a.Config[version].deletingInstances[i.Name] = struct{}{}
defer delete(a.Config[version].deletingInstances, i.Name)
_, etag, err := a.Client.GetInstance(i.Name)
if err != nil {
return fmt.Errorf("get instance: %w", err)
Expand Down
Loading
Loading