diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 4bf177b..1041196 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -4,21 +4,69 @@ import ( "context" "fmt" "log" + "strconv" + "strings" + "time" "go.opentelemetry.io/otel" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" - custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel" "github.com/instill-ai/x/temporal" "github.com/instill-ai/x/zapadapter" "github.com/instill-ai/mgmt-backend/config" "github.com/instill-ai/mgmt-backend/pkg/logger" + custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel" mgmtWorker "github.com/instill-ai/mgmt-backend/pkg/worker" ) +const namespace = "mgmt-backend" + +func initTemporalNamespace(ctx context.Context, client client.Client) { + logger, _ := logger.GetZapLogger(ctx) + + resp, err := client.WorkflowService().ListNamespaces(ctx, &workflowservice.ListNamespacesRequest{}) + if err != nil { + logger.Fatal(fmt.Sprintf("Unable to list namespaces: %s", err)) + } + + found := false + for _, n := range resp.GetNamespaces() { + if n.NamespaceInfo.Name == namespace { + found = true + } + } + + if !found { + if _, err := client.WorkflowService().RegisterNamespace(ctx, + &workflowservice.RegisterNamespaceRequest{ + Namespace: namespace, + WorkflowExecutionRetentionPeriod: func() *time.Duration { + // Check if the string ends with "d" for day. + s := config.Config.Temporal.Retention + if strings.HasSuffix(s, "d") { + // Parse the number of days. + days, err := strconv.Atoi(s[:len(s)-1]) + if err != nil { + logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err)) + } + // Convert days to hours and then to a duration. + t := time.Hour * 24 * time.Duration(days) + return &t + } + logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err)) + return nil + }(), + }, + ); err != nil { + logger.Fatal(fmt.Sprintf("Unable to register namespace: %s", err)) + } + } +} + func main() { if err := config.Init(); err != nil { @@ -78,6 +126,8 @@ func main() { } defer temporalClient.Close() + initTemporalNamespace(ctx, temporalClient) + w := worker.New(temporalClient, mgmtWorker.TaskQueue, worker.Options{ MaxConcurrentActivityExecutionSize: 2, }) diff --git a/config/config.go b/config/config.go index 39f8c9d..6c83b28 100644 --- a/config/config.go +++ b/config/config.go @@ -52,6 +52,7 @@ type ServerConfig struct { type TemporalConfig struct { HostPort string `koanf:"hostport"` Namespace string `koanf:"namespace"` + Retention string `koanf:"retention"` Ca string `koanf:"ca"` Cert string `koanf:"cert"` Key string `koanf:"key"` diff --git a/config/config.yaml b/config/config.yaml index 3c8e300..6591922 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -54,6 +54,7 @@ openfga: temporal: hostport: temporal:7233 namespace: mgmt-backend + retention: 1d ca: cert: key: diff --git a/go.mod b/go.mod index 6f617c9..457a592 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 go.opentelemetry.io/otel/trace v1.16.0 + go.temporal.io/api v1.16.0 go.temporal.io/sdk v1.21.0 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.14.0 @@ -106,7 +107,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect - go.temporal.io/api v1.16.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/mod v0.8.0 // indirect