Skip to content

Commit

Permalink
refactor: move handle namespace to reconciler (#48)
Browse files Browse the repository at this point in the history
* refactor: use namespace instead of filter

* refactor: remove unused value in runtime
  • Loading branch information
siyul-park authored Dec 12, 2023
1 parent 8a069c9 commit 8d6e141
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
34 changes: 20 additions & 14 deletions pkg/loader/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,34 @@ import (
"context"
"sync"

"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/storage"
)

// ReconcilerConfig holds the configuration for the Reconciler.
type ReconcilerConfig struct {
Storage *storage.Storage // Storage is the storage used by the Reconciler.
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec.
Namespace string // // Namespace is the namespace for the Reconciler.
Storage *storage.Storage // Storage is the storage used by the Reconciler.
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
}

// Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec.
type Reconciler struct {
storage *storage.Storage
loader *Loader
filter *storage.Filter
stream *storage.Stream
done chan struct{}
mu sync.Mutex
namespace string
storage *storage.Storage
loader *Loader
stream *storage.Stream
done chan struct{}
mu sync.Mutex
}

// NewReconciler creates a new Reconciler with the given configuration.
func NewReconciler(config ReconcilerConfig) *Reconciler {
return &Reconciler{
storage: config.Storage,
loader: config.Loader,
filter: config.Filter,
done: make(chan struct{}),
namespace: config.Namespace,
storage: config.Storage,
loader: config.Loader,
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -99,7 +100,12 @@ func (r *Reconciler) watch(ctx context.Context) (*storage.Stream, error) {
if r.stream != nil {
return r.stream, nil
}
s, err := r.storage.Watch(ctx, r.filter)

var filter *storage.Filter
if r.namespace != "" {
filter = storage.Where[string](scheme.KeyNamespace).EQ(r.namespace)
}
s, err := r.storage.Watch(ctx, filter)
if err != nil {
return nil, err
}
Expand Down
17 changes: 3 additions & 14 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ type Config struct {

// Runtime represents an execution environment for running Flows.
type Runtime struct {
namespace string
hooks *hook.Hook
scheme *scheme.Scheme
storage *storage.Storage
table *symbol.Table
loader *loader.Loader
Expand Down Expand Up @@ -66,21 +63,13 @@ func New(ctx context.Context, config Config) (*Runtime, error) {
return nil, err
}

var filter *storage.Filter
if config.Namespace != "" {
filter = storage.Where[string](scheme.KeyNamespace).EQ(config.Namespace)
}

rc := loader.NewReconciler(loader.ReconcilerConfig{
Storage: st,
Loader: ld,
Filter: filter,
Namespace: config.Namespace,
Storage: st,
Loader: ld,
})

return &Runtime{
namespace: config.Namespace,
hooks: config.Hooks,
scheme: config.Scheme,
storage: st,
table: tb,
loader: ld,
Expand Down

0 comments on commit 8d6e141

Please sign in to comment.