Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Jan 17, 2022
1 parent f042019 commit 51a176c
Show file tree
Hide file tree
Showing 7 changed files with 872 additions and 0 deletions.
33 changes: 33 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2

updates:
- package-ecosystem: gomod # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: daily
reviewers:
- "rustatian"
assignees:
- "rustatian"

- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: weekly
reviewers:
- "rustatian"
assignees:
- "rustatian"

- package-ecosystem: "docker"
directory: "/"
schedule:
interval: daily
reviewers:
- "rustatian"
assignees:
- "rustatian"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

# Dependency directories (remove the comment below to include it)
# vendor/
.idea
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module github.com/roadrunner-server/memcached/v2

go 1.17

require (
github.com/bradfitz/gomemcache v0.0.0-20220106215444-fb4bf637b56d
github.com/roadrunner-server/api/v2 v2.0.0-rc.2
github.com/roadrunner-server/errors v1.1.0
go.uber.org/zap v1.20.0
)

require (
github.com/pkg/errors v0.9.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
506 changes: 506 additions & 0 deletions go.sum

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions memcachedkv/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package memcachedkv

type Config struct {
// Addr is url for memcached, 11211 port is used by default
Addr []string
}

func (s *Config) InitDefaults() {
if s.Addr == nil {
s.Addr = []string{"127.0.0.1:11211"} // default url for memcached
}
}
255 changes: 255 additions & 0 deletions memcachedkv/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package memcachedkv

import (
"strings"
"time"

"github.com/bradfitz/gomemcache/memcache"
"github.com/roadrunner-server/api/v2/plugins/config"
kvv1 "github.com/roadrunner-server/api/v2/proto/kv/v1beta"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

type driver struct {
client *memcache.Client
log *zap.Logger
cfg *Config
}

// NewMemcachedDriver returns a memcache client using the provided server(s)
// with equal weight. If a server is listed multiple times,
// it gets a proportional amount of weight.
func NewMemcachedDriver(log *zap.Logger, key string, cfgPlugin config.Configurer) (*driver, error) {
const op = errors.Op("new_memcached_driver")

s := &driver{
log: log,
}

err := cfgPlugin.UnmarshalKey(key, &s.cfg)
if err != nil {
return nil, errors.E(op, err)
}

if s.cfg == nil {
return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
}

s.cfg.InitDefaults()

s.client = memcache.New(s.cfg.Addr...)

return s, nil
}

// Has checks the key for existence
func (d *driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("memcached_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
m := make(map[string]bool, len(keys))
for i := range keys {
keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
exist, err := d.client.Get(keys[i])

if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
continue
}
return nil, errors.E(op, err)
}
if exist != nil {
m[keys[i]] = true
}
}
return m, nil
}

// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
func (d *driver) Get(key string) ([]byte, error) {
const op = errors.Op("memcached_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
data, err := d.client.Get(key)
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
return nil, nil
}
return nil, errors.E(op, err)
}
if data != nil {
// return the value by the key
return data.Value, nil
}
// data is nil by some reason and error also nil
return nil, nil
}

// MGet return map with key -- string
// and map value as value -- []byte
func (d *driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("memcached_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}

// should not be empty keys
for i := range keys {
keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
}

m := make(map[string][]byte, len(keys))
for i := range keys {
// Here also MultiGet
data, err := d.client.Get(keys[i])
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
continue
}
return nil, errors.E(op, err)
}
if data != nil {
m[keys[i]] = data.Value
}
}

return m, nil
}

// Set sets the KV pairs. Keys should be 250 bytes maximum
// TTL:
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
func (d *driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}

for i := range items {
if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}

// pre-allocate item
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
Value: items[i].Value,
Flags: 0,
}

// add additional TTL in case of TTL isn't empty
if items[i].Timeout != "" {
// verify the TTL
t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
memcachedItem.Expiration = int32(t.Unix())
}

err := d.client.Set(memcachedItem)
if err != nil {
return err
}
}

return nil
}

// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
func (d *driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
if items[i] == nil {
continue
}
if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}

// verify provided TTL
t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}

// Touch updates the expiry for the given key. The seconds parameter is either
// a Unix timestamp or, if seconds is less than 1 month, the number of seconds
// into the future at which time the item will expire. Zero means the item has
// no expiration time. ErrCacheMiss is returned if the key is not in the cache.
// The key must be at most 250 bytes in length.
err = d.client.Touch(items[i].Key, int32(t.Unix()))
if err != nil {
return errors.E(op, err)
}
}
return nil
}

// TTL return time in seconds (int32) for a given keys
func (d *driver) TTL(_ ...string) (map[string]string, error) {
const op = errors.Op("memcached_plugin_ttl")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}

func (d *driver) Delete(keys ...string) error {
const op = errors.Op("memcached_plugin_has")
if keys == nil {
return errors.E(op, errors.NoKeys)
}

// should not be empty keys
for i := range keys {
keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return errors.E(op, errors.EmptyKey)
}
}

for i := range keys {
err := d.client.Delete(keys[i])
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
continue
}
return errors.E(op, err)
}
}
return nil
}

func (d *driver) Clear() error {
err := d.client.DeleteAll()
if err != nil {
d.log.Error("flush_all operation failed", zap.Error(err))
return err
}

return nil
}

func (d *driver) Stop() {
// not implemented https://github.com/bradfitz/gomemcache/issues/51
}
47 changes: 47 additions & 0 deletions plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package memcached

import (
"github.com/roadrunner-server/api/v2/plugins/config"
"github.com/roadrunner-server/api/v2/plugins/kv"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/memcached/v2/memcachedkv"
"go.uber.org/zap"
)

const (
PluginName string = "memcached"
RootPluginName string = "kv"
)

type Plugin struct {
// config plugin
cfgPlugin config.Configurer
// logger
log *zap.Logger
}

func (p *Plugin) Init(log *zap.Logger, cfg config.Configurer) error {
if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}

p.cfgPlugin = cfg
p.log = new(zap.Logger)
*p.log = *log
return nil
}

// Name returns plugin user-friendly name
func (p *Plugin) Name() string {
return PluginName
}

func (p *Plugin) KvFromConfig(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := memcachedkv.NewMemcachedDriver(p.log, key, p.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}

return st, nil
}

0 comments on commit 51a176c

Please sign in to comment.