diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 4f7d96b..d7e8541 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -4,7 +4,9 @@ { "label": "Dependencies", "type": "shell", - "command": ["dart pub get"], + "command": [ + "dart pub get" + ], "group": { "kind": "none", "isDefault": true @@ -14,8 +16,12 @@ { "label": "Get protoc plugin", "type": "shell", - "command": ["dart pub global activate protoc_plugin"], - "dependsOn": ["Dependencies"], + "command": [ + "dart pub global activate protoc_plugin" + ], + "dependsOn": [ + "Dependencies" + ], "group": { "kind": "none", "isDefault": true @@ -30,7 +36,9 @@ "--proto_path=lib/src/transport/protobuf", "--dart_out=lib/src/transport/protobuf lib/src/transport/protobuf/client.proto" ], - "dependsOn": ["Get protoc plugin"], + "dependsOn": [ + "Get protoc plugin" + ], "group": { "kind": "none", "isDefault": true @@ -40,8 +48,12 @@ { "label": "Codegenerate", "type": "shell", - "command": ["dart run build_runner build --delete-conflicting-outputs"], - "dependsOn": ["Dependencies"], + "command": [ + "dart run build_runner build --delete-conflicting-outputs" + ], + "dependsOn": [ + "Dependencies" + ], "group": { "kind": "none", "isDefault": true @@ -151,7 +163,10 @@ "label": "Stop Centrifugo Server", "type": "shell", "command": "docker", - "args": ["stop", "centrifugo"], + "args": [ + "stop", + "centrifugo" + ], "group": { "kind": "none", "isDefault": true @@ -187,6 +202,28 @@ "reveal": "always", "panel": "dedicated" } + }, + { + "label": "Run echo server", + "type": "shell", + "command": "go", + "detail": "Running echo server", + "options": { + "cwd": "${workspaceFolder}/tool/echo", + }, + "args": [ + "run", + "main.go" + ], + "group": { + "kind": "none", + "isDefault": true + }, + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "dedicated" + } } ] -} +} \ No newline at end of file diff --git a/lib/src/model/spinify_interface.dart b/lib/src/model/spinify_interface.dart index 8d03cdb..25dda50 100644 --- a/lib/src/model/spinify_interface.dart +++ b/lib/src/model/spinify_interface.dart @@ -25,8 +25,7 @@ abstract interface class ISpinify ISpinifyPresenceOwner, ISpinifyHistoryOwner, ISpinifyRemoteProcedureCall, - ISpinifyMetricsOwner, - ISpinifyPing { + ISpinifyMetricsOwner { /// Unique client identifier. abstract final int id; @@ -156,8 +155,10 @@ abstract interface class ISpinifyMetricsOwner { SpinifyMetrics get metrics; } +/* /// Spinify ping interface. abstract interface class ISpinifyPing { /// Send ping to server. Future ping(); } + */ \ No newline at end of file diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index 516be23..fe586f4 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -366,13 +366,13 @@ base mixin SpinifyPingPongMixin @nonVirtual Timer? _pingTimer; - @override + /* @override Future ping() => _bucket.push( ClientEvent.command, (int id, DateTime timestamp) => SpinifyPingRequest( id: id, timestamp: timestamp, - )); + )); */ /// Stop keepalive timer. @protected diff --git a/test/smoke/smoke_test.dart b/test/smoke/smoke_test.dart index 2237e0c..4a85020 100644 --- a/test/smoke/smoke_test.dart +++ b/test/smoke/smoke_test.dart @@ -1,3 +1,5 @@ +import 'dart:convert'; + import 'package:spinify/spinify.dart'; import 'package:test/test.dart'; @@ -9,7 +11,7 @@ void main() { await client.connect(url); expect(client.state, isA()); //await client.ping(); - //await client.send(utf8.encode('Hello, Spinify!')); + await client.send(utf8.encode('Hello from Spinify!')); await client.disconnect(); expect(client.state, isA()); await client.close(); diff --git a/tool/echo/go.mod b/tool/echo/go.mod new file mode 100644 index 0000000..b5bf7d7 --- /dev/null +++ b/tool/echo/go.mod @@ -0,0 +1,26 @@ +module github.com/PlugFox/spinify/echo + +go 1.22 + +require github.com/centrifugal/centrifuge v0.32.2 + +require ( + github.com/FZambia/eagle v0.1.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/centrifugal/protocol v0.12.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/prometheus/client_golang v1.19.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/redis/rueidis v1.0.35 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/segmentio/encoding v0.4.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.19.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/tool/echo/go.sum b/tool/echo/go.sum new file mode 100644 index 0000000..d5f5429 --- /dev/null +++ b/tool/echo/go.sum @@ -0,0 +1,38 @@ +github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA= +github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/centrifugal/centrifuge v0.32.2 h1:iBq2Xx4PMxQtyADhcz2oF6kcXBHRNQatxX8r2mLa7IM= +github.com/centrifugal/centrifuge v0.32.2/go.mod h1:EqdCalAQ1YXtIO92ifTjNwFGQOtWoTpXQlh7MvZAK/E= +github.com/centrifugal/protocol v0.12.1 h1:hGbIl9Y0UbVsESgLcsqgZ7duwEnrZebFUYdu5Opwzgo= +github.com/centrifugal/protocol v0.12.1/go.mod h1:5Z0SuNdXEt83Fkoi34BCyY23p1P8+zQakQS6/BfJHak= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/redis/rueidis v1.0.35 h1:S1q50VYRl8Hg/ekcF5UPZsRXD4GYDLLU2b+oEogycnI= +github.com/redis/rueidis v1.0.35/go.mod h1:bnbkk4+CkXZgDPEbUtSos/o55i4RhFYYesJ4DS2zmq0= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8= +github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/tool/echo/main.go b/tool/echo/main.go new file mode 100644 index 0000000..7799454 --- /dev/null +++ b/tool/echo/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "context" + log "log" + "os" + "os/signal" + "time" + + "io" + "net/http" + "syscall" + + "github.com/centrifugal/centrifuge" +) + +// waitExitSignal waits for the SIGINT or SIGTERM signal to shutdown the centrifuge node. +// It creates a channel to receive signals and a channel to indicate when the shutdown is complete. +// Then it notifies the channel for SIGINT and SIGTERM signals and starts a goroutine to wait for the signal. +// Once the signal is received, it shuts down the centrifuge node and indicates that the shutdown is complete. +func waitExitSignal(n *centrifuge.Node) { + // Create a channel to receive signals. + sigCh := make(chan os.Signal, 1) + // Create a channel to indicate when the shutdown is complete. + done := make(chan bool, 1) + // Notify the channel for SIGINT and SIGTERM signals. + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + // Start a goroutine to wait for the signal. + go func() { + <-sigCh + // Shutdown the centrifuge node. + _ = n.Shutdown(context.Background()) + // Indicate that the shutdown is complete. + done <- true + }() + // Wait for the shutdown to complete. + <-done +} + +// authMiddleware is a middleware function that adds credentials to the request context before passing it to the next handler. +// It sets the user ID, expiration time, and user information in the credentials. +// The middleware function takes in a http.Handler and returns a http.Handler. +func authMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ + UserID: "42", + ExpireAt: time.Now().Unix() + 10, + Info: []byte(`{"name": "Test User"}`), + }) + r = r.WithContext(newCtx) + h.ServeHTTP(w, r) + }) +} + +// main function initializes a new Centrifuge node and sets up event handlers for client connections, subscriptions, and RPCs. +// It also sets up a websocket handler and a file server for serving static files. +// The function waits for an exit signal before shutting down the node and exiting. +func Centrifuge() (*centrifuge.Node, error) { + logLevel := centrifuge.LogLevelInfo + + logHandler := func(e centrifuge.LogEntry) { + log.Printf("%s: %v", e.Message, e.Fields) + } + node, _ := centrifuge.New(centrifuge.Config{ + LogLevel: logLevel, + LogHandler: logHandler, + Version: "0.0.0", + Name: "echo", + }) + + node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { + return centrifuge.ConnectReply{ + Data: []byte(`{}`), + }, nil + }) + + node.OnConnect(func(client *centrifuge.Client) { + transport := client.Transport() + log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) + + go func() { + err := client.Send([]byte("hello")) + if err != nil { + if err == io.EOF { + return + } + log.Fatalf("Error sending message to [user %s]: %v", client.UserID(), err.Error()) + } + }() + + client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) { + log.Printf("[user %s] connection is going to expire, refreshing", client.UserID()) + cb(centrifuge.RefreshReply{ + ExpireAt: time.Now().Unix() + 10, + }, nil) + }) + + client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { + log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel) + cb(centrifuge.SubscribeReply{}, nil) + }) + + client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { + log.Printf("[user %s] unsubscribed from %s", client.UserID(), e.Channel) + }) + + client.OnMessage(func(e centrifuge.MessageEvent) { + log.Printf("[user %s] async message: %s", client.UserID(), string(e.Data)) + client.Send(e.Data /* []byte("got your message") */) // echo back + }) + + client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { + log.Printf("[user %s] publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data)) + cb(centrifuge.PublishReply{}, nil) + }) + + client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) { + log.Printf("[user %s] sent RPC, data: %s, method: %s", client.UserID(), string(e.Data), e.Method) + switch e.Method { + case "getCurrentYear": + cb(centrifuge.RPCReply{Data: []byte(`{"year": "2020"}`)}, nil) + default: + cb(centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound) + } + }) + + client.OnDisconnect(func(e centrifuge.DisconnectEvent) { + log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) + }) + }) + + if err := node.Run(); err != nil { + return nil, err + } + + return node, nil +} + +// main function initializes a new Centrifuge node and sets up event handlers for client connections, subscriptions, and RPCs. +// It also sets up a websocket handler and a file server for serving static files. +// The function waits for an exit signal before shutting down the node and exiting. +func main() { + // Установка временной зоны по умолчанию в UTC + time.Local = time.UTC + + // Инициализация Centrifuge + node, err := Centrifuge() + if err != nil { + log.Fatalf("Centrifuge start error: %v", err) + os.Exit(1) + } + + // Serve Websocket connections using WebsocketHandler. + wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}) + http.Handle("/connection/websocket", authMiddleware(wsHandler)) + + // The second route is for serving index.html file. + //http.Handle("/", http.FileServer(http.Dir("./"))) + + log.Printf("Starting server, http://localhost:8000/connection/websocket") + if err := http.ListenAndServe(":8000", nil); err != nil { + log.Fatal(err) + } + + log.Printf("Service started") + + // Wait for an exit signal before shutting down the node and exiting. + waitExitSignal(node) + + log.Printf("Server stopped") + os.Exit(0) +}