Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Jan 29, 2025
1 parent 99a039d commit 66390de
Show file tree
Hide file tree
Showing 13 changed files with 1,099 additions and 1,085 deletions.
5 changes: 2 additions & 3 deletions backend/provisioner/runner_scaling_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn {
logger.Debugf("updating module runtime for %s with endpoint %s", module, endpointURI)
dk := deployment.String()
_, err = schemaClient.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{Event: &schemapb.ModuleRuntimeEvent{
Module: moduleName,
DeploymentKey: &dk,
DeploymentKey: dk,
Deployment: &schemapb.ModuleRuntimeDeployment{
DeploymentKey: deployment.String(),
Endpoint: endpointURI,
Expand All @@ -114,7 +113,7 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn {
return nil, fmt.Errorf("failed to update module runtime: %w", err)
}
return &schema.ModuleRuntimeEvent{
Module: moduleName,
DeploymentKey: deployment,
Deployment: optional.Some(schema.ModuleRuntimeDeployment{
DeploymentKey: deployment,
Endpoint: endpointURI,
Expand Down
8 changes: 4 additions & 4 deletions backend/schemaservice/changesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import (
"github.com/block/ftl/internal/key"
)

func (r *SchemaState) ActiveChangeset() optional.Option[*schema.Changeset] {
func (r *SchemaState) ActiveChangeset() optional.Option[*changesetDetails] {
for _, changeset := range r.changesets {
if changeset.State == schema.ChangesetStateProvisioning {
return optional.Some(changeset)
}
}
return optional.None[*schema.Changeset]()
return optional.None[*changesetDetails]()
}

func (r *SchemaState) GetChangeset(changeset key.Changeset) (*schema.Changeset, error) {
func (r *SchemaState) GetChangeset(changeset key.Changeset) (*changesetDetails, error) {
c, ok := r.changesets[changeset]
if !ok {
return nil, fmt.Errorf("changeset %s not found", changeset)
}
return c, nil
}

func (r *SchemaState) GetChangesets() map[key.Changeset]*schema.Changeset {
func (r *SchemaState) GetChangesets() map[key.Changeset]*changesetDetails {
return r.changesets
}
75 changes: 35 additions & 40 deletions backend/schemaservice/eventextractor.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package schemaservice

import (
"fmt"
"iter"
"slices"

"github.com/alecthomas/types/tuple"
"golang.org/x/exp/maps"

"github.com/block/ftl/common/schema"
islices "github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/iterops"
"github.com/block/ftl/internal/key"
)
Expand All @@ -27,62 +25,59 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E

previousAllChangesets := previous.GetChangesets()
allChangesets := maps.Values(current.GetChangesets())
slices.SortFunc(allChangesets, func(a, b *schema.Changeset) int {
newDeployments := map[key.Deployment]bool{}
slices.SortFunc(allChangesets, func(a, b *changesetDetails) int {
return a.CreatedAt.Compare(b.CreatedAt)
})
for _, changeset := range allChangesets {
pc, ok := previousAllChangesets[changeset.Key]
if !ok {
events = append(events, &schema.ChangesetCreatedEvent{
Changeset: changeset,
})
continue
}

// Find changes in modules
prevModules := islices.Reduce(pc.Modules, map[key.Deployment]*schema.Module{}, func(acc map[key.Deployment]*schema.Module, m *schema.Module) map[key.Deployment]*schema.Module {
acc[m.Runtime.Deployment.DeploymentKey] = m
return acc
})
for _, module := range changeset.Modules {
prevModule, ok := prevModules[module.Runtime.Deployment.DeploymentKey]
if !ok {
panic(fmt.Sprintf("can not create deployment %s in %s", module.Runtime.Deployment.DeploymentKey, changeset.Key))
}
if !prevModule.Equals(module) {
events = append(events, &schema.DeploymentSchemaUpdatedEvent{
Key: module.Runtime.Deployment.DeploymentKey,
Schema: module,
Changeset: &changeset.Key,
if ok {
// Commit final state of changeset
if changeset.State == schema.ChangesetStateCommitted && pc.State != schema.ChangesetStateCommitted {
events = append(events, &schema.ChangesetCommittedEvent{
Key: changeset.Key,
})
} else if changeset.State == schema.ChangesetStateFailed && pc.State != schema.ChangesetStateFailed {
events = append(events, &schema.ChangesetFailedEvent{
Key: changeset.Key,
Error: changeset.Error,
})
}
continue
}

// Commit final state of changeset
if changeset.State == schema.ChangesetStateCommitted && pc.State != schema.ChangesetStateCommitted {
events = append(events, &schema.ChangesetCommittedEvent{
Key: changeset.Key,
})
// Maintain previous state to avoid unnecessary events
for _, module := range changeset.Modules {
previousAllDeployments[module.Runtime.Deployment.DeploymentKey] = module
}
} else if changeset.State == schema.ChangesetStateFailed && pc.State != schema.ChangesetStateFailed {
events = append(events, &schema.ChangesetFailedEvent{
Key: changeset.Key,
Error: changeset.Error,
// New changeset and associated modules
events = append(events, &schema.ChangesetCreatedEvent{
Changeset: hydrateChangeset(&current, changeset),
})
// Find new deployments from the changeset
for _, deployment := range changeset.Deployments {
// changeset is always a new deployment
events = append(events, &schema.DeploymentCreatedEvent{
Key: deployment,
Schema: current.deployments[deployment],
Changeset: &changeset.Key,
})
newDeployments[deployment] = true
}
continue

}

for key, deployment := range current.GetDeployments() {
for key, deployment := range current.GetAllActiveDeployments() {
if newDeployments[key] {
// Already handled in the changeset
continue
}
pd, ok := previousAllDeployments[key]
if !ok {
// We have lost the changeset that created this, this should only happen
// if the changeset was deleted as part of raft cleanup.
events = append(events, &schema.DeploymentCreatedEvent{
Key: key,
Schema: deployment,
})
} else if !pd.Equals(deployment) {
// TODO: this seems super inefficient, we should not need to do equality checks on every deployment
events = append(events, &schema.DeploymentSchemaUpdatedEvent{
Key: key,
Schema: deployment,
Expand Down
85 changes: 53 additions & 32 deletions backend/schemaservice/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,25 @@ func handleDeploymentActivatedEvent(t SchemaState, e *schema.DeploymentActivated
}
existing.ModRuntime().ModDeployment().ActivatedAt = optional.Some(e.ActivatedAt)
existing.ModRuntime().ModScaling().MinReplicas = int32(e.MinReplicas)
t.activeDeployments[e.Key] = optional.Ptr(e.Changeset) //TODO
t.activeDeployments[existing.Name] = e.Key
return t, nil
}

func handleDeploymentDeactivatedEvent(t SchemaState, e *schema.DeploymentDeactivatedEvent) (SchemaState, error) {
//TODO: fix deactivation
existing, ok := t.deployments[e.Key]
if !ok {
return t, fmt.Errorf("deployment %s not found", e.Key)
}
existing.ModRuntime().ModScaling().MinReplicas = 0
delete(t.activeDeployments, e.Key)
delete(t.activeDeployments, existing.Name)
return t, nil
}

func handleVerbRuntimeEvent(t SchemaState, e *schema.VerbRuntimeEvent) (SchemaState, error) {
m, ok := t.provisioning[e.Module]
if !ok {
return t, fmt.Errorf("module %s not found", e.Module)
m, err := provisioningModule(&t, e.Module)
if err != nil {
return SchemaState{}, err
}
for verb := range slices.FilterVariants[*schema.Verb](m.Decls) {
if verb.Name == e.ID {
Expand All @@ -116,10 +117,22 @@ func handleVerbRuntimeEvent(t SchemaState, e *schema.VerbRuntimeEvent) (SchemaSt
return t, nil
}

func handleTopicRuntimeEvent(t SchemaState, e *schema.TopicRuntimeEvent) (SchemaState, error) {
m, ok := t.provisioning[e.Module]
func provisioningModule(t *SchemaState, module string) (*schema.Module, error) {
d, ok := t.provisioning[module]
if !ok {
return nil, fmt.Errorf("module %s not found", module)
}
m, ok := t.deployments[d]
if !ok {
return t, fmt.Errorf("module %s not found", e.Module)
return nil, fmt.Errorf("deployment %s not found", d)
}
return m, nil
}

func handleTopicRuntimeEvent(t SchemaState, e *schema.TopicRuntimeEvent) (SchemaState, error) {
m, err := provisioningModule(&t, e.Module)
if err != nil {
return SchemaState{}, err
}
for topic := range slices.FilterVariants[*schema.Topic](m.Decls) {
if topic.Name == e.ID {
Expand All @@ -131,9 +144,9 @@ func handleTopicRuntimeEvent(t SchemaState, e *schema.TopicRuntimeEvent) (Schema
}

func handleDatabaseRuntimeEvent(t SchemaState, e *schema.DatabaseRuntimeEvent) (SchemaState, error) {
m, ok := t.provisioning[e.Module]
if !ok {
return t, fmt.Errorf("module %s not found", e.Module)
m, err := provisioningModule(&t, e.Module)
if err != nil {
return SchemaState{}, err
}
for _, decl := range m.Decls {
if db, ok := decl.(*schema.Database); ok && db.Name == e.ID {
Expand All @@ -148,20 +161,9 @@ func handleDatabaseRuntimeEvent(t SchemaState, e *schema.DatabaseRuntimeEvent) (
}

func handleModuleRuntimeEvent(t SchemaState, e *schema.ModuleRuntimeEvent) (SchemaState, error) {
var module *schema.Module
if dk, ok := e.DeploymentKey.Get(); ok {
deployment, err := key.ParseDeploymentKey(dk)
if err != nil {
return t, fmt.Errorf("invalid deployment key: %w", err)
}
module = t.deployments[deployment]
} else {
// updating a provisioning module
m, ok := t.provisioning[e.Module]
if !ok {
return t, fmt.Errorf("module %s not found", e.Module)
}
module = m
module := t.deployments[e.DeploymentKey]
if module == nil {
return t, fmt.Errorf("deployment %s not found", e.Deployment)
}
if base, ok := e.Base.Get(); ok {
module.ModRuntime().Base = base
Expand All @@ -176,7 +178,8 @@ func handleModuleRuntimeEvent(t SchemaState, e *schema.ModuleRuntimeEvent) (Sche
}

func handleProvisioningCreatedEvent(t SchemaState, e *schema.ProvisioningCreatedEvent) (SchemaState, error) {
t.provisioning[e.DesiredModule.Name] = e.DesiredModule
t.deployments[e.DesiredModule.Runtime.Deployment.DeploymentKey] = e.DesiredModule
t.provisioning[e.DesiredModule.Name] = e.DesiredModule.Runtime.Deployment.DeploymentKey
return t, nil
}

Expand All @@ -191,10 +194,28 @@ func handleChangesetCreatedEvent(t SchemaState, e *schema.ChangesetCreatedEvent)
return t, fmt.Errorf("can not create active changeset: %s already active", active.Key)
}
}
t.changesets[e.Changeset.Key] = e.Changeset
deployments := []key.Deployment{}
for _, mod := range e.Changeset.Modules {
t.deployments[mod.Runtime.Deployment.DeploymentKey] = mod
t.provisioning[mod.Name] = mod
if mod.Runtime == nil {
return t, fmt.Errorf("module %s has no runtime", mod.Name)
}
if mod.Runtime.Deployment == nil {
return t, fmt.Errorf("module %s has no deployment", mod.Name)
}
if mod.Runtime.Deployment.DeploymentKey.IsZero() {
return t, fmt.Errorf("module %s has no deployment key", mod.Name)
}
deploymentKey := mod.Runtime.Deployment.DeploymentKey
deployments = append(deployments, deploymentKey)
t.deployments[deploymentKey] = mod
t.provisioning[mod.Name] = deploymentKey
}
t.changesets[e.Changeset.Key] = &changesetDetails{
Key: e.Changeset.Key,
CreatedAt: e.Changeset.CreatedAt,
Deployments: deployments,
State: e.Changeset.State,
Error: e.Changeset.Error,
}
return t, nil
}
Expand All @@ -205,9 +226,9 @@ func handleChangesetCommittedEvent(t SchemaState, e *schema.ChangesetCommittedEv
return SchemaState{}, fmt.Errorf("changeset %s not found", e.Key)
}
changeset.State = schema.ChangesetStateCommitted
for _, module := range changeset.Modules {
t.deployments[module.Runtime.Deployment.DeploymentKey] = module
t.activeDeployments[module.Runtime.Deployment.DeploymentKey] = true
for _, depName := range changeset.Deployments {
dep := t.deployments[depName]
t.activeDeployments[dep.Name] = depName
}
return t, nil
}
Expand Down
3 changes: 1 addition & 2 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.Pul
}

func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Request[ftlv1.UpdateDeploymentRuntimeRequest]) (*connect.Response[ftlv1.UpdateDeploymentRuntimeResponse], error) {
deployment, err := key.ParseDeploymentKey(*req.Msg.Event.DeploymentKey)
deployment, err := key.ParseDeploymentKey(req.Msg.Event.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
Expand Down Expand Up @@ -177,7 +177,6 @@ func (s *Service) CreateChangeset(ctx context.Context, req *connect.Request[ftlv
}

// TODO: validate changeset schema with canonical schema

err = s.State.Publish(ctx, &schema.ChangesetCreatedEvent{
Changeset: changeset,
})
Expand Down
Loading

0 comments on commit 66390de

Please sign in to comment.