Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Jan 30, 2025
1 parent d02c316 commit dda4047
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 72 deletions.
19 changes: 0 additions & 19 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,6 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return connect.NewResponse(resp), nil
}

//
//func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.UpdateDeployRequest]) (response *connect.Response[ftlv1.UpdateDeployResponse], err error) {
// deploymentKey, err := key.ParseDeploymentKey(req.Msg.DeploymentKey)
// if err != nil {
// return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
// }
//
// logger := s.getDeploymentLogger(ctx, deploymentKey)
// logger.Debugf("Update deployment for: %s", deploymentKey)
// if req.Msg.MinReplicas != nil {
// err = s.setDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
// if err != nil {
// logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
// return nil, fmt.Errorf("could not set deployment replicas: %w", err)
// }
// }
// return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
//}

func (s *Service) setDeploymentReplicas(ctx context.Context, key key.Deployment, minReplicas int) (err error) {
deployments, err := s.schemaClient.GetDeployments(ctx, connect.NewRequest(&ftlv1.GetDeploymentsRequest{}))

Expand Down
1 change: 1 addition & 0 deletions backend/controller/sql/testdata/go/database/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions backend/controller/sql/testdata/go/mysql/queries.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change s
}
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), change.Module.Name)
cronJobs[change.Module.Name] = moduleJobs
default:
}
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func (s *Service) ProvisionChangeset(ctx context.Context, req *schema.Changeset)

logger.Debugf("Finished deployment for module %s", moduleName)

//deploymentKey := deployment.Module.Runtime.Deployment.DeploymentKey

}

//TODO: huge hack, this needs be be changed as it means all provisioning has to happen in a single goroutine
Expand Down
8 changes: 4 additions & 4 deletions backend/schemaservice/changesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import (
"github.com/block/ftl/internal/key"
)

func (r *SchemaState) ActiveChangeset() optional.Option[*changesetDetails] {
func (r *SchemaState) ActiveChangeset() optional.Option[*ChangesetDetails] {
for _, changeset := range r.changesets {
if changeset.State == schema.ChangesetStateProvisioning {
return optional.Some(changeset)
}
}
return optional.None[*changesetDetails]()
return optional.None[*ChangesetDetails]()
}

func (r *SchemaState) GetChangeset(changeset key.Changeset) (*changesetDetails, error) {
func (r *SchemaState) GetChangeset(changeset key.Changeset) (*ChangesetDetails, error) {
c, ok := r.changesets[changeset]
if !ok {
return nil, fmt.Errorf("changeset %s not found", changeset)
}
return c, nil
}

func (r *SchemaState) GetChangesets() map[key.Changeset]*changesetDetails {
func (r *SchemaState) GetChangesets() map[key.Changeset]*ChangesetDetails {
return r.changesets
}
2 changes: 1 addition & 1 deletion backend/schemaservice/eventextractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[schema.E
previousAllChangesets := previous.GetChangesets()
allChangesets := maps.Values(current.GetChangesets())
handledDeployments := map[key.Deployment]bool{}
slices.SortFunc(allChangesets, func(a, b *changesetDetails) int {
slices.SortFunc(allChangesets, func(a, b *ChangesetDetails) int {
return a.CreatedAt.Compare(b.CreatedAt)
})
for _, changeset := range allChangesets {
Expand Down
4 changes: 1 addition & 3 deletions backend/schemaservice/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

// ApplyEvent applies an event to the schema state
func (r SchemaState) ApplyEvent(ctx context.Context, event schema.Event) (SchemaState, error) {
logger := log.FromContext(ctx)
logger.Infof("applying event: %T%v", event, event)
switch e := event.(type) {
case *schema.DeploymentCreatedEvent:
return handleDeploymentCreatedEvent(r, e)
Expand Down Expand Up @@ -214,7 +212,7 @@ func handleChangesetCreatedEvent(t SchemaState, e *schema.ChangesetCreatedEvent)
t.deployments[deploymentKey] = mod
t.provisioning[mod.Name] = deploymentKey
}
t.changesets[e.Changeset.Key] = &changesetDetails{
t.changesets[e.Changeset.Key] = &ChangesetDetails{
Key: e.Changeset.Key,
CreatedAt: e.Changeset.CreatedAt,
Deployments: deployments,
Expand Down
10 changes: 5 additions & 5 deletions backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ type SchemaState struct {
deployments map[key.Deployment]*schema.Module
// currently active deployments for a given module name. This represents the canonical state of the schema.
activeDeployments map[string]key.Deployment
changesets map[key.Changeset]*changesetDetails
changesets map[key.Changeset]*ChangesetDetails
// TODO: consider removing committed changesets. Return success if asked about a missing changeset? Or keep the shell of changesets

provisioning map[string]key.Deployment
}

type changesetDetails struct {
type ChangesetDetails struct {
Key key.Changeset
CreatedAt time.Time
Deployments []key.Deployment
Expand All @@ -40,7 +40,7 @@ func NewSchemaState() SchemaState {
return SchemaState{
deployments: map[key.Deployment]*schema.Module{},
activeDeployments: map[string]key.Deployment{},
changesets: map[key.Changeset]*changesetDetails{},
changesets: map[key.Changeset]*ChangesetDetails{},
provisioning: map[string]key.Deployment{},
}
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *SchemaState) GetDeployments() map[key.Deployment]*schema.Module {
return r.deployments
}

// GetActiveDeployments returns all active deployments (excluding those in changesets).
// GetCanonicalDeployments returns all active deployments (excluding those in changesets).
func (r *SchemaState) GetCanonicalDeployments() map[key.Deployment]*schema.Module {
deployments := map[key.Deployment]*schema.Module{}
for _, dep := range r.activeDeployments {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *schemaStateMachine) Subscribe(ctx context.Context) (<-chan struct{}, er
return c.notifier.Subscribe(), nil
}

func hydrateChangeset(current *SchemaState, changeset *changesetDetails) *schema.Changeset {
func hydrateChangeset(current *SchemaState, changeset *ChangesetDetails) *schema.Changeset {
changesetModules := make([]*schema.Module, len(changeset.Deployments))
for i, deployment := range changeset.Deployments {
changesetModules[i] = current.deployments[deployment]
Expand Down
19 changes: 15 additions & 4 deletions cmd/ftl/cmd_kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package main

import (
"context"
"fmt"

"connectrpc.com/connect"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/internal/key"
)

Expand All @@ -13,9 +18,15 @@ type killCmd struct {

func (k *killCmd) Run(ctx context.Context, client ftlv1connect.SchemaServiceClient) error {
//TODO: implement this as a changeset
//_, err := client.UpdateDeploy(ctx, connect.NewRequest(&ftlv1.UpdateDeployRequest{DeploymentKey: k.Deployment.String()}))
//if err != nil {
// return err
//}
_, err := client.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{
Deployment: k.Deployment.String(),
Event: &schemapb.ModuleRuntimeEvent{
DeploymentKey: k.Deployment.String(),
Scaling: &schemapb.ModuleRuntimeScaling{MinReplicas: 0},
},
}))
if err != nil {
return fmt.Errorf("failed to kill deployment: %w", err)
}
return nil
}
24 changes: 16 additions & 8 deletions cmd/ftl/cmd_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package main

import (
"context"
"fmt"

"connectrpc.com/connect"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/internal/key"
)

Expand All @@ -12,14 +17,17 @@ type updateCmd struct {
Deployment key.Deployment `arg:"" help:"Deployment to update."`
}

func (u *updateCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient) error {
func (u *updateCmd) Run(ctx context.Context, client ftlv1connect.SchemaServiceClient) error {
//TODO: implement this as a changeset
//_, err := client.UpdateDeploy(ctx, connect.NewRequest(&ftlv1.UpdateDeployRequest{
// DeploymentKey: u.Deployment.String(),
// MinReplicas: &u.Replicas,
//}))
//if err != nil {
// return err
//}
_, err := client.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{
Deployment: u.Deployment.String(),
Event: &schemapb.ModuleRuntimeEvent{
DeploymentKey: u.Deployment.String(),
Scaling: &schemapb.ModuleRuntimeScaling{MinReplicas: u.Replicas},
},
}))
if err != nil {
return fmt.Errorf("failed to update deployment: %w", err)
}
return nil
}
1 change: 0 additions & 1 deletion ftl-provisioner-config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
plugins = [
{ id = "cloudformation", resources = ["postgres"] },
{ id = "controller", resources = ["module"] },
{ id = "kubernetes", resources = ["runner"] },
]
27 changes: 19 additions & 8 deletions internal/buildengine/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type deploymentArtefact struct {
localPath string
}

// TODO: rename
type DeployClient interface {
GetArtefactDiffs(ctx context.Context, req *connect.Request[ftlv1.GetArtefactDiffsRequest]) (*connect.Response[ftlv1.GetArtefactDiffsResponse], error)
UploadArtefact(ctx context.Context, req *connect.Request[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error)
Expand All @@ -38,8 +37,9 @@ type DeployClient interface {

type SchemaServiceClient interface {
CreateChangeset(ctx context.Context, req *connect.Request[ftlv1.CreateChangesetRequest]) (*connect.Response[ftlv1.CreateChangesetResponse], error)
PullSchema(context.Context, *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error)
UpdateDeploymentRuntime(ctx context.Context, req *connect.Request[ftlv1.UpdateDeploymentRuntimeRequest]) (*connect.Response[ftlv1.UpdateDeploymentRuntimeResponse], error)
}

// Deploy a module to the FTL controller with the given number of replicas. Optionally wait for the deployment to become ready.
Expand All @@ -58,7 +58,7 @@ func Deploy(ctx context.Context, projectConfig projectconfig.Config, modules []M
})
}
if err := uploadGroup.Wait(); err != nil {
return err
return fmt.Errorf("failed to upload artefacts: %w", err)
}
close(moduleSchemas)
collectedSchemas := []*schemapb.Module{}
Expand All @@ -80,6 +80,9 @@ func Deploy(ctx context.Context, projectConfig projectconfig.Config, modules []M

ctx, closeStream := context.WithCancelCause(ctx)
stream, err := schemaserviceClient.PullSchema(ctx, connect.NewRequest(&ftlv1.PullSchemaRequest{}))
if err != nil {
return fmt.Errorf("failed to pull schema: %w", err)
}
defer closeStream(fmt.Errorf("function is complete"))
for {
if !stream.Receive() {
Expand Down Expand Up @@ -142,14 +145,14 @@ func uploadArtefacts(ctx context.Context, projectConfig projectconfig.Config, mo
file := filesByHash[missing]
content, err := os.ReadFile(file.localPath)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read file %w", err)
}
logger.Debugf("Uploading %s", relToCWD(file.localPath))
resp, err := client.UploadArtefact(ctx, connect.NewRequest(&ftlv1.UploadArtefactRequest{
Content: content,
}))
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to upload artefact: %w", err)
}
logger.Debugf("Uploaded %s as %s:%s", relToCWD(file.localPath), sha256.FromBytes(resp.Msg.Digest), file.Path)
}
Expand All @@ -168,7 +171,7 @@ func uploadArtefacts(ctx context.Context, projectConfig projectconfig.Config, mo
return moduleSchema, nil
}

func terminateModuleDeployment(ctx context.Context, client DeployClient, module string) error {
func terminateModuleDeployment(ctx context.Context, client DeployClient, schemaClient SchemaServiceClient, module string) error {
logger := log.FromContext(ctx).Module(module).Scope("terminate")

status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{}))
Expand All @@ -189,8 +192,16 @@ func terminateModuleDeployment(ctx context.Context, client DeployClient, module
}

logger.Infof("Terminating deployment %s", key)
//_, err = client.UpdateDeploy(ctx, connect.NewRequest(&ftlv1.UpdateDeployRequest{DeploymentKey: key}))
//return err
_, err = schemaClient.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{
Deployment: key,
Event: &schemapb.ModuleRuntimeEvent{
DeploymentKey: key,
Scaling: &schemapb.ModuleRuntimeScaling{MinReplicas: 0},
},
}))
if err != nil {
return fmt.Errorf("failed to kill deployment: %w", err)
}
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
_ = e.BuildAndDeploy(ctx, 1, true, config.Module) //nolint:errcheck
}
case watch.WatchEventModuleRemoved:
err := terminateModuleDeployment(ctx, e.deployClient, event.Config.Module)
err := terminateModuleDeployment(ctx, e.deployClient, e.schemaServiceClient, event.Config.Module)
if err != nil {
logger.Errorf(err, "terminate %s failed", event.Config.Module)
}
Expand Down Expand Up @@ -506,6 +506,8 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
logger.Infof("%s's schema changed; processing %s", event.Module.Name, strings.Join(dependentModuleNames, ", "))
_ = e.BuildAndDeploy(ctx, 1, true, dependentModuleNames...) //nolint:errcheck
}
default:

}

case event := <-e.rebuildEvents:
Expand Down Expand Up @@ -827,7 +829,7 @@ func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDepl
// Wait for all build attempts to complete
buildErr := buildGroup.Wait()
if buildErr != nil {
return buildErr
return fmt.Errorf("build failed: %w", buildErr)
}

err = Deploy(ctx, e.projectConfig, modulesToDeploy, replicas, waitForDeployOnline, e.deployClient, e.schemaServiceClient)
Expand Down
3 changes: 1 addition & 2 deletions internal/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable
}

func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) {
for event := range channels.IterContext(ctx, changes.Events()) {
for range channels.IterContext(ctx, changes.Events()) {
old := r.routes.Load()
log.FromContext(ctx).Infof("Updating routes %T%v", event, event)
routes := extractRoutes(ctx, changes.CanonicalView())
for module, rd := range old.moduleToDeployment {
if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] {
Expand Down
Loading

0 comments on commit dda4047

Please sign in to comment.