Skip to content

Commit

Permalink
Add echo server
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed May 8, 2024
1 parent 497deea commit c2aa662
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 13 deletions.
53 changes: 45 additions & 8 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
{
"label": "Dependencies",
"type": "shell",
"command": ["dart pub get"],
"command": [
"dart pub get"
],
"group": {
"kind": "none",
"isDefault": true
Expand All @@ -14,8 +16,12 @@
{
"label": "Get protoc plugin",
"type": "shell",
"command": ["dart pub global activate protoc_plugin"],
"dependsOn": ["Dependencies"],
"command": [
"dart pub global activate protoc_plugin"
],
"dependsOn": [
"Dependencies"
],
"group": {
"kind": "none",
"isDefault": true
Expand All @@ -30,7 +36,9 @@
"--proto_path=lib/src/transport/protobuf",
"--dart_out=lib/src/transport/protobuf lib/src/transport/protobuf/client.proto"
],
"dependsOn": ["Get protoc plugin"],
"dependsOn": [
"Get protoc plugin"
],
"group": {
"kind": "none",
"isDefault": true
Expand All @@ -40,8 +48,12 @@
{
"label": "Codegenerate",
"type": "shell",
"command": ["dart run build_runner build --delete-conflicting-outputs"],
"dependsOn": ["Dependencies"],
"command": [
"dart run build_runner build --delete-conflicting-outputs"
],
"dependsOn": [
"Dependencies"
],
"group": {
"kind": "none",
"isDefault": true
Expand Down Expand Up @@ -151,7 +163,10 @@
"label": "Stop Centrifugo Server",
"type": "shell",
"command": "docker",
"args": ["stop", "centrifugo"],
"args": [
"stop",
"centrifugo"
],
"group": {
"kind": "none",
"isDefault": true
Expand Down Expand Up @@ -187,6 +202,28 @@
"reveal": "always",
"panel": "dedicated"
}
},
{
"label": "Run echo server",
"type": "shell",
"command": "go",
"detail": "Running echo server",
"options": {
"cwd": "${workspaceFolder}/tool/echo",
},
"args": [
"run",
"main.go"
],
"group": {
"kind": "none",
"isDefault": true
},
"problemMatcher": [],
"presentation": {
"reveal": "always",
"panel": "dedicated"
}
}
]
}
}
5 changes: 3 additions & 2 deletions lib/src/model/spinify_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ abstract interface class ISpinify
ISpinifyPresenceOwner,
ISpinifyHistoryOwner,
ISpinifyRemoteProcedureCall,
ISpinifyMetricsOwner,
ISpinifyPing {
ISpinifyMetricsOwner {
/// Unique client identifier.
abstract final int id;

Expand Down Expand Up @@ -156,8 +155,10 @@ abstract interface class ISpinifyMetricsOwner {
SpinifyMetrics get metrics;
}

/*
/// Spinify ping interface.
abstract interface class ISpinifyPing {
/// Send ping to server.
Future<void> ping();
}
*/
4 changes: 2 additions & 2 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,13 @@ base mixin SpinifyPingPongMixin
@nonVirtual
Timer? _pingTimer;

@override
/* @override
Future<void> ping() => _bucket.push(
ClientEvent.command,
(int id, DateTime timestamp) => SpinifyPingRequest(
id: id,
timestamp: timestamp,
));
)); */

/// Stop keepalive timer.
@protected
Expand Down
4 changes: 3 additions & 1 deletion test/smoke/smoke_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:convert';

import 'package:spinify/spinify.dart';
import 'package:test/test.dart';

Expand All @@ -9,7 +11,7 @@ void main() {
await client.connect(url);
expect(client.state, isA<SpinifyState$Connected>());
//await client.ping();
//await client.send(utf8.encode('Hello, Spinify!'));
await client.send(utf8.encode('Hello from Spinify!'));
await client.disconnect();
expect(client.state, isA<SpinifyState$Disconnected>());
await client.close();
Expand Down
26 changes: 26 additions & 0 deletions tool/echo/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module github.com/PlugFox/spinify/echo

go 1.22

require github.com/centrifugal/centrifuge v0.32.2

require (
github.com/FZambia/eagle v0.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/centrifugal/protocol v0.12.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/redis/rueidis v1.0.35 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/encoding v0.4.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
38 changes: 38 additions & 0 deletions tool/echo/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA=
github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/centrifugal/centrifuge v0.32.2 h1:iBq2Xx4PMxQtyADhcz2oF6kcXBHRNQatxX8r2mLa7IM=
github.com/centrifugal/centrifuge v0.32.2/go.mod h1:EqdCalAQ1YXtIO92ifTjNwFGQOtWoTpXQlh7MvZAK/E=
github.com/centrifugal/protocol v0.12.1 h1:hGbIl9Y0UbVsESgLcsqgZ7duwEnrZebFUYdu5Opwzgo=
github.com/centrifugal/protocol v0.12.1/go.mod h1:5Z0SuNdXEt83Fkoi34BCyY23p1P8+zQakQS6/BfJHak=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/rueidis v1.0.35 h1:S1q50VYRl8Hg/ekcF5UPZsRXD4GYDLLU2b+oEogycnI=
github.com/redis/rueidis v1.0.35/go.mod h1:bnbkk4+CkXZgDPEbUtSos/o55i4RhFYYesJ4DS2zmq0=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8=
github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
173 changes: 173 additions & 0 deletions tool/echo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package main

import (
"context"
log "log"
"os"
"os/signal"
"time"

"io"
"net/http"
"syscall"

"github.com/centrifugal/centrifuge"
)

// waitExitSignal waits for the SIGINT or SIGTERM signal to shutdown the centrifuge node.
// It creates a channel to receive signals and a channel to indicate when the shutdown is complete.
// Then it notifies the channel for SIGINT and SIGTERM signals and starts a goroutine to wait for the signal.
// Once the signal is received, it shuts down the centrifuge node and indicates that the shutdown is complete.
func waitExitSignal(n *centrifuge.Node) {
// Create a channel to receive signals.
sigCh := make(chan os.Signal, 1)
// Create a channel to indicate when the shutdown is complete.
done := make(chan bool, 1)
// Notify the channel for SIGINT and SIGTERM signals.
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Start a goroutine to wait for the signal.
go func() {
<-sigCh
// Shutdown the centrifuge node.
_ = n.Shutdown(context.Background())
// Indicate that the shutdown is complete.
done <- true
}()
// Wait for the shutdown to complete.
<-done
}

// authMiddleware is a middleware function that adds credentials to the request context before passing it to the next handler.
// It sets the user ID, expiration time, and user information in the credentials.
// The middleware function takes in a http.Handler and returns a http.Handler.
func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
newCtx := centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
UserID: "42",
ExpireAt: time.Now().Unix() + 10,
Info: []byte(`{"name": "Test User"}`),
})
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}

// main function initializes a new Centrifuge node and sets up event handlers for client connections, subscriptions, and RPCs.
// It also sets up a websocket handler and a file server for serving static files.
// The function waits for an exit signal before shutting down the node and exiting.
func Centrifuge() (*centrifuge.Node, error) {
logLevel := centrifuge.LogLevelInfo

logHandler := func(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}
node, _ := centrifuge.New(centrifuge.Config{
LogLevel: logLevel,
LogHandler: logHandler,
Version: "0.0.0",
Name: "echo",
})

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
return centrifuge.ConnectReply{
Data: []byte(`{}`),
}, nil
})

node.OnConnect(func(client *centrifuge.Client) {
transport := client.Transport()
log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

go func() {
err := client.Send([]byte("hello"))
if err != nil {
if err == io.EOF {
return
}
log.Fatalf("Error sending message to [user %s]: %v", client.UserID(), err.Error())
}
}()

client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) {
log.Printf("[user %s] connection is going to expire, refreshing", client.UserID())
cb(centrifuge.RefreshReply{
ExpireAt: time.Now().Unix() + 10,
}, nil)
})

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel)
cb(centrifuge.SubscribeReply{}, nil)
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("[user %s] unsubscribed from %s", client.UserID(), e.Channel)
})

client.OnMessage(func(e centrifuge.MessageEvent) {
log.Printf("[user %s] async message: %s", client.UserID(), string(e.Data))
client.Send(e.Data /* []byte("got your message") */) // echo back
})

client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
log.Printf("[user %s] publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
cb(centrifuge.PublishReply{}, nil)
})

client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
log.Printf("[user %s] sent RPC, data: %s, method: %s", client.UserID(), string(e.Data), e.Method)
switch e.Method {
case "getCurrentYear":
cb(centrifuge.RPCReply{Data: []byte(`{"year": "2020"}`)}, nil)
default:
cb(centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound)
}
})

client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
})
})

if err := node.Run(); err != nil {
return nil, err
}

return node, nil
}

// main function initializes a new Centrifuge node and sets up event handlers for client connections, subscriptions, and RPCs.
// It also sets up a websocket handler and a file server for serving static files.
// The function waits for an exit signal before shutting down the node and exiting.
func main() {
// Установка временной зоны по умолчанию в UTC
time.Local = time.UTC

// Инициализация Centrifuge
node, err := Centrifuge()
if err != nil {
log.Fatalf("Centrifuge start error: %v", err)
os.Exit(1)
}

// Serve Websocket connections using WebsocketHandler.
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})
http.Handle("/connection/websocket", authMiddleware(wsHandler))

// The second route is for serving index.html file.
//http.Handle("/", http.FileServer(http.Dir("./")))

log.Printf("Starting server, http://localhost:8000/connection/websocket")
if err := http.ListenAndServe(":8000", nil); err != nil {
log.Fatal(err)
}

log.Printf("Service started")

// Wait for an exit signal before shutting down the node and exiting.
waitExitSignal(node)

log.Printf("Server stopped")
os.Exit(0)
}

0 comments on commit c2aa662

Please sign in to comment.