Skip to content

Commit

Permalink
more work
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Jan 29, 2025
1 parent 66390de commit b5f598b
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 27 deletions.
2 changes: 1 addition & 1 deletion backend/provisioner/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (t *Task) Progress(ctx context.Context) error {
if err != nil {
return fmt.Errorf("schema server failed to handle provisioning event: %w", err)
}
_, err = t.deployment.DeploymentState.ApplyEvent(event)
_, err = t.deployment.DeploymentState.ApplyEvent(ctx, event)
if err != nil {
return fmt.Errorf("failed to apply event: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (reg *ProvisionerRegistry) CreateDeployment(ctx context.Context, desiredMod
module := desiredModule.GetName()
state := schemaservice.NewSchemaState()

_, err := state.ApplyEvent(&schema.ProvisioningCreatedEvent{
_, err := state.ApplyEvent(ctx, &schema.ProvisioningCreatedEvent{
DesiredModule: desiredModule,
})
if err != nil {
Expand Down
47 changes: 29 additions & 18 deletions backend/schemaservice/eventextractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,26 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E

previousAllChangesets := previous.GetChangesets()
allChangesets := maps.Values(current.GetChangesets())
newDeployments := map[key.Deployment]bool{}
handledDeployments := map[key.Deployment]bool{}
slices.SortFunc(allChangesets, func(a, b *changesetDetails) int {
return a.CreatedAt.Compare(b.CreatedAt)
})
for _, changeset := range allChangesets {
pc, ok := previousAllChangesets[changeset.Key]
if ok {
for _, key := range changeset.Deployments {
pd, ok := previous.deployments[key]
deployment := current.deployments[key]
if ok && !pd.Equals(deployment) {
handledDeployments[key] = true
// TODO: this seems super inefficient, we should not need to do equality checks on every deployment
events = append(events, &schema.DeploymentSchemaUpdatedEvent{
Key: key,
Schema: deployment,
Changeset: &changeset.Key,
})
}
}
// Commit final state of changeset
if changeset.State == schema.ChangesetStateCommitted && pc.State != schema.ChangesetStateCommitted {
events = append(events, &schema.ChangesetCommittedEvent{
Expand All @@ -43,28 +56,26 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E
Error: changeset.Error,
})
}
continue
}
// New changeset and associated modules
events = append(events, &schema.ChangesetCreatedEvent{
Changeset: hydrateChangeset(&current, changeset),
})
// Find new deployments from the changeset
for _, deployment := range changeset.Deployments {
// changeset is always a new deployment
events = append(events, &schema.DeploymentCreatedEvent{
Key: deployment,
Schema: current.deployments[deployment],
Changeset: &changeset.Key,
} else {
// New changeset and associated modules
events = append(events, &schema.ChangesetCreatedEvent{
Changeset: hydrateChangeset(&current, changeset),
})
newDeployments[deployment] = true
// Find new deployments from the changeset
for _, deployment := range changeset.Deployments {
// changeset is always a new deployment
events = append(events, &schema.DeploymentCreatedEvent{
Key: deployment,
Schema: current.deployments[deployment],
Changeset: &changeset.Key,
})
handledDeployments[deployment] = true
}
}
continue

}

for key, deployment := range current.GetAllActiveDeployments() {
if newDeployments[key] {
if handledDeployments[key] {
// Already handled in the changeset
continue
}
Expand Down
12 changes: 9 additions & 3 deletions backend/schemaservice/events.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package schemaservice

import (
"context"
"fmt"

"github.com/alecthomas/types/optional"

"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
)

// TODO: these should be event methods once we can move them to this package

// ApplyEvent applies an event to the schema state
func (r SchemaState) ApplyEvent(event schema.Event) (SchemaState, error) {
func (r SchemaState) ApplyEvent(ctx context.Context, event schema.Event) (SchemaState, error) {
logger := log.FromContext(ctx)
logger.Infof("applying event: %T%v", event, event)
switch e := event.(type) {
case *schema.DeploymentCreatedEvent:
return handleDeploymentCreatedEvent(r, e)
Expand All @@ -38,7 +42,7 @@ func (r SchemaState) ApplyEvent(event schema.Event) (SchemaState, error) {
case *schema.ChangesetCreatedEvent:
return handleChangesetCreatedEvent(r, e)
case *schema.ChangesetCommittedEvent:
return handleChangesetCommittedEvent(r, e)
return handleChangesetCommittedEvent(ctx, r, e)
case *schema.ChangesetFailedEvent:
return handleChangesetFailedEvent(r, e)
default:
Expand Down Expand Up @@ -220,13 +224,15 @@ func handleChangesetCreatedEvent(t SchemaState, e *schema.ChangesetCreatedEvent)
return t, nil
}

func handleChangesetCommittedEvent(t SchemaState, e *schema.ChangesetCommittedEvent) (SchemaState, error) {
func handleChangesetCommittedEvent(ctx context.Context, t SchemaState, e *schema.ChangesetCommittedEvent) (SchemaState, error) {
changeset, ok := t.changesets[e.Key]
if !ok {
return SchemaState{}, fmt.Errorf("changeset %s not found", e.Key)
}
logger := log.FromContext(ctx)
changeset.State = schema.ChangesetStateCommitted
for _, depName := range changeset.Deployments {
logger.Infof("activating deployment %s", t.deployments[depName].GetRuntime().GetDeployment().Endpoint)
dep := t.deployments[depName]
t.activeDeployments[dep.Name] = depName
}
Expand Down
8 changes: 6 additions & 2 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ
if err != nil {
return nil, fmt.Errorf("could not parse event: %w", err)
}
_, err = view.ApplyEvent(event)
_, err = view.ApplyEvent(ctx, event)
if err != nil {
return nil, fmt.Errorf("could not apply event: %w", err)
}
Expand Down Expand Up @@ -304,11 +304,15 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
logger.Errorf(err, "Deployment not found: %s", event.Key)
continue
}
changeset := ""
if event.Changeset != nil {
changeset = event.Changeset.String()
}
err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
Event: &ftlv1.PullSchemaResponse_DeploymentUpdated_{
DeploymentUpdated: &ftlv1.PullSchemaResponse_DeploymentUpdated{
// TODO: include changeset info
Changeset: nil,
Changeset: &changeset,
Schema: dep.ToProto(),
},
},
Expand Down
2 changes: 1 addition & 1 deletion backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *schemaStateMachine) Publish(msg schema.Event) error {
c.lock.Lock()
defer c.lock.Unlock()
var err error
c.state, err = c.state.ApplyEvent(msg)
c.state, err = c.state.ApplyEvent(c.runningCtx, msg)
if err != nil {
return fmt.Errorf("update: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable
}

func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) {
for range channels.IterContext(ctx, changes.Events()) {
for event := range channels.IterContext(ctx, changes.Events()) {
old := r.routes.Load()
log.FromContext(ctx).Infof("Updating routes %T%v", event, event)
routes := extractRoutes(ctx, changes.CanonicalView())
for module, rd := range old.moduleToDeployment {
if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] {
Expand Down
6 changes: 6 additions & 0 deletions internal/schema/schemaeventsource/schemaeventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ func (e EventSource) Publish(event Event) {
e.view.Store(clone)

case EventUpsert:
ep := ""
if event.Module.Runtime != nil && event.Module.Runtime.Deployment != nil {
ep = event.Module.Runtime.Deployment.Endpoint
}
println("EventUpsert: ", ep)
clone := reflect.DeepCopy(e.CanonicalView())
changeset := reflect.DeepCopy(e.ActiveChangeset())
var modules []*schema.Module
Expand All @@ -283,6 +288,7 @@ func (e EventSource) Publish(event Event) {
}
if _, ok := event.Changeset.Get(); ok {
changeset.MustGet().Modules = modules
println("EventUpsert CS: ", ep)
} else {
clone.Modules = modules
}
Expand Down

0 comments on commit b5f598b

Please sign in to comment.