Skip to content

Commit

Permalink
Add an in-memory data store (radius-project#7950)
Browse files Browse the repository at this point in the history
# Description

Implements an in-memory data store

This change implements an in-memory version of our data store interface.
This is provided for testing and development purposes.

The in-memory store uses our data store compliance tests so it will
behave consistently with other implementations. This will be simpler to
use than mocks, which we use frequently in tests. This will be faster to
use than the ETC.d implementation which we frequently use in integration
tests.

**Note: this pull request depends on
radius-project#7949 and will be rebased
once that change is merged.**

## Type of change

- This pull request is a minor refactor, code cleanup, test improvement,
or other maintenance task and doesn't change the functionality of Radius
(issue link optional).

Signed-off-by: Ryan Nowak <nowakra@gmail.com>
  • Loading branch information
rynowak authored Sep 23, 2024
1 parent 78751aa commit 2df79ec
Show file tree
Hide file tree
Showing 11 changed files with 669 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pkg/ucp/dataprovider/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
ucpv1alpha1 "github.com/radius-project/radius/pkg/ucp/store/apiserverstore/api/ucp.dev/v1alpha1"
"github.com/radius-project/radius/pkg/ucp/store/cosmosdb"
"github.com/radius-project/radius/pkg/ucp/store/etcdstore"
"github.com/radius-project/radius/pkg/ucp/store/inmemory"
"k8s.io/apimachinery/pkg/runtime"

runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -38,6 +39,7 @@ var storageClientFactory = map[StorageProviderType]storageFactoryFunc{
TypeAPIServer: initAPIServerClient,
TypeCosmosDB: initCosmosDBClient,
TypeETCD: InitETCDClient,
TypeInMemory: initInMemoryClient,
}

func initAPIServerClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) {
Expand Down Expand Up @@ -117,3 +119,8 @@ func InitETCDClient(ctx context.Context, opt StorageProviderOptions, _ string) (
etcdClient := etcdstore.NewETCDClient(client)
return etcdClient, nil
}

// initInMemoryClient creates a new in-memory store client.
func initInMemoryClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) {
return inmemory.NewClient(), nil
}
6 changes: 6 additions & 0 deletions pkg/ucp/dataprovider/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type StorageProviderOptions struct {

// ETCD configures options for the etcd store. Will be ignored if another store is configured.
ETCD ETCDOptions `yaml:"etcd,omitempty"`

// InMemory configures options for the in-memory store. Will be ignored if another store is configured.
InMemory InMemoryOptions `yaml:"inmemory,omitempty"`
}

// APIServerOptions represents options for the configuring the Kubernetes APIServer store.
Expand Down Expand Up @@ -65,3 +68,6 @@ type ETCDOptions struct {
// We need a way to share state between the etcd service and the things that want to consume it. This is that.
Client *hosting.AsyncValue[etcdclient.Client] `yaml:"-"`
}

// InMemoryOptions represents options for the in-memory store.
type InMemoryOptions struct{}
3 changes: 3 additions & 0 deletions pkg/ucp/dataprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (

// TypeETCD represents the etcd provider.
TypeETCD StorageProviderType = "etcd"

// TypeInMemory represents the in-memory provider.
TypeInMemory StorageProviderType = "inmemory"
)

//go:generate mockgen -typed -destination=./mock_datastorage_provider.go -package=dataprovider -self_package github.com/radius-project/radius/pkg/ucp/dataprovider github.com/radius-project/radius/pkg/ucp/dataprovider DataStorageProvider
Expand Down
42 changes: 42 additions & 0 deletions pkg/ucp/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,52 @@ var fieldRegex = regexp.MustCompile(fmt.Sprintf(`^(%s)(\.%s)*$`, jsonPropertyPat

//go:generate mockgen -typed -destination=./mock_storageClient.go -package=store -self_package github.com/radius-project/radius/pkg/ucp/store github.com/radius-project/radius/pkg/ucp/store StorageClient

// StorageClient is the interface for persisting and querying resource data.
//
// The StorageClient is purpose-built to work with resource data and understands concepts like
// scopes, resource types, and resource ids. This is a higher level abstraction than a generic
// key-value store, but low-level enough to support multiple implementation strategies.
//
// The StorageClient provides a optimistic concurrency control using ETags. Callers that want
// to enforce OCC should provide the ETag value in the options when calling Save or Delete.
//
// The StorageClient may return the errors ErrNotFound, ErrInvalid, and ErrConcurrency.
//
// - Callers should handle ErrNotFound on Get, Save, and Delete operations.
// - Callers should handle ErrConcurrency when using ETags.
// - Callers should not handle ErrInvalid as it represents a programming error.
//
// When using ETags, the Save or Delete operation will fail with ErrConcurrency (rather than ErrNotFound)
// if the underlying resource has been deleted.
type StorageClient interface {
// Query executes a query against the data store and returns the results.
//
// Queries must provide a root scope and a resource type. Other fields are optional.
Query(ctx context.Context, query Query, options ...QueryOptions) (*ObjectQueryResult, error)

// Get retrieves a single resource from the data store by its resource id.
//
// Get will return ErrNotFound if the resource is not found.
Get(ctx context.Context, id string, options ...GetOptions) (*Object, error)

// Delete removes a single resource from the data store by its resource id.
//
// Delete will return ErrNotFound if the resource is not found.
// When providing an ETag, Delete will return ErrConcurrency if the resource has been
// modified OR deleted since the ETag was retrieved.
Delete(ctx context.Context, id string, options ...DeleteOptions) error

// Save persists a single resource to the data store. Same is a logical PUT
// operation and will either create a new entry or update the existing entry.
//
// Save operations must set the ID field of the obj parameter.
// The ETag field of the obj parameter is read-only and will be updated by the Save operation.
//
// Use the options to pass an ETag if you want to enforce optimistic concurrency control.
//
// Save will return ErrNotFound if the resource is not found.
// When providing an ETag, Save will return ErrConcurrency if the resource has been
// modified OR deleted since the ETag was retrieved.
Save(ctx context.Context, obj *Object, options ...SaveOptions) error
}

Expand Down
289 changes: 289 additions & 0 deletions pkg/ucp/store/inmemory/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
Copyright 2023 The Radius Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inmemory

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
"github.com/radius-project/radius/pkg/ucp/store/storeutil"
"github.com/radius-project/radius/pkg/ucp/util/etag"
"golang.org/x/exp/maps"
)

var _ store.StorageClient = (*Client)(nil)

// Client is an in-memory implementation of store.StorageClient.
type Client struct {
// mutex is used to synchronize access to the resources map.
mutex sync.Mutex

// resources is a map of resource IDs to their corresponding entries.
//
// The Get/Save/Delete methods will use the resource ID directly since they only access
// a single entry at a time.
//
// The Query method will iterate over all entries in the map to find the matching ones.
resources map[string]entry
}

// entry stores the commonly-used fields (extracted from the resource ID) for comparison in queries.
// This is provided for ease of debugging.
//
// We use the existing normalization logic to simplify comparisons:
//
// - Convert to lowercase
// - Add leading/trailing slashes.
//
// Here's an example:
//
// resource ID: "/planes/radius/local/resourceGroups/my-rg/providers/Applications.Test/testType1/my-resource/testType2/my-child-resource"
//
// The entry would be:
//
// rootScope: "/planes/radius/local/resourcegroups/my-rg/"
// resourceType: "/applications.test/testtype1/testtype2/"
// routingScope: "/applications.test/testtype1/my-resource/testtype2/my-child-resource/"
//
// All fields are compared case-insensitively.
type entry struct {
// obj stores the object data.
obj store.Object

// rootScope is the root scope of the resource ID.
rootScope string

// resourceType is the resource type of the resource ID.
resourceType string

// routingScope is the routing scope of the resource ID.
routingScope string
}

// NewClient creates a new in-memory store client.
func NewClient() *Client {
return &Client{
mutex: sync.Mutex{},
resources: map[string]entry{},
}
}

// Get implements store.StorageClient.
func (c *Client) Get(ctx context.Context, id string, options ...store.GetOptions) (*store.Object, error) {
if ctx == nil {
return nil, &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"}
}
parsed, err := resources.Parse(id)
if err != nil {
return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must be a valid resource id"}
}
if parsed.IsEmpty() {
return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must not be empty"}
}
if parsed.IsResourceCollection() || parsed.IsScopeCollection() {
return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"}
}

normalized, err := storeutil.NormalizeResourceID(parsed)
if err != nil {
return nil, err
}

c.mutex.Lock()
defer c.mutex.Unlock()

entry, ok := c.resources[strings.ToLower(normalized.String())]
if !ok {
return nil, &store.ErrNotFound{ID: id}
}

// Make a defensive copy so users can't modify the data in the store.
copy, err := entry.obj.DeepCopy()
if err != nil {
return nil, err
}

return copy, nil
}

// Delete implements store.StorageClient.
func (c *Client) Delete(ctx context.Context, id string, options ...store.DeleteOptions) error {
if ctx == nil {
return &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"}
}
parsed, err := resources.Parse(id)
if err != nil {
return &store.ErrInvalid{Message: "invalid argument. 'id' must be a valid resource id"}
}
if parsed.IsEmpty() {
return &store.ErrInvalid{Message: "invalid argument. 'id' must not be empty"}
}
if parsed.IsResourceCollection() || parsed.IsScopeCollection() {
return &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"}
}

normalized, err := storeutil.NormalizeResourceID(parsed)
if err != nil {
return err
}

c.mutex.Lock()
defer c.mutex.Unlock()

config := store.NewDeleteConfig(options...)

entry, ok := c.resources[strings.ToLower(normalized.String())]
if !ok && config.ETag != "" {
return &store.ErrConcurrency{}
} else if !ok {
return &store.ErrNotFound{ID: id}
} else if config.ETag != "" && config.ETag != entry.obj.ETag {
return &store.ErrConcurrency{}
}

delete(c.resources, strings.ToLower(normalized.String()))

return nil
}

// Query implements store.StorageClient.
func (c *Client) Query(ctx context.Context, query store.Query, options ...store.QueryOptions) (*store.ObjectQueryResult, error) {
if ctx == nil {
return nil, &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"}
}

err := query.Validate()
if err != nil {
return nil, &store.ErrInvalid{Message: fmt.Sprintf("invalid argument. Query is invalid: %s", err.Error())}
}

c.mutex.Lock()
defer c.mutex.Unlock()

result := &store.ObjectQueryResult{}
for _, entry := range c.resources {
// Check root scope.
if query.ScopeRecursive && !strings.HasPrefix(entry.rootScope, storeutil.NormalizePart(query.RootScope)) {
continue
} else if !query.ScopeRecursive && entry.rootScope != storeutil.NormalizePart(query.RootScope) {
continue
}

// Check resource type.
resourceType, err := storeutil.NormalizeResourceType(query.ResourceType)
if err != nil {
return nil, err
}
if entry.resourceType != storeutil.NormalizePart(resourceType) {
continue
}

// Check routing scope prefix (optional).
if query.RoutingScopePrefix != "" && !strings.HasPrefix(entry.routingScope, storeutil.NormalizePart(query.RoutingScopePrefix)) {
continue
}

// Check filters (optional).
match, err := entry.obj.MatchesFilters(query.Filters)
if err != nil {
return nil, err
}
if !match {
continue
}

// Make a defensive copy so users can't modify the data in the store.
copy, err := entry.obj.DeepCopy()
if err != nil {
return nil, err
}

result.Items = append(result.Items, *copy)
}

return result, nil
}

// Save implements store.StorageClient.
func (c *Client) Save(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error {
if ctx == nil {
return &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"}
}
if obj == nil {
return &store.ErrInvalid{Message: "invalid argument. 'obj' is required"}
}

parsed, err := resources.Parse(obj.ID)
if err != nil {
return &store.ErrInvalid{Message: "invalid argument. 'obj.ID' must be a valid resource id"}
}

normalized, err := storeutil.NormalizeResourceID(parsed)
if err != nil {
return err
}

c.mutex.Lock()
defer c.mutex.Unlock()

config := store.NewSaveConfig(options...)

entry, ok := c.resources[strings.ToLower(normalized.String())]
if !ok && config.ETag != "" {
return &store.ErrConcurrency{}
} else if ok && config.ETag != "" && config.ETag != entry.obj.ETag {
return &store.ErrConcurrency{}
} else if !ok {
// New entry, initialize it.
entry.rootScope = storeutil.NormalizePart(normalized.RootScope())
entry.resourceType = storeutil.NormalizePart(normalized.Type())
entry.routingScope = storeutil.NormalizePart(normalized.RoutingScope())
}

raw, err := json.Marshal(obj.Data)
if err != nil {
return err
}

// Updated the ETag before copying. Callers are allowed to read the ETag after calling save.
obj.ETag = etag.New(raw)

// Make a defensive copy so users can't modify the data in the store.
copy, err := obj.DeepCopy()
if err != nil {
return err
}

entry.obj = *copy

c.resources[strings.ToLower(normalized.String())] = entry

return nil
}

// Clear can be used to clear all stored data.
func (c *Client) Clear() {
c.mutex.Lock()
defer c.mutex.Unlock()

maps.Clear(c.resources)
}
Loading

0 comments on commit 2df79ec

Please sign in to comment.