diff --git a/internal/cmd/cluster/cluster.go b/internal/cmd/cluster/cluster.go index 1d2ebf3b..5d6a0e5c 100644 --- a/internal/cmd/cluster/cluster.go +++ b/internal/cmd/cluster/cluster.go @@ -6,9 +6,8 @@ import ( "github.com/lthibault/log" "github.com/urfave/cli/v2" - "github.com/wetware/ww/internal/runtime" - runtimeutil "github.com/wetware/ww/internal/util/runtime" "github.com/wetware/ww/pkg/client" + "github.com/wetware/ww/pkg/runtime" "go.uber.org/fx" ) @@ -67,10 +66,9 @@ func Command() *cli.Command { func setup() cli.BeforeFunc { return func(c *cli.Context) (err error) { app = fx.New( - runtime.Prelude(runtimeutil.New(c)), + runtime.NewClient(c.Context, c), fx.StartTimeout(c.Duration("timeout")), - fx.Populate(&logger, &dialer), - runtime.Client()) + fx.Populate(&logger, &dialer)) ctx, cancel := context.WithTimeout( c.Context, diff --git a/internal/cmd/debug/debug.go b/internal/cmd/debug/debug.go index a2f3ceb8..8085773d 100644 --- a/internal/cmd/debug/debug.go +++ b/internal/cmd/debug/debug.go @@ -9,9 +9,8 @@ import ( "github.com/urfave/cli/v2" "go.uber.org/fx" - "github.com/wetware/ww/internal/runtime" - runtimeutil "github.com/wetware/ww/internal/util/runtime" "github.com/wetware/ww/pkg/client" + "github.com/wetware/ww/pkg/runtime" ) var ( @@ -67,10 +66,9 @@ func Command() *cli.Command { func setup() cli.BeforeFunc { return func(c *cli.Context) (err error) { app = fx.New( - runtime.Prelude(runtimeutil.New(c)), + runtime.NewClient(c.Context, c), fx.StartTimeout(c.Duration("timeout")), - fx.Populate(&logger, &dialer), - runtime.Client()) + fx.Populate(&logger, &dialer)) ctx, cancel := context.WithTimeout( c.Context, diff --git a/internal/cmd/start/start.go b/internal/cmd/start/start.go index 53c6df5f..d5d541a5 100644 --- a/internal/cmd/start/start.go +++ b/internal/cmd/start/start.go @@ -8,8 +8,7 @@ import ( "github.com/urfave/cli/v2" "go.uber.org/fx" - "github.com/wetware/ww/internal/runtime" - runtimeutil "github.com/wetware/ww/internal/util/runtime" + "github.com/wetware/ww/pkg/runtime" "github.com/wetware/ww/pkg/server" ) @@ -70,9 +69,8 @@ func Command() *cli.Command { func setup() cli.BeforeFunc { return func(c *cli.Context) error { app = fx.New( - runtime.Prelude(runtimeutil.New(c)), - fx.Populate(&logger, &node), - runtime.Server()) + runtime.NewServer(c.Context, c), + fx.Populate(&logger, &node)) return start(c.Context, app) } diff --git a/internal/runtime/client.go b/internal/runtime/client.go deleted file mode 100644 index 6f69e64c..00000000 --- a/internal/runtime/client.go +++ /dev/null @@ -1,26 +0,0 @@ -package runtime - -import ( - casm "github.com/wetware/casm/pkg" - "go.uber.org/fx" -) - -// Client declares dependencies for a *client.Node. -func Client(opt ...Option) fx.Option { - var c Config - for _, option := range clientDefaults(opt) { - option(&c) - } - - return fx.Module("client", - c.Vat(), - c.System(), - c.ClientBootstrap(), - ) -} - -func clientDefaults(opt []Option) []Option { - return append([]Option{ - WithHostConfig(casm.Client), - }, opt...) -} diff --git a/internal/runtime/server.go b/internal/runtime/server.go deleted file mode 100644 index 334acbd1..00000000 --- a/internal/runtime/server.go +++ /dev/null @@ -1,42 +0,0 @@ -package runtime - -import ( - pubsub "github.com/libp2p/go-libp2p-pubsub" - casm "github.com/wetware/casm/pkg" - "github.com/wetware/ww/pkg/server" - "go.uber.org/fx" -) - -func Server(opt ...Option) fx.Option { - var c Config - for _, option := range serverDefaults(opt) { - option(&c) - } - - return fx.Module("server", - c.Vat(), - c.System(), - c.PubSub(), - c.Routing(), - c.ServerBootstrap(), - fx.Provide(newServerNode), - fx.Invoke(bootServer)) -} - -func serverDefaults(opt []Option) []Option { - return append([]Option{ - WithHostConfig(casm.Server), - }, opt...) -} - -func newServerNode(j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) { - // TODO: this should use lx.OnStart to benefit from the start timeout. - return j.Join(ps) -} - -func bootServer(lx fx.Lifecycle, n *server.Node) { - lx.Append(fx.Hook{ - OnStart: bootstrap(n), - OnStop: onclose(n), - }) -} diff --git a/internal/util/runtime/runtime.go b/internal/util/runtime/runtime.go deleted file mode 100644 index 177eef2d..00000000 --- a/internal/util/runtime/runtime.go +++ /dev/null @@ -1,53 +0,0 @@ -package runtimeutil - -import ( - "context" - "time" - - "github.com/lthibault/log" - "github.com/urfave/cli/v2" - - "github.com/wetware/casm/pkg/util/metrics" - "github.com/wetware/ww/internal/runtime" - logutil "github.com/wetware/ww/internal/util/log" - statsdutil "github.com/wetware/ww/internal/util/statsd" -) - -func New(c *cli.Context) runtime.Env { - logging := logutil.New(c) - metrics := statsdutil.New(c, logging) - - return env{ - flags: c, - logging: logging, - metrics: metrics, - } -} - -type env struct { - flags - logging log.Logger - metrics metrics.Client -} - -func (env env) Context() context.Context { - return env.flags.(*cli.Context).Context -} - -func (env env) Log() log.Logger { - return env.logging -} - -func (env env) Metrics() metrics.Client { - return env.metrics -} - -type flags interface { - Bool(string) bool - IsSet(string) bool - Path(string) string - String(string) string - StringSlice(string) []string - Duration(string) time.Duration - Float64(string) float64 -} diff --git a/internal/runtime/discover.go b/pkg/runtime/discover.go similarity index 71% rename from internal/runtime/discover.go rename to pkg/runtime/discover.go index 1f23191a..bc359fb5 100644 --- a/internal/runtime/discover.go +++ b/pkg/runtime/discover.go @@ -12,6 +12,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-kad-dht/dual" "github.com/libp2p/go-libp2p/core/discovery" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/discovery/routing" casm "github.com/wetware/casm/pkg" @@ -23,6 +24,26 @@ import ( "go.uber.org/fx" ) +type bootConfig struct { + fx.In + + Log log.Logger + Metrics metrics.Client + Vat casm.Vat + Flag Flags +} + +func (bc bootConfig) host() host.Host { + return bc.Vat.Host +} + +func (bc bootConfig) metrics() bootMetrics { + return bootMetrics{ + Log: bc.Log, + Metrics: bc.Metrics, + } +} + func (c Config) ClientBootstrap() fx.Option { return fx.Provide(c.newClientDisc) } @@ -31,14 +52,14 @@ func (c Config) ServerBootstrap() fx.Option { return fx.Provide(c.newServerDisc) } -func (c Config) newServerDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discovery.Discovery, err error) { - if env.IsSet("addr") { - d, err = boot.NewStaticAddrStrings(env.StringSlice("addr")...) +func (c Config) newServerDisc(config bootConfig, lx fx.Lifecycle) (d discovery.Discovery, err error) { + if config.Flag.IsSet("addr") { + d, err = boot.NewStaticAddrStrings(config.Flag.StringSlice("addr")...) return } - d, err = bootutil.ListenString(vat.Host, env.String("discover"), - socket.WithLogger(env.Log()), + d, err = bootutil.ListenString(config.host(), config.Flag.String("discover"), + socket.WithLogger(config.Log), socket.WithRateLimiter(socket.NewPacketLimiter(256, 16))) if c, ok := d.(io.Closer); ok { lx.Append(closer(c)) @@ -47,14 +68,14 @@ func (c Config) newServerDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discove return } -func (c Config) newClientDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discovery.Discoverer, err error) { - if env.IsSet("addr") { - d, err = boot.NewStaticAddrStrings(env.StringSlice("addr")...) +func (c Config) newClientDisc(config bootConfig, lx fx.Lifecycle) (d discovery.Discoverer, err error) { + if config.Flag.IsSet("addr") { + d, err = boot.NewStaticAddrStrings(config.Flag.StringSlice("addr")...) return } - d, err = bootutil.DialString(vat.Host, env.String("discover"), - socket.WithLogger(env.Log()), + d, err = bootutil.DialString(config.host(), config.Flag.String("discover"), + socket.WithLogger(config.Log), socket.WithRateLimiter(socket.NewPacketLimiter(256, 16))) if c, ok := d.(io.Closer); ok { lx.Append(closer(c)) @@ -62,7 +83,7 @@ func (c Config) newClientDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discove return &logMetricDisc{ disc: d, - metrics: bootMetrics{env}, + metrics: config.metrics(), }, err } @@ -78,8 +99,7 @@ func (c Config) withPubSubDiscovery(d discovery.Discovery, config psBootConfig) type psBootConfig struct { fx.In - Env Env - Vat casm.Vat + Boot bootConfig DHT *dual.DHT Datastore ds.Batching Lifecycle fx.Lifecycle @@ -91,9 +111,9 @@ func (config psBootConfig) maybePeX(d discovery.Discovery, opt []pex.Option) (di return d, nil } - px, err := pex.New(config.Vat.Host, append([]pex.Option{ + px, err := pex.New(config.Boot.host(), append([]pex.Option{ // default options for PeX - pex.WithLogger(config.Env.Log()), + pex.WithLogger(config.Boot.Log), pex.WithDatastore(config.Datastore), pex.WithDiscovery(d), }, opt...)...) @@ -110,7 +130,7 @@ func (config psBootConfig) Wrap(d discovery.Discovery) *boot.Namespace { // // 1. the bootstrap service, iff namespace matches cluster topic; else // 2. the DHT-backed discovery service. - bootTopic := "floodsub:" + config.Env.String("ns") + bootTopic := "floodsub:" + config.Boot.Flag.String("ns") match := func(ns string) bool { return ns == bootTopic } @@ -118,7 +138,7 @@ func (config psBootConfig) Wrap(d discovery.Discovery) *boot.Namespace { target := logMetricDisc{ disc: d, advt: d, - metrics: bootMetrics{config.Env}, + metrics: config.Boot.metrics(), } return &boot.Namespace{ @@ -159,18 +179,16 @@ func (b logMetricDisc) Advertise(ctx context.Context, ns string, opt ...discover } type bootMetrics struct { - env interface { - Log() log.Logger - Metrics() metrics.Client - } + Log log.Logger + Metrics metrics.Client } func (m bootMetrics) OnFindPeers(ns string) { - m.env.Log().Debug("bootstrapping namespace") - m.env.Metrics().Incr(fmt.Sprintf("boot.%s.find_peers", ns)) + m.Log.Debug("bootstrapping namespace") + m.Metrics.Incr(fmt.Sprintf("boot.%s.find_peers", ns)) } func (m bootMetrics) OnAdvertise(ns string) { - m.env.Log().Debug("advertising namespace") - m.env.Metrics().Decr(fmt.Sprintf("boot.%s.find_peers", ns)) + m.Log.Debug("advertising namespace") + m.Metrics.Decr(fmt.Sprintf("boot.%s.find_peers", ns)) } diff --git a/pkg/runtime/doc.go b/pkg/runtime/doc.go new file mode 100644 index 00000000..7142fa12 --- /dev/null +++ b/pkg/runtime/doc.go @@ -0,0 +1,35 @@ +/* +Package runtime provides a high-level API for constructing Wetware +clients and servers using dependency injection. + +The runtime package is intended to be used with go.uber.org/fx. As +such, it provides constructors for the fx.Option type, which can be +consumed by fx.New to create a client or server node. Refer to the +Fx documentation for information about how to consume fx.Options. + +The runtime package exports three basic types: + + 1. runtime.Env is a configuration struct containing top-level types + required by the runtime. These types are effectful, interacting + with the host environment: loggers, metrics, contexts, configs, + etc. The Options() method exports these types to Fx. + + 2. runtime.Config contains options for the libp2p, CASM and Wetware + constructors required to build a node. These are passed to their + constructors lazily, via the fx.Options returned by the Client() + and Server() methods. The Options returned by either Client() or + Server() MUST be passed to fx.New() along with those returned by + runtime.Env.Options(). + + 3. runtime.Option allows callers to pass options to libp2p, CASM or + Wetware. These are initially staged in runtime.Config, whereupon + they are passed into the fx.Options produced by the Client() and + Server() methods. + +The runtime API also exports two high-level constructors for building +client and server nodes: NewClient() and NewServer(). Callers SHOULD +prefer these over manual invocation of Env.Options(), Config.Client() +and Config().Server(), as they apply sensible defaults and are easier +to use overall. The lower-level API is provided for advanced users. +*/ +package runtime diff --git a/internal/runtime/option.go b/pkg/runtime/option.go similarity index 76% rename from internal/runtime/option.go rename to pkg/runtime/option.go index ac7f922c..a857b4da 100644 --- a/internal/runtime/option.go +++ b/pkg/runtime/option.go @@ -2,6 +2,7 @@ package runtime import ( "github.com/libp2p/go-libp2p" + "go.uber.org/fx" casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/pex" @@ -15,8 +16,35 @@ import ( type Config struct { newHost HostConfig hostOpt []libp2p.Option + pexOpt []pex.Option +} + +// With returns a new Config populated by the supplied options. +func (c Config) With(opt []Option) Config { + for _, option := range opt { + option(&c) + } + + return c +} + +func (c Config) Client() fx.Option { + return fx.Module("client", + c.Vat(), + c.System(), + c.ClientBootstrap(), + ) +} - pexOpt []pex.Option +func (c Config) Server() fx.Option { + return fx.Module("server", + c.Vat(), + c.System(), + c.PubSub(), + c.Routing(), + c.ServerBootstrap(), + fx.Provide(newServerNode), + fx.Invoke(bootServer)) } // Option can modify the state of Config. It is used to set diff --git a/internal/runtime/pubsub.go b/pkg/runtime/pubsub.go similarity index 58% rename from internal/runtime/pubsub.go rename to pkg/runtime/pubsub.go index 501c3109..847b013b 100644 --- a/internal/runtime/pubsub.go +++ b/pkg/runtime/pubsub.go @@ -1,12 +1,15 @@ package runtime import ( + "context" "fmt" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/lthibault/log" casm "github.com/wetware/casm/pkg" + "github.com/wetware/casm/pkg/util/metrics" protoutil "github.com/wetware/casm/pkg/util/proto" ww "github.com/wetware/ww/pkg" ww_pubsub "github.com/wetware/ww/pkg/pubsub" @@ -16,8 +19,12 @@ import ( type pubSubConfig struct { fx.In - Vat casm.Vat - Boot discovery.Discovery + Ctx context.Context + Log log.Logger + Metrics metrics.Client + Flag Flags + Vat casm.Vat + Boot discovery.Discovery } func (c *Config) PubSub() fx.Option { @@ -26,26 +33,26 @@ func (c *Config) PubSub() fx.Option { fx.Decorate(c.withPubSubDiscovery)) } -func (c *Config) newPubSub(env Env, config pubSubConfig) (*pubsub.PubSub, error) { - return pubsub.NewGossipSub(env.Context(), config.Vat.Host, +func (c *Config) newPubSub(config pubSubConfig) (*pubsub.PubSub, error) { + return pubsub.NewGossipSub(config.Ctx, config.Vat.Host, pubsub.WithPeerExchange(true), - pubsub.WithRawTracer(config.tracer(env)), + pubsub.WithRawTracer(config.tracer()), pubsub.WithDiscovery(config.Boot), - pubsub.WithProtocolMatchFn(config.protoMatchFunc(env)), - pubsub.WithGossipSubProtocols(config.subProtos(env)), + pubsub.WithProtocolMatchFn(config.protoMatchFunc()), + pubsub.WithGossipSubProtocols(config.subProtos()), pubsub.WithPeerOutboundQueueSize(256), ) } -func (pubSubConfig) tracer(env Env) ww_pubsub.Tracer { +func (config pubSubConfig) tracer() ww_pubsub.Tracer { return ww_pubsub.Tracer{ - Log: env.Log(), - Metrics: env.Metrics().WithPrefix("pubsub"), + Log: config.Log, + Metrics: config.Metrics.WithPrefix("pubsub"), } } -func (config pubSubConfig) protoMatchFunc(env Env) pubsub.ProtocolMatchFn { - match := matcher(env) +func (config pubSubConfig) protoMatchFunc() pubsub.ProtocolMatchFn { + match := matcher(config.Flag) return func(local string) func(string) bool { if match.Match(local) { @@ -56,10 +63,10 @@ func (config pubSubConfig) protoMatchFunc(env Env) pubsub.ProtocolMatchFn { } } -func (config pubSubConfig) features(env Env) func(pubsub.GossipSubFeature, protocol.ID) bool { - supportGossip := matcher(env) +func (config pubSubConfig) features() func(pubsub.GossipSubFeature, protocol.ID) bool { + supportGossip := matcher(config.Flag) - _, version := protoutil.Split(protoID(env)) + _, version := protoutil.Split(protoID(config.Flag)) supportsPX := protoutil.Suffix(version) return func(feat pubsub.GossipSubFeature, proto protocol.ID) bool { @@ -76,19 +83,19 @@ func (config pubSubConfig) features(env Env) func(pubsub.GossipSubFeature, proto } } -func matcher(env Env) protoutil.MatchFunc { +func matcher(flag Flags) protoutil.MatchFunc { proto, version := protoutil.Split(pubsub.GossipSubID_v11) return protoutil.Match( - ww.NewMatcher(env.String("ns")), + ww.NewMatcher(flag.String("ns")), protoutil.Exactly(string(proto)), protoutil.SemVer(string(version))) } -func (config pubSubConfig) subProtos(env Env) ([]protocol.ID, func(pubsub.GossipSubFeature, protocol.ID) bool) { - return []protocol.ID{protoID(env)}, config.features(env) +func (config pubSubConfig) subProtos() ([]protocol.ID, func(pubsub.GossipSubFeature, protocol.ID) bool) { + return []protocol.ID{protoID(config.Flag)}, config.features() } -func protoID(env Env) protocol.ID { +func protoID(flag Flags) protocol.ID { // FIXME: For security, the cluster topic should not be present // in the root pubsub capability server. @@ -98,6 +105,6 @@ func protoID(env Env) protocol.ID { // /casm//ww///meshsub/1.1.0 return protoutil.Join( - ww.Subprotocol(env.String("ns")), + ww.Subprotocol(flag.String("ns")), pubsub.GossipSubID_v11) } diff --git a/internal/runtime/routing.go b/pkg/runtime/routing.go similarity index 67% rename from internal/runtime/routing.go rename to pkg/runtime/routing.go index e491adf4..36048638 100644 --- a/internal/runtime/routing.go +++ b/pkg/runtime/routing.go @@ -1,6 +1,8 @@ package runtime import ( + "context" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p-kad-dht/dual" "github.com/libp2p/go-libp2p/core/host" @@ -16,7 +18,15 @@ func (c Config) Routing() fx.Option { fx.Decorate(routedHost)) } -func (c Config) newDHT(env Env, lx fx.Lifecycle, vat casm.Vat) (*dual.DHT, error) { +type routingConfig struct { + fx.In + + Ctx context.Context + Flag Flags + Vat casm.Vat +} + +func (c Config) newDHT(config routingConfig, lx fx.Lifecycle) (*dual.DHT, error) { // TODO: Use dht.BootstrapPeersFunc to get bootstrap peers from PeX? // This might allow us to greatly simplify our architecture and // runtime initialization. In particular: @@ -27,9 +37,9 @@ func (c Config) newDHT(env Env, lx fx.Lifecycle, vat casm.Vat) (*dual.DHT, error // 2. The server.Joiner type could be simplified, and perhaps // eliminated entirely. - d, err := dual.New(env.Context(), vat.Host, - dual.LanDHTOption(lanOpt(env)...), // TODO: options (w/ defaults) from Config - dual.WanDHTOption(wanOpt(env)...)) // TODO: options (w/ defaults) from Config + d, err := dual.New(config.Ctx, config.Vat.Host, + dual.LanDHTOption(lanOpt(config.Flag)...), // TODO: options (w/ defaults) from Config + dual.WanDHTOption(wanOpt(config.Flag)...)) // TODO: options (w/ defaults) from Config if err == nil { lx.Append(fx.Hook{ @@ -45,16 +55,16 @@ func routedHost(h host.Host, dht *dual.DHT) host.Host { return routedhost.Wrap(h, dht) } -func lanOpt(env Env) []dht.Option { +func lanOpt(flag Flags) []dht.Option { return []dht.Option{ dht.Mode(dht.ModeServer), - dht.ProtocolPrefix(ww.Subprotocol(env.String("ns"))), + dht.ProtocolPrefix(ww.Subprotocol(flag.String("ns"))), dht.ProtocolExtension("lan")} } -func wanOpt(env Env) []dht.Option { +func wanOpt(flag Flags) []dht.Option { return []dht.Option{ dht.Mode(dht.ModeAuto), - dht.ProtocolPrefix(ww.Subprotocol(env.String("ns"))), + dht.ProtocolPrefix(ww.Subprotocol(flag.String("ns"))), dht.ProtocolExtension("wan")} } diff --git a/internal/runtime/runtime.go b/pkg/runtime/runtime.go similarity index 65% rename from internal/runtime/runtime.go rename to pkg/runtime/runtime.go index 02e9deea..74d69dc4 100644 --- a/internal/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -8,12 +8,15 @@ import ( "reflect" "time" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/lthibault/log" - "go.uber.org/fx" - "go.uber.org/fx/fxevent" - casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/util/metrics" + logutil "github.com/wetware/ww/internal/util/log" + statsdutil "github.com/wetware/ww/internal/util/statsd" + "github.com/wetware/ww/pkg/server" + "go.uber.org/fx" + "go.uber.org/fx/fxevent" ) /**************************************************************** @@ -22,25 +25,38 @@ import ( * * ****************************************************************/ -// Env is a context object that exposes environmental data and -// effectful operations to the runtime. -type Env interface { - // Context returns the main runtime context. The returned - // context MAY expire, in which case callers SHOULD finish - // any outstanding work and terminate promptly. - Context() context.Context +// NewClient returns Fx options for a client runtime. Options +// are passed directly to Client(). +func NewClient(ctx context.Context, fs Flags, opt ...Option) fx.Option { + return fx.Options( + Env{Ctx: ctx, Flags: fs}.Options(), + Config{}.With(clientDefaults(opt)).Client(), + ) +} - /* - Observability - */ +// NewClient returns Fx options for a server runtime. Options +// are passed directly to Server(). +func NewServer(ctx context.Context, fs Flags, opt ...Option) fx.Option { + return fx.Options( + Env{Ctx: ctx, Flags: fs}.Options(), + Config{}.With(serverDefaults(opt)).Server(), + ) +} - Log() log.Logger - Metrics() metrics.Client +func clientDefaults(opt []Option) []Option { + return append([]Option{ + WithHostConfig(casm.Client), + }, opt...) +} - /* - Configuration - */ +func serverDefaults(opt []Option) []Option { + return append([]Option{ + WithHostConfig(casm.Server), + }, opt...) +} +// Flags are used to query configuration parameters. +type Flags interface { Bool(string) bool IsSet(string) bool Path(string) string @@ -50,23 +66,64 @@ type Env interface { Float64(string) float64 } -// Prelude provides the core wetware runtime. It MUST be passed -// to the top-level call to fx.New. -func Prelude(env Env) fx.Option { - return fx.Options(fx.WithLogger(newFxLogger), +// Env is a context object that exposes environmental data and +// effectful operations to the runtime. +type Env struct { + fx.In + + Ctx context.Context `optional:"true"` + Log log.Logger `optional:"true"` + Metrics metrics.Client `optional:"true"` + Flags Flags +} + +// Options for the Wetware runtime. These MUST be passed to +// the top-level call to fx.New, along with either options +// provided by either Config.Client() or Config.Server(). +func (env Env) Options() fx.Option { + return fx.Options( + fx.WithLogger(newFxLogger), fx.Supply( - fx.Annotate(env, fx.As(new(Env))), - fx.Annotate(env.Log(), fx.As(new(log.Logger))), - fx.Annotate(env.Metrics(), fx.As(new(metrics.Client)))), + fx.Annotate(env.Flags, fx.As(new(Flags))), + fx.Annotate(env.context(), fx.As(new(context.Context))), + fx.Annotate(env.logging(), fx.As(new(log.Logger))), + fx.Annotate(env.metrics(), fx.As(new(metrics.Client)))), fx.Decorate( - decorateLogger, - decorateEnv)) + decorateLogger)) +} + +func (env Env) context() context.Context { + if env.Ctx == nil { + env.Ctx = context.Background() + } + + return env.Ctx } +func (env *Env) logging() log.Logger { + if env.Log == nil { + env.Log = logutil.New(env.Flags) + } + + return env.Log +} + +func (env *Env) metrics() metrics.Client { + if env.Metrics == nil { + env.Metrics = statsdutil.New(env.Flags, env.logging()) + } + + return env.Metrics +} + +/* + Fx Logger +*/ + type fxLogger struct{ log.Logger } func newFxLogger(env Env) fxevent.Logger { - return fxLogger{env.Log()} + return fxLogger{env.Log} } func (lx fxLogger) LogEvent(ev fxevent.Event) { @@ -220,32 +277,32 @@ func (lx fxLogger) MaybeModule(name string) fxLogger { return lx } -type environment struct { - log log.Logger - Env -} - -func decorateEnv(env Env, log log.Logger) Env { - return environment{ - log: log, - Env: env, - } -} - -func (env environment) Log() log.Logger { - return env.log -} +/* + Misc. +*/ func decorateLogger(env Env, vat casm.Vat) log.Logger { - log := env.Log().With(vat) + log := env.Log.With(vat) - if env.IsSet("meta") { - log = log.WithField("meta", env.StringSlice("meta")) + if env.Flags.IsSet("meta") { + log = log.WithField("meta", env.Flags.StringSlice("meta")) } return log } +func newServerNode(j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) { + // TODO: this should use lx.OnStart to benefit from the start timeout. + return j.Join(ps) +} + +func bootServer(lx fx.Lifecycle, n *server.Node) { + lx.Append(fx.Hook{ + OnStart: bootstrap(n), + OnStop: onclose(n), + }) +} + func closer(c io.Closer) fx.Hook { return fx.Hook{ OnStop: onclose(c), diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go new file mode 100644 index 00000000..c9fa3afe --- /dev/null +++ b/pkg/runtime/runtime_test.go @@ -0,0 +1,108 @@ +package runtime_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + inproc "github.com/lthibault/go-libp2p-inproc-transport" + "github.com/lthibault/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + casm "github.com/wetware/casm/pkg" + "github.com/wetware/casm/pkg/util/metrics" + "github.com/wetware/ww/pkg/runtime" + "go.uber.org/fx" +) + +func TestEnv_Options(t *testing.T) { + t.Parallel() + + /* + Test that optional env dependencies are created. + */ + + fs := flags{ + "ns": "test", + } + + env := runtime.Env{ + Flags: fs, + } + + vat := newVat() + defer vat.Host.Close() + + var ( + ctx context.Context + logger log.Logger + mc metrics.Client + ) + + app := fx.New(env.Options(), + fx.Supply(vat), + fx.Populate( + &ctx, + &logger, + &mc)) + + err := app.Start(context.Background()) + require.NoError(t, err, "fx application should start") + + assert.NotNil(t, ctx, "should populate context") + assert.NotNil(t, logger, "should populate logger") + assert.NotNil(t, mc, "should populate metrics") +} + +type flags map[string]interface{} + +func (f flags) IsSet(name string) bool { + _, ok := f[name] + return ok +} + +func (f flags) Bool(name string) bool { + v, _ := f[name].(bool) + return v +} + +func (f flags) Path(name string) string { + v, _ := f[name].(string) + return v +} + +func (f flags) String(name string) string { + v, _ := f[name].(string) + return v +} + +func (f flags) StringSlice(name string) []string { + v, _ := f[name].([]string) + return v +} + +func (f flags) Duration(name string) time.Duration { + v, _ := f[name].(time.Duration) + return v +} + +func (f flags) Float64(name string) float64 { + v, _ := f[name].(float64) + return v +} + +func newVat() casm.Vat { + h, err := libp2p.New( + libp2p.NoListenAddrs, + libp2p.NoTransports, + libp2p.Transport(inproc.New())) + if err != nil { + panic(err) + } + + return casm.Vat{ + NS: "test", + Host: h, + } +} diff --git a/internal/runtime/system.go b/pkg/runtime/system.go similarity index 80% rename from internal/runtime/system.go rename to pkg/runtime/system.go index 33085988..3a5e502d 100644 --- a/internal/runtime/system.go +++ b/pkg/runtime/system.go @@ -59,8 +59,8 @@ type meta struct { fields []routing.MetaField } -func metadata(env Env) (pulse.Preparer, error) { - ss := env.StringSlice("meta") +func metadata(flag Flags) (pulse.Preparer, error) { + ss := flag.StringSlice("meta") fs := make([]routing.MetaField, len(ss)) var err error @@ -94,29 +94,29 @@ func (m *meta) Prepare(h pulse.Heartbeat) error { return h.SetHost(host) } -func storage(env Env, lx fx.Lifecycle) (ds.Batching, error) { - if !env.IsSet("data") { +func storage(log log.Logger, flag Flags, lx fx.Lifecycle) (ds.Batching, error) { + if !flag.IsSet("data") { return memstore(), nil } - err := os.MkdirAll(storagePath(env), 0700) + err := os.MkdirAll(storagePath(flag), 0700) if err != nil { return nil, fmt.Errorf("mkdir: %w", err) } - return dbstore(env, lx) + return dbstore(log, flag, lx) } func memstore() ds.Batching { return ds_sync.MutexWrap(ds.NewMapDatastore()) } -func dbstore(env Env, lx fx.Lifecycle) (ds.Batching, error) { - log := newBadgerLogger(env) - badgerds.DefaultOptions.Logger = log +func dbstore(log log.Logger, flag Flags, lx fx.Lifecycle) (ds.Batching, error) { + logger := newBadgerLogger(log, flag) + badgerds.DefaultOptions.Logger = logger d, err := badgerds.NewDatastore( - storagePath(env), + storagePath(flag), &badgerds.DefaultOptions) if d == nil { lx.Append(closer(d)) @@ -126,8 +126,8 @@ func dbstore(env Env, lx fx.Lifecycle) (ds.Batching, error) { return d, err } -func storagePath(env Env) string { - return filepath.Join(env.Path("data"), "data") +func storagePath(flag Flags) string { + return filepath.Join(flag.Path("data"), "data") } func syncer(log log.Logger, s interface { @@ -143,9 +143,9 @@ func syncer(log log.Logger, s interface { type badgerLogger struct{ log.Logger } -func newBadgerLogger(env Env) badgerLogger { +func newBadgerLogger(log log.Logger, flag Flags) badgerLogger { return badgerLogger{ - Logger: env.Log().WithField("data_dir", storagePath(env)), + Logger: log.WithField("data_dir", storagePath(flag)), } } diff --git a/internal/runtime/vat.go b/pkg/runtime/vat.go similarity index 67% rename from internal/runtime/vat.go rename to pkg/runtime/vat.go index 3302d621..5f237db1 100644 --- a/internal/runtime/vat.go +++ b/pkg/runtime/vat.go @@ -9,6 +9,7 @@ import ( "go.uber.org/fx" casm "github.com/wetware/casm/pkg" + "github.com/wetware/casm/pkg/util/metrics" ) func (c Config) Vat() fx.Option { @@ -26,19 +27,19 @@ type hostFactoryConfig struct { Priv crypto.PrivKey } -func (c Config) newHostFactory(env Env, cfg hostFactoryConfig) casm.HostFactory { +func (c Config) newHostFactory(flag Flags, cfg hostFactoryConfig) casm.HostFactory { return c.newHost(append([]libp2p.Option{ - libp2p.ListenAddrStrings(env.StringSlice("listen")...), + libp2p.ListenAddrStrings(flag.StringSlice("listen")...), // libp2p.BandwidthReporter(cfg.Metrics), libp2p.Identity(cfg.Priv), }, c.hostOpt...)...) } -func newVat(env Env, lx fx.Lifecycle, f casm.HostFactory) (casm.Vat, error) { - vat, err := casm.New(env.String("ns"), f) +func newVat(mc metrics.Client, flag Flags, lx fx.Lifecycle, f casm.HostFactory) (casm.Vat, error) { + vat, err := casm.New(flag.String("ns"), f) if err == nil { lx.Append(closer(vat.Host)) - vat.Metrics = env.Metrics().WithPrefix("vat") + vat.Metrics = mc.WithPrefix("vat") } return vat, err