diff --git a/backend/provisioner/deployment.go b/backend/provisioner/deployment.go index 064a862434..34fc27a140 100644 --- a/backend/provisioner/deployment.go +++ b/backend/provisioner/deployment.go @@ -106,7 +106,7 @@ func (t *Task) Progress(ctx context.Context) error { if err != nil { return fmt.Errorf("schema server failed to handle provisioning event: %w", err) } - _, err = t.deployment.DeploymentState.ApplyEvent(event) + _, err = t.deployment.DeploymentState.ApplyEvent(ctx, event) if err != nil { return fmt.Errorf("failed to apply event: %w", err) } diff --git a/backend/provisioner/registry.go b/backend/provisioner/registry.go index 00a77f2138..a13dad3cb0 100644 --- a/backend/provisioner/registry.go +++ b/backend/provisioner/registry.go @@ -124,7 +124,7 @@ func (reg *ProvisionerRegistry) CreateDeployment(ctx context.Context, desiredMod module := desiredModule.GetName() state := schemaservice.NewSchemaState() - _, err := state.ApplyEvent(&schema.ProvisioningCreatedEvent{ + _, err := state.ApplyEvent(ctx, &schema.ProvisioningCreatedEvent{ DesiredModule: desiredModule, }) if err != nil { diff --git a/backend/schemaservice/eventextractor.go b/backend/schemaservice/eventextractor.go index 65a2a834e5..ef3f29fd06 100644 --- a/backend/schemaservice/eventextractor.go +++ b/backend/schemaservice/eventextractor.go @@ -25,13 +25,26 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E previousAllChangesets := previous.GetChangesets() allChangesets := maps.Values(current.GetChangesets()) - newDeployments := map[key.Deployment]bool{} + handledDeployments := map[key.Deployment]bool{} slices.SortFunc(allChangesets, func(a, b *changesetDetails) int { return a.CreatedAt.Compare(b.CreatedAt) }) for _, changeset := range allChangesets { pc, ok := previousAllChangesets[changeset.Key] if ok { + for _, key := range changeset.Deployments { + pd, ok := previous.deployments[key] + deployment := current.deployments[key] + if ok && !pd.Equals(deployment) { + handledDeployments[key] = true + // TODO: this seems super inefficient, we should not need to do equality checks on every deployment + events = append(events, &schema.DeploymentSchemaUpdatedEvent{ + Key: key, + Schema: deployment, + Changeset: &changeset.Key, + }) + } + } // Commit final state of changeset if changeset.State == schema.ChangesetStateCommitted && pc.State != schema.ChangesetStateCommitted { events = append(events, &schema.ChangesetCommittedEvent{ @@ -43,28 +56,26 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E Error: changeset.Error, }) } - continue - } - // New changeset and associated modules - events = append(events, &schema.ChangesetCreatedEvent{ - Changeset: hydrateChangeset(¤t, changeset), - }) - // Find new deployments from the changeset - for _, deployment := range changeset.Deployments { - // changeset is always a new deployment - events = append(events, &schema.DeploymentCreatedEvent{ - Key: deployment, - Schema: current.deployments[deployment], - Changeset: &changeset.Key, + } else { + // New changeset and associated modules + events = append(events, &schema.ChangesetCreatedEvent{ + Changeset: hydrateChangeset(¤t, changeset), }) - newDeployments[deployment] = true + // Find new deployments from the changeset + for _, deployment := range changeset.Deployments { + // changeset is always a new deployment + events = append(events, &schema.DeploymentCreatedEvent{ + Key: deployment, + Schema: current.deployments[deployment], + Changeset: &changeset.Key, + }) + handledDeployments[deployment] = true + } } - continue - } for key, deployment := range current.GetAllActiveDeployments() { - if newDeployments[key] { + if handledDeployments[key] { // Already handled in the changeset continue } diff --git a/backend/schemaservice/events.go b/backend/schemaservice/events.go index effca10da2..df4a181e5e 100644 --- a/backend/schemaservice/events.go +++ b/backend/schemaservice/events.go @@ -1,6 +1,7 @@ package schemaservice import ( + "context" "fmt" "github.com/alecthomas/types/optional" @@ -8,12 +9,15 @@ import ( "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" "github.com/block/ftl/internal/key" + "github.com/block/ftl/internal/log" ) // TODO: these should be event methods once we can move them to this package // ApplyEvent applies an event to the schema state -func (r SchemaState) ApplyEvent(event schema.Event) (SchemaState, error) { +func (r SchemaState) ApplyEvent(ctx context.Context, event schema.Event) (SchemaState, error) { + logger := log.FromContext(ctx) + logger.Infof("applying event: %T%v", event, event) switch e := event.(type) { case *schema.DeploymentCreatedEvent: return handleDeploymentCreatedEvent(r, e) @@ -38,7 +42,7 @@ func (r SchemaState) ApplyEvent(event schema.Event) (SchemaState, error) { case *schema.ChangesetCreatedEvent: return handleChangesetCreatedEvent(r, e) case *schema.ChangesetCommittedEvent: - return handleChangesetCommittedEvent(r, e) + return handleChangesetCommittedEvent(ctx, r, e) case *schema.ChangesetFailedEvent: return handleChangesetFailedEvent(r, e) default: @@ -220,13 +224,15 @@ func handleChangesetCreatedEvent(t SchemaState, e *schema.ChangesetCreatedEvent) return t, nil } -func handleChangesetCommittedEvent(t SchemaState, e *schema.ChangesetCommittedEvent) (SchemaState, error) { +func handleChangesetCommittedEvent(ctx context.Context, t SchemaState, e *schema.ChangesetCommittedEvent) (SchemaState, error) { changeset, ok := t.changesets[e.Key] if !ok { return SchemaState{}, fmt.Errorf("changeset %s not found", e.Key) } + logger := log.FromContext(ctx) changeset.State = schema.ChangesetStateCommitted for _, depName := range changeset.Deployments { + logger.Infof("activating deployment %s", t.deployments[depName].GetRuntime().GetDeployment().Endpoint) dep := t.deployments[depName] t.activeDeployments[dep.Name] = depName } diff --git a/backend/schemaservice/schemaservice.go b/backend/schemaservice/schemaservice.go index 1433ccf5cf..1008fe6229 100644 --- a/backend/schemaservice/schemaservice.go +++ b/backend/schemaservice/schemaservice.go @@ -109,7 +109,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ if err != nil { return nil, fmt.Errorf("could not parse event: %w", err) } - _, err = view.ApplyEvent(event) + _, err = view.ApplyEvent(ctx, event) if err != nil { return nil, fmt.Errorf("could not apply event: %w", err) } @@ -304,11 +304,15 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon logger.Errorf(err, "Deployment not found: %s", event.Key) continue } + changeset := "" + if event.Changeset != nil { + changeset = event.Changeset.String() + } err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert Event: &ftlv1.PullSchemaResponse_DeploymentUpdated_{ DeploymentUpdated: &ftlv1.PullSchemaResponse_DeploymentUpdated{ // TODO: include changeset info - Changeset: nil, + Changeset: &changeset, Schema: dep.ToProto(), }, }, diff --git a/backend/schemaservice/state.go b/backend/schemaservice/state.go index 6b2c04d009..298880a175 100644 --- a/backend/schemaservice/state.go +++ b/backend/schemaservice/state.go @@ -136,7 +136,7 @@ func (c *schemaStateMachine) Publish(msg schema.Event) error { c.lock.Lock() defer c.lock.Unlock() var err error - c.state, err = c.state.ApplyEvent(msg) + c.state, err = c.state.ApplyEvent(c.runningCtx, msg) if err != nil { return fmt.Errorf("update: %w", err) } diff --git a/internal/routing/routing.go b/internal/routing/routing.go index 61ca7fc6b7..16fcae345b 100644 --- a/internal/routing/routing.go +++ b/internal/routing/routing.go @@ -37,8 +37,9 @@ func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable } func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) { - for range channels.IterContext(ctx, changes.Events()) { + for event := range channels.IterContext(ctx, changes.Events()) { old := r.routes.Load() + log.FromContext(ctx).Infof("Updating routes %T%v", event, event) routes := extractRoutes(ctx, changes.CanonicalView()) for module, rd := range old.moduleToDeployment { if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] { diff --git a/internal/schema/schemaeventsource/schemaeventsource.go b/internal/schema/schemaeventsource/schemaeventsource.go index 4968dcc06d..0468c5cab9 100644 --- a/internal/schema/schemaeventsource/schemaeventsource.go +++ b/internal/schema/schemaeventsource/schemaeventsource.go @@ -263,6 +263,11 @@ func (e EventSource) Publish(event Event) { e.view.Store(clone) case EventUpsert: + ep := "" + if event.Module.Runtime != nil && event.Module.Runtime.Deployment != nil { + ep = event.Module.Runtime.Deployment.Endpoint + } + println("EventUpsert: ", ep) clone := reflect.DeepCopy(e.CanonicalView()) changeset := reflect.DeepCopy(e.ActiveChangeset()) var modules []*schema.Module @@ -283,6 +288,7 @@ func (e EventSource) Publish(event Event) { } if _, ok := event.Changeset.Get(); ok { changeset.MustGet().Modules = modules + println("EventUpsert CS: ", ep) } else { clone.Modules = modules }