Skip to content

Commit

Permalink
refactor: discover pluggable component then register it to mosn runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
cyb0225 committed Sep 19, 2023
1 parent 72b5388 commit 6209b3f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 64 deletions.
109 changes: 60 additions & 49 deletions pkg/runtime/pluggable.go → pkg/runtime/pluggable/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package runtime
package pluggable

import (
"context"
Expand All @@ -20,6 +20,8 @@ import (
"path/filepath"
"runtime"

"mosn.io/layotto/components/hello"

"mosn.io/layotto/pkg/common"
grpcdial "mosn.io/layotto/pkg/grpc"

Expand All @@ -28,6 +30,10 @@ import (
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)

func init() {
onServiceDiscovered = make(map[string]CallbackFunc)
}

const (
// the default folder to store pluggable component socket files.
defaultSocketFolder = "/tmp/runtime/component-sockets"
Expand All @@ -41,35 +47,10 @@ var (
onServiceDiscovered map[string]CallbackFunc
)

type CallbackFunc func(compType string, dialer grpcdial.GRPCConnectionDialer, m *MosnRuntime)
type CallbackFunc func(compType string, dialer grpcdial.GRPCConnectionDialer, m *DiscoverFactory)

// get service form socket files.
type reflectServiceClient interface {
ListServices() ([]string, error)
Reset()
}

type pluggableComponentService struct {
// protoRef is the proto service name
protoRef string
// componentName is the component name that implements such service.
componentName string
// dialer is the used grpc connectiondialer.
dialer grpcdial.GRPCConnectionDialer
}

type grpcConnectionCloser interface {
grpc.ClientConnInterface
Close() error
}

func init() {
onServiceDiscovered = make(map[string]CallbackFunc)
}

// AddServiceDiscoveryCallback register callback function, not concurrent secure
func AddServiceDiscoveryCallback(serviceName string, callback CallbackFunc) {
onServiceDiscovered[serviceName] = callback
type DiscoverFactory struct {
Hellos []*hello.HelloFactory
}

// GetSocketFolderPath gets the path of folder storing pluggable component socket files.
Expand All @@ -80,30 +61,60 @@ func GetSocketFolderPath() string {
return defaultSocketFolder
}

// RegisterPluggableComponent discovers pluggable component and registers their factory into MosnRuntime
func (m *MosnRuntime) RegisterPluggableComponent() error {
// AddServiceDiscoveryCallback register callback function, not concurrent secure
func AddServiceDiscoveryCallback(serviceName string, callback CallbackFunc) {
onServiceDiscovered[serviceName] = callback
}

func NewDefaultDiscoverFactory() *DiscoverFactory {
factory := new(DiscoverFactory)
factory.Hellos = make([]*hello.HelloFactory, 0)
return factory
}

func Discover() (*DiscoverFactory, error) {
// At present, layotto only support register component from unix domain socket connection,
// and not compatible with windows.
if runtime.GOOS == "windows" {
return errors.New("windows haven't support register pluggable component")
return nil, errors.New("windows haven't support register pluggable component")
}

// 1. discover pluggable component
services, err := m.discover()
factory := NewDefaultDiscoverFactory()
serviceList, err := discover()
if err != nil {
return err
return nil, err
}

// 2. callback to register factory into MosnRuntime
m.callback(services)
return nil
callback(serviceList, factory)
return factory, nil
}

// get service form socket files.
type reflectServiceClient interface {
ListServices() ([]string, error)
Reset()
}

type grpcService struct {
// protoRef is the proto service name
protoRef string
// componentName is the component name that implements such service.
componentName string
// dialer is the used grpc connectiondialer.
dialer grpcdial.GRPCConnectionDialer
}

type grpcConnectionCloser interface {
grpc.ClientConnInterface
Close() error
}

// discover use grpc reflect to get services information.
func (m *MosnRuntime) discover() ([]pluggableComponentService, error) {
// discover use grpc reflect to get services' information.
func discover() ([]grpcService, error) {
ctx := context.TODO()
services, err := serviceDiscovery(func(socket string) (client reflectServiceClient, closer func(), err error) {
serviceList, err := serviceDiscovery(func(socket string) (client reflectServiceClient, closer func(), err error) {
conn, err := grpcdial.SocketDial(
ctx,
socket,
Expand All @@ -120,7 +131,7 @@ func (m *MosnRuntime) discover() ([]pluggableComponentService, error) {
return nil, err
}

return services, nil
return serviceList, nil
}

// reflectServiceConnectionCloser is used for cleanup the stream created to be used for the reflection service.
Expand All @@ -132,14 +143,14 @@ func reflectServiceConnectionCloser(conn grpcConnectionCloser, client reflectSer
}

// discover service socket files and get service information from factory factor function.
func serviceDiscovery(reflectClientFactory func(socket string) (client reflectServiceClient, cleanup func(), err error)) ([]pluggableComponentService, error) {
services := make([]pluggableComponentService, 0)
func serviceDiscovery(reflectClientFactory func(socket string) (client reflectServiceClient, cleanup func(), err error)) ([]grpcService, error) {
res := make([]grpcService, 0)

// 1. get socket folder files
path := GetSocketFolderPath()
_, err := os.Stat(path)
if os.IsNotExist(err) {
return services, nil
return res, nil
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -178,25 +189,25 @@ func serviceDiscovery(reflectClientFactory func(socket string) (client reflectSe
return nil, err
}

for _, service := range serviceList {
services = append(services, pluggableComponentService{
protoRef: service,
for _, s := range serviceList {
res = append(res, grpcService{
protoRef: s,
dialer: grpcdial.SocketDialer(socket, grpc.WithBlock(), grpc.FailOnNonTempDialError(true)),
componentName: common.RemoveExt(f.Name()),
})
}
}
return services, nil
return res, nil
}

// callback use callback function to register pluggable component factories into MosnRuntime
func (m *MosnRuntime) callback(services []pluggableComponentService) {
func callback(services []grpcService, factory *DiscoverFactory) {
for _, service := range services {
callback, ok := onServiceDiscovered[service.protoRef]
if !ok {
continue
}

callback(service.componentName, service.dialer, m)
callback(service.componentName, service.dialer, factory)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package runtime
package pluggable

import (
"errors"
Expand All @@ -21,10 +21,10 @@ import (
"sync"
"testing"

grpcdial "mosn.io/layotto/pkg/grpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

grpcdial "mosn.io/layotto/pkg/grpc"
)

type fakeReflectService struct {
Expand Down Expand Up @@ -61,10 +61,6 @@ func (f *fakeGrpcCloser) Close() error {
return nil
}

func newMosnRuntime() *MosnRuntime {
return NewMosnRuntime(&MosnRuntimeConfig{})
}

func TestGetSocketFolderPath(t *testing.T) {
t.Run("get socket folder should use default when env var is not set", func(t *testing.T) {
assert.Equal(t, GetSocketFolderPath(), defaultSocketFolder)
Expand All @@ -76,16 +72,16 @@ func TestGetSocketFolderPath(t *testing.T) {
})
}

func TestMosnRuntime_Callback(t *testing.T) {
m := newMosnRuntime()
func Test_Callback(t *testing.T) {
t.Run("callback should be called when service ref is registered", func(t *testing.T) {
const fakeComponentName, fakeServiceName = "fake-comp", "fake-svc"
called := 0
AddServiceDiscoveryCallback(fakeServiceName, func(name string, _ grpcdial.GRPCConnectionDialer, m *MosnRuntime) {
f := NewDefaultDiscoverFactory()
AddServiceDiscoveryCallback(fakeServiceName, func(name string, _ grpcdial.GRPCConnectionDialer, f *DiscoverFactory) {
called++
assert.Equal(t, name, fakeComponentName)
})
m.callback([]pluggableComponentService{{protoRef: fakeServiceName, componentName: fakeComponentName}})
callback([]grpcService{{protoRef: fakeServiceName, componentName: fakeComponentName}}, f)
assert.Equal(t, 1, called)
})
}
Expand All @@ -95,7 +91,7 @@ func Test_serviceDiscovery(t *testing.T) {
return
}
t.Run("add service callback should add a new entry when called", func(t *testing.T) {
AddServiceDiscoveryCallback("fake", func(string, grpcdial.GRPCConnectionDialer, *MosnRuntime) {})
AddServiceDiscoveryCallback("fake", func(string, grpcdial.GRPCConnectionDialer, *DiscoverFactory) {})
assert.NotEmpty(t, onServiceDiscovered)
})
t.Run("serviceDiscovery should return empty services if directory not exists", func(t *testing.T) {
Expand Down
15 changes: 13 additions & 2 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"mosn.io/layotto/components/sequencer"
"mosn.io/layotto/pkg/grpc"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"
"mosn.io/layotto/pkg/runtime/pluggable"
runtime_pubsub "mosn.io/layotto/pkg/runtime/pubsub"
runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer"
runtime_state "mosn.io/layotto/pkg/runtime/state"
Expand Down Expand Up @@ -676,8 +677,8 @@ func (m *MosnRuntime) AppendInitRuntimeStage(f initRuntimeStage) {
func (m *MosnRuntime) initRuntime(r *runtimeOptions) error {
st := time.Now()

// register component component
if err := m.RegisterPluggableComponent(); err != nil {
// register pluggable component
if err := m.registerPluggableComponent(); err != nil {
return err
}

Expand All @@ -697,6 +698,16 @@ func (m *MosnRuntime) initRuntime(r *runtimeOptions) error {
return nil
}

func (m *MosnRuntime) registerPluggableComponent() error {
discover, err := pluggable.Discover()
if err != nil {
return err
}
m.helloRegistry.Register(discover.Hellos...)

return nil
}

func (m *MosnRuntime) SetCustomComponent(kind string, name string, component custom.Component) {
if _, ok := m.customComponent[kind]; !ok {
m.customComponent[kind] = make(map[string]custom.Component)
Expand Down
18 changes: 17 additions & 1 deletion pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net"
"runtime"
"testing"

aws2 "mosn.io/layotto/components/oss/aws"
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestMosnRuntime_Run(t *testing.T) {
assert.NotNil(t, server)
rt.Stop()
})
t.Run("run succesfully with initRuntimeStage", func(t *testing.T) {
t.Run("run successfully with initRuntimeStage", func(t *testing.T) {
runtimeConfig := &MosnRuntimeConfig{}
rt := NewMosnRuntime(runtimeConfig)
etcdCustomComponent := mock_component.NewCustomComponentMock()
Expand Down Expand Up @@ -241,7 +242,10 @@ func TestMosnRuntime_Run(t *testing.T) {
// 4. assert
assert.NotNil(t, err)
assert.True(t, err == errExpected)
})

t.Run("register pluggable component", func(t *testing.T) {
// todo
})
}

Expand Down Expand Up @@ -271,6 +275,18 @@ func TestMosnRuntime_initAppCallbackConnection(t *testing.T) {
})
}

func TestMosnRuntime_registerPluggableComponent(t *testing.T) {
if runtime.GOOS == "windows" {
return
}

t.Run("register hello service success", func(t *testing.T) {
// todo
// 1. start grpc service which implements hello service
// 2. register hello service into mosnRuntime
})
}

func TestMosnRuntime_initPubSubs(t *testing.T) {
t.Run("normal", func(t *testing.T) {
// mock pubsub component
Expand Down

0 comments on commit 6209b3f

Please sign in to comment.