Skip to content

Commit

Permalink
chore(worker): create temporal namespace at launch
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglin committed Feb 14, 2024
1 parent 59981c6 commit 538221c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
53 changes: 52 additions & 1 deletion cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,70 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

"go.opentelemetry.io/otel"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel"
"github.com/instill-ai/x/temporal"
"github.com/instill-ai/x/zapadapter"

"github.com/instill-ai/mgmt-backend/config"
"github.com/instill-ai/mgmt-backend/pkg/logger"

custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel"
mgmtWorker "github.com/instill-ai/mgmt-backend/pkg/worker"
)

const namespace = "mgmt-backend"

func initTemporalNamespace(ctx context.Context, client client.Client) {
logger, _ := logger.GetZapLogger(ctx)

resp, err := client.WorkflowService().ListNamespaces(ctx, &workflowservice.ListNamespacesRequest{})
if err != nil {
logger.Fatal(fmt.Sprintf("Unable to list namespaces: %s", err))
}

found := false
for _, n := range resp.GetNamespaces() {
fmt.Println(n.NamespaceInfo.Name)
if n.NamespaceInfo.Name == namespace {
found = true
}
}

if !found {
if _, err := client.WorkflowService().RegisterNamespace(ctx,
&workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
WorkflowExecutionRetentionPeriod: func() *time.Duration {
// Check if the string ends with "d" for day.
s := config.Config.Temporal.Retention
if strings.HasSuffix(s, "d") {
// Parse the number of days.
days, err := strconv.Atoi(s[:len(s)-1])
if err != nil {
logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err))
}
// Convert days to hours and then to a duration.
t := time.Hour * 24 * time.Duration(days)
return &t
}
logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err))
return nil
}(),
},
); err != nil {
logger.Fatal(fmt.Sprintf("Unable to register namespace: %s", err))
}
}
}

func main() {

if err := config.Init(); err != nil {
Expand Down Expand Up @@ -78,6 +127,8 @@ func main() {
}
defer temporalClient.Close()

initTemporalNamespace(ctx, temporalClient)

w := worker.New(temporalClient, mgmtWorker.TaskQueue, worker.Options{
MaxConcurrentActivityExecutionSize: 2,
})
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ServerConfig struct {
type TemporalConfig struct {
HostPort string `koanf:"hostport"`
Namespace string `koanf:"namespace"`
Retention string `koanf:"retention"`
Ca string `koanf:"ca"`
Cert string `koanf:"cert"`
Key string `koanf:"key"`
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ openfga:
temporal:
hostport: temporal:7233
namespace: mgmt-backend
retention: 1d
ca:
cert:
key:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.opentelemetry.io/otel/trace v1.16.0
go.temporal.io/api v1.16.0
go.temporal.io/sdk v1.21.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.14.0
Expand Down Expand Up @@ -106,7 +107,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.temporal.io/api v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.8.0 // indirect
Expand Down

0 comments on commit 538221c

Please sign in to comment.