Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl identity lookup by psql #2

Draft
wants to merge 12 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions actor/supervision.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package actor

import (
"log"
"runtime"
"time"
)

Expand All @@ -22,6 +24,14 @@ type Supervisor interface {
}

func logFailure(actorSystem *ActorSystem, child *PID, reason interface{}, directive Directive) {
log.Printf("[ERROR] %s\n", reason)
for depth := 0; ; depth++ {
_, file, line, ok := runtime.Caller(depth)
if !ok {
break
}
log.Printf("======> %d: %v:%d", depth, file, line)
}
actorSystem.EventStream.Publish(&SupervisorEvent{
Child: child,
Reason: reason,
Expand Down
74 changes: 62 additions & 12 deletions cluster/identity_storage_lookup.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package cluster

import (
"github.com/lithammer/shortuuid/v4"
"log/slog"
"time"

"github.com/asynkron/protoactor-go/actor"
)

const (
placementActorName = "placement-activator"
workerActorName = "identity-storage-worker"
pidClusterIdentityStartIndex = len(placementActorName) + 1
)

Expand All @@ -18,11 +21,13 @@ type IdentityStorageLookup struct {
isClient bool
placementActor *actor.PID
system *actor.ActorSystem
router *actor.PID
worker *actor.PID
memberID string
}

func newIdentityStorageLookup(storage StorageLookup) *IdentityStorageLookup {
var _ IdentityLookup = (*IdentityStorageLookup)(nil)

func NewIdentityStorageLookup(storage StorageLookup) *IdentityStorageLookup {
this := &IdentityStorageLookup{
Storage: storage,
}
Expand All @@ -44,22 +49,67 @@ func RemotePlacementActor(address string) *actor.PID {
//

// Get returns a PID for a given ClusterIdentity
func (id *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID {
func (i *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID {
i.system.Logger().Info("Get", slog.Any("clusterIdentity", clusterIdentity))
msg := newGetPid(clusterIdentity)
timeout := 5 * time.Second

res, _ := id.system.Root.RequestFuture(id.router, msg, timeout).Result()
response := res.(*actor.Future)
res, err := i.system.Root.RequestFuture(i.worker, msg, timeout).Result()
if err != nil {
i.system.Logger().Error("Failed to get pid", slog.Any("error", err))
return nil
}
response := res.(*PidResult)
return response.Pid
}

return response.PID()
func (i *IdentityStorageLookup) RemovePid(clusterIdentity *ClusterIdentity, _ *actor.PID) {
if i.system.IsStopped() {
return
}

i.Storage.RemoveActivation(newSpawnLock(shortuuid.New(), clusterIdentity))
}

func (id *IdentityStorageLookup) Setup(cluster *Cluster, kinds []string, isClient bool) {
id.cluster = cluster
id.system = cluster.ActorSystem
id.memberID = cluster.ActorSystem.ID
func (i *IdentityStorageLookup) Shutdown() {
i.system.Root.Stop(i.worker)
if !i.isClient {
i.system.Root.Stop(i.placementActor)
}

// workerProps := actor.PropsFromProducer(func() actor.Actor { return newIdentityStorageWorker(identity) })
i.RemoveMember(i.memberID)
i.system.Root.Stop(i.worker)
}

// routerProps := identity.system.Root.(workerProps, 50);
func (i *IdentityStorageLookup) Setup(cluster *Cluster, _ []string, isClient bool) {
i.cluster = cluster
i.system = cluster.ActorSystem
i.memberID = cluster.ActorSystem.ID
i.isClient = isClient

workerProps := actor.PropsFromProducer(func() actor.Actor { return newIdentityStorageWorker(i) })
var err error
i.worker, err = i.system.Root.SpawnNamed(workerProps, workerActorName)
if err != nil {
panic(err)
}
i.cluster.ActorSystem.EventStream.SubscribeWithPredicate(func(message interface{}) {
ct := message.(*ClusterTopology)
for _, member := range ct.Left {
i.RemoveMember(member.Id)
}
}, func(m interface{}) bool {
_, ok := m.(*ClusterTopology)
return ok
})

if i.isClient {
return
}

i.placementActor, err = i.system.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newIdentityStoragePlacementActor(cluster, i) }), placementActorName)
i.cluster.Logger().Info("placement actor started", slog.Any("actor", i.placementActor))
if err != nil {
panic(err)
}
}
126 changes: 126 additions & 0 deletions cluster/identity_storage_placement_actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package cluster

import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/eventstream"
"log/slog"
)

type GrainMeta struct {
ID *ClusterIdentity
PID *actor.PID
}

type IdentityStoragePlacementActor struct {
cluster *Cluster
identityStorageLookup *IdentityStorageLookup
subscription *eventstream.Subscription
actors map[string] /* clusterIdentity*/ GrainMeta
logger *slog.Logger
}

func newIdentityStoragePlacementActor(cluster *Cluster, identityStorageLookup *IdentityStorageLookup) *IdentityStoragePlacementActor {
this := &IdentityStoragePlacementActor{
cluster: cluster,
identityStorageLookup: identityStorageLookup,
actors: make(map[string] /* clusterIdentity*/ GrainMeta),
logger: cluster.Logger().With(
slog.String("actorType", "IdentityStoragePlacementActor"),
),
}
return this
}

func (i *IdentityStoragePlacementActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
i.onStarted(context)
case *actor.Stopping:
i.onStopping(context)
case *ActivationTerminating:
i.onActivationTerminating(context, msg)
case *ActivationRequest:
i.onActivationRequest(context, msg)
}
}

func (i *IdentityStoragePlacementActor) onStarted(context actor.Context) {
i.logger.Info("IdentityStoragePlacementActor started")
i.logger = i.logger.With(slog.String("pid", context.Self().Id))
i.subscription = i.cluster.ActorSystem.EventStream.Subscribe(func(evt interface{}) {
context.Send(context.Self(), evt)
})
}

func (i *IdentityStoragePlacementActor) onStopping(_ actor.Context) {
i.logger.Info("IdentityStoragePlacementActor stopping")
i.subscription.Deactivate()
for _, meta := range i.actors {
/// TODO: need to throttle?
i.cluster.ActorSystem.Root.Stop(meta.PID)
}
}

func (i *IdentityStoragePlacementActor) onActivationTerminating(_ actor.Context, msg *ActivationTerminating) {
if grainMeta, ok := i.actors[msg.ClusterIdentity.AsKey()]; ok {
if grainMeta.PID != msg.Pid {
i.logger.Error("PID mismatch", slog.Any("clusterIdentity", msg.ClusterIdentity), slog.Any("expectedPid", grainMeta.PID), slog.Any("actualPid", msg.Pid))
return
}
delete(i.actors, msg.ClusterIdentity.AsKey())
i.cluster.PidCache.Remove(msg.ClusterIdentity.Identity, msg.ClusterIdentity.Kind)
i.identityStorageLookup.Storage.RemoveActivation(newSpawnLock(grainMeta.PID.String(), msg.ClusterIdentity))
} else {
i.logger.Error("IdentityStoragePlacementActor#onActivationTerminating activation not found", slog.Any("clusterIdentity", msg.ClusterIdentity))
}
}

func (i *IdentityStoragePlacementActor) onActivationRequest(context actor.Context, msg *ActivationRequest) {
if grainMeta, ok := i.actors[msg.ClusterIdentity.AsKey()]; ok {
context.Respond(&ActivationResponse{Pid: grainMeta.PID})
return
}

clusterKind := i.cluster.GetClusterKind(msg.ClusterIdentity.Kind)
if clusterKind == nil {
context.Logger().Error("Cluster kind not found", slog.Any("clusterIdentity", msg.ClusterIdentity), slog.String("kind", msg.ClusterIdentity.Kind))
context.Respond(&ActivationResponse{Pid: nil, Failed: true})
return
}

i.spawnActor(context, msg, clusterKind)
}

func (i *IdentityStoragePlacementActor) spawnActor(ctx actor.Context, req *ActivationRequest, kind *ActivatedKind) {
props := WithClusterIdentity(kind.Props, req.ClusterIdentity)
pid := ctx.SpawnPrefix(props, req.ClusterIdentity.Identity)

storeActivation := func(pid *actor.PID) (ok bool) {
ok = false
defer func() {
if r := recover(); r != nil {
i.logger.Error("Failed to store activation", slog.Any("error", r))
ctx.Stop(pid)
ok = false
}
}()
i.identityStorageLookup.Storage.StoreActivation(i.cluster.ActorSystem.ID, newSpawnLock(req.RequestId, req.ClusterIdentity), pid)
return true
}

ok := storeActivation(pid)
if !ok {
ctx.Respond(&ActivationResponse{Pid: nil, Failed: true})
return
}
kind.Inc()

/// TODO: member selectionを考慮
i.logger.Info("Activation Stored", slog.Any("pid", pid), slog.Any("clusterIdentity", req.ClusterIdentity), slog.String("key", req.ClusterIdentity.AsKey()))
i.actors[req.ClusterIdentity.AsKey()] = GrainMeta{
ID: req.ClusterIdentity,
PID: pid,
}
i.cluster.PidCache.Set(req.ClusterIdentity.Identity, req.ClusterIdentity.Kind, pid)
ctx.Respond(&ActivationResponse{Pid: pid})
}
Loading