diff --git a/CHANGELOG.md b/CHANGELOG.md index d859ca28..f2965440 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ Starting from v2.2.5, all notable changes to this project will be documented in this file. +## v3.1.0 + +### New Features + +- Added support for forwarding raw messages via TCP. +- Re-support for the SeedLink protocol. +- Updated Go version to v1.23.0. +- Updated Gin version to v1.10.0. + ## v3.0.5 ### New Features diff --git a/VERSION b/VERSION index 29b317f0..6c8dc7eb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.0.5 +v3.1.0 diff --git a/api/v1/socket/module.go b/api/v1/socket/module.go index 2ae0c094..f46f5329 100644 --- a/api/v1/socket/module.go +++ b/api/v1/socket/module.go @@ -75,7 +75,7 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error { logger.GetLogger(s.GetApiName()).Errorln(err) return } - defer s.Unsubscribe(clienrId) + defer s.unsubscribe(clienrId) // Listen for incoming messages for { diff --git a/api/v1/socket/unsubscribe.go b/api/v1/socket/unsubscribe.go index 4b32c3b6..01d87573 100644 --- a/api/v1/socket/unsubscribe.go +++ b/api/v1/socket/unsubscribe.go @@ -4,7 +4,7 @@ import ( "errors" ) -func (s *Socket) Unsubscribe(clientId string) error { +func (s *Socket) unsubscribe(clientId string) error { fn, ok := s.subscribers.Get(clientId) if !ok { return errors.New("this client has not subscribed") diff --git a/api/v2/generated.go b/api/v2/generated.go index a826e0a1..394d2739 100644 --- a/api/v2/generated.go +++ b/api/v2/generated.go @@ -46,12 +46,12 @@ type DirectiveRoot struct { type ComplexityRoot struct { Query struct { - Test func(childComplexity int) int + Ping func(childComplexity int) int } } type QueryResolver interface { - Test(ctx context.Context) (*bool, error) + Ping(ctx context.Context) (*string, error) } type executableSchema struct { @@ -73,12 +73,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in _ = ec switch typeName + "." + field { - case "Query.test": - if e.complexity.Query.Test == nil { + case "Query.ping": + if e.complexity.Query.Ping == nil { break } - return e.complexity.Query.Test(childComplexity), true + return e.complexity.Query.Ping(childComplexity), true } return 0, false @@ -241,8 +241,8 @@ func (ec *executionContext) field___Type_fields_args(ctx context.Context, rawArg // region **************************** field.gotpl ***************************** -func (ec *executionContext) _Query_test(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Query_test(ctx, field) +func (ec *executionContext) _Query_ping(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_ping(ctx, field) if err != nil { return graphql.Null } @@ -255,7 +255,7 @@ func (ec *executionContext) _Query_test(ctx context.Context, field graphql.Colle }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().Test(rctx) + return ec.resolvers.Query().Ping(rctx) }) if err != nil { ec.Error(ctx, err) @@ -264,19 +264,19 @@ func (ec *executionContext) _Query_test(ctx context.Context, field graphql.Colle if resTmp == nil { return graphql.Null } - res := resTmp.(*bool) + res := resTmp.(*string) fc.Result = res - return ec.marshalOBoolean2ᚖbool(ctx, field.Selections, res) + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Query_test(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Query_ping(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Query", Field: field, IsMethod: true, IsResolver: true, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type Boolean does not have child fields") + return nil, errors.New("field of type String does not have child fields") }, } return fc, nil @@ -2211,7 +2211,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Query") - case "test": + case "ping": field := field innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { @@ -2220,7 +2220,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr ec.Error(ctx, ec.Recover(ctx, r)) } }() - res = ec._Query_test(ctx, field) + res = ec._Query_ping(ctx, field) return res } diff --git a/api/v2/schema.graphqls b/api/v2/schema.graphqls index 6f761390..118db426 100644 --- a/api/v2/schema.graphqls +++ b/api/v2/schema.graphqls @@ -1,5 +1,5 @@ scalar Int64 type Query { - test: Boolean + ping: String } diff --git a/api/v2/schema.resolvers.go b/api/v2/schema.resolvers.go index e4ec4419..e71661fa 100644 --- a/api/v2/schema.resolvers.go +++ b/api/v2/schema.resolvers.go @@ -6,12 +6,12 @@ package v2 import ( "context" - "fmt" ) -// Test is the resolver for the test field. -func (r *queryResolver) Test(ctx context.Context) (*bool, error) { - panic(fmt.Errorf("not implemented: Test - test")) +// Ping is the resolver for the ping field. +func (r *queryResolver) Ping(ctx context.Context) (*string, error) { + res := "pong" + return &res, nil } // Query returns QueryResolver implementation. diff --git a/build/assets/config.json b/build/assets/config.json index 833cdb32..fd11f5c7 100644 --- a/build/assets/config.json +++ b/build/assets/config.json @@ -14,7 +14,7 @@ "explorer_settings": { "dsn": "transport:///dev/ttyUSB0?baudrate=115200", "engine": "serial", - "legacy": true + "legacy": false }, "sensor_settings": { "frequency": 4.5, @@ -62,6 +62,16 @@ "archiver": { "enable": true, "lifecycle": 10 + }, + "forwarder": { + "enable": true, + "host": "0.0.0.0", + "port": 30000 + }, + "seedlink": { + "enable": true, + "host": "0.0.0.0", + "port": 18000 } } } \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 6418d559..8708175c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,7 +21,9 @@ import ( "github.com/anyshake/observer/services" service_archiver "github.com/anyshake/observer/services/archiver" + service_forwarder "github.com/anyshake/observer/services/forwarder" service_miniseed "github.com/anyshake/observer/services/miniseed" + service_seedlink "github.com/anyshake/observer/services/seedlink" service_timesync "github.com/anyshake/observer/services/timesync" service_watchdog "github.com/anyshake/observer/services/watchdog" "github.com/anyshake/observer/startups" @@ -177,6 +179,8 @@ func main() { &service_archiver.ArchiverService{}, &service_miniseed.MiniSeedService{}, &service_timesync.TimeSyncService{}, + &service_seedlink.SeedLinkService{}, + &service_forwarder.ForwarderService{}, } serviceOptions := &services.Options{ Config: &conf, diff --git a/drivers/explorer/impl.go b/drivers/explorer/impl.go index e6a891e2..b33d3361 100644 --- a/drivers/explorer/impl.go +++ b/drivers/explorer/impl.go @@ -46,20 +46,22 @@ func (g *legacyPacket) decode(data []byte) error { // Using XOR algorithm calc_checksum := [3]uint8{0, 0, 0} - z_axis_offset := 0 - e_axis_offset := int(unsafe.Sizeof(int32(0)) * legacy_packet_channel_size) - n_axis_offset := int(unsafe.Sizeof(int32(0)) * legacy_packet_channel_size * 2) - for i := z_axis_offset; i < e_axis_offset; i++ { + z_axis_offset := int(unsafe.Sizeof(g.Z_Axis)) + for i := 0; i < z_axis_offset; i++ { calc_checksum[0] ^= data[i] } - for i := e_axis_offset; i < n_axis_offset; i++ { + e_axis_offset := z_axis_offset + int(unsafe.Sizeof(g.E_Axis)) + for i := z_axis_offset; i < e_axis_offset; i++ { calc_checksum[1] ^= data[i] } - for i := e_axis_offset; i < len(data)-int(unsafe.Sizeof([3]uint8{})); i++ { + n_axis_offset := e_axis_offset + int(unsafe.Sizeof(g.N_Axis)) + for i := e_axis_offset; i < n_axis_offset; i++ { calc_checksum[2] ^= data[i] } - if bytes.Equal(g.Checksum[:], calc_checksum[:]) { - return fmt.Errorf("checksum mismatch, expected %v, got %v", g.Checksum, calc_checksum) + for i := 0; i < len(calc_checksum); i++ { + if calc_checksum[i] != g.Checksum[i] { + return fmt.Errorf("checksum mismatch, expected %v, got %v", g.Checksum, calc_checksum) + } } return nil @@ -123,7 +125,7 @@ type mainlinePacketChannel struct { func (g *mainlinePacketChannel) length(sampleRate int) int { return 3*sampleRate*int(unsafe.Sizeof(int32(0))) + // Z, E, N axis data - int(unsafe.Sizeof(uint32(0))) // Checksum of Z, E, N axis + int(unsafe.Sizeof(g.checksum)) // Checksum of Z, E, N axis } func (g *mainlinePacketChannel) decode(data []byte, sampleRate int) error { @@ -198,11 +200,11 @@ type ExplorerDriverImpl struct { } func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { - fifoBuffer := fifo.New(16384) + fifoBuffer := fifo.New(e.legacyPacket.length() * 512) // Read data from the transport continuously go func() { - buf := make([]byte, 1024) + buf := make([]byte, e.legacyPacket.length()) for { select { case <-deps.CancelToken.Done(): @@ -218,7 +220,7 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { } }() - // reference: https://stackoverflow.com/a/51424566 + // Reference: https://stackoverflow.com/a/51424566 // Calculate the duration to the next whole second to allivate the drift calcDuration := func(currentTime time.Time, duration time.Duration) time.Duration { return currentTime.Round(duration).Add(duration).Sub(currentTime) @@ -268,13 +270,9 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { ticker.Reset(calcDuration(currentTick, time.Second)) } - case <-time.After(2 * time.Millisecond): - for { - dat, err := fifoBuffer.Read(legacy_packet_frame_header, len(legacy_packet_frame_header)+e.legacyPacket.length()) - if err != nil { - break - } - + case <-time.After(500 * time.Microsecond): + dat, err := fifoBuffer.Read(legacy_packet_frame_header, len(legacy_packet_frame_header)+e.legacyPacket.length()) + if err == nil { // Read the packet data err = e.legacyPacket.decode(dat[len(legacy_packet_frame_header):]) if err != nil { diff --git a/drivers/explorer/types.go b/drivers/explorer/types.go index dde4cf10..bda35e8f 100644 --- a/drivers/explorer/types.go +++ b/drivers/explorer/types.go @@ -10,7 +10,7 @@ import ( messagebus "github.com/vardius/message-bus" ) -const EXPLORER_ALLOWED_JITTER_MS = 2 +const EXPLORER_ALLOWED_JITTER_MS = 5 const ( EXPLORER_CHANNEL_CODE_Z = "Z" diff --git a/frontend/src/.env b/frontend/src/.env index eaf4f8f1..3f719e9b 100644 --- a/frontend/src/.env +++ b/frontend/src/.env @@ -1,2 +1,2 @@ -REACT_APP_VERSION=v3.0.5 -REACT_APP_RELEASE=0e50a2dc-20240822174555 +REACT_APP_VERSION=v3.1.0 +REACT_APP_RELEASE=01429d85-20240825141612 diff --git a/go.mod b/go.mod index 160aa2d7..74edddc3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/anyshake/observer -go 1.21.5 +go 1.23.0 require ( github.com/99designs/gqlgen v0.17.49 @@ -9,11 +9,12 @@ require ( github.com/bclswl0827/go-serial v0.0.1 github.com/bclswl0827/mseedio v1.0.9 github.com/bclswl0827/sacio v1.0.6 + github.com/bclswl0827/slgo v0.0.3 github.com/bclswl0827/sqlite v1.11.1-0.20240613172512-9e6ac9861470 github.com/beevik/ntp v1.4.3 github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be github.com/gin-contrib/gzip v0.0.6 - github.com/gin-gonic/gin v1.9.0 + github.com/gin-gonic/gin v1.10.0 github.com/gorilla/websocket v1.5.0 github.com/juju/ratelimit v1.0.2 github.com/mackerelio/go-osstat v0.2.5 @@ -69,6 +70,10 @@ require ( require ( github.com/agnivade/levenshtein v1.1.1 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/clbanning/anyxml v1.2.2 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -80,26 +85,25 @@ require ( ) require ( - github.com/bytedance/sonic v1.8.0 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/bytedance/sonic v1.11.6 // indirect github.com/creack/goselect v0.1.3-0.20221130125424-8eac7f782437 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.19.0 - github.com/goccy/go-json v0.10.0 // indirect + github.com/go-playground/validator/v10 v10.20.0 + github.com/goccy/go-json v0.10.2 // indirect github.com/json-iterator/go v1.1.12 github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.0.6 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/ugorji/go/codec v1.2.9 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect + golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.21.0 // indirect diff --git a/go.sum b/go.sum index e5e0c5dc..b4163bde 100644 --- a/go.sum +++ b/go.sum @@ -38,16 +38,22 @@ github.com/bclswl0827/mseedio v1.0.9 h1:t5w6K+yjWxl1PxlKy1YpOFf5wKE/r5/bqesWLIrF github.com/bclswl0827/mseedio v1.0.9/go.mod h1:G9nOPR8epxb7ihX5pUpvs3HqYZMnMlXc1Ha1WSMWn7s= github.com/bclswl0827/sacio v1.0.6 h1:kR6ll6xjoQGO+YVEZvvvbAxKGUD3qUXsyIuQape1+vc= github.com/bclswl0827/sacio v1.0.6/go.mod h1:SuiIK7hBHCJvxUOC/rgbJRUO5IoFrEnuCY0xVJ7RyQk= +github.com/bclswl0827/slgo v0.0.3 h1:YISf1FdMfSzFa7MCpbc/MqFj8LuwkHhRXj6olXtbtrM= +github.com/bclswl0827/slgo v0.0.3/go.mod h1:OpplhVusu0BqYRhoq/vNyZ+55t0rCnrIbDib0j54H+c= github.com/bclswl0827/sqlite v1.11.1-0.20240613172512-9e6ac9861470 h1:VYRieWhizTvSnQ3iJ9cVvB+4go+f/2mTp6CC2MSXnn4= github.com/bclswl0827/sqlite v1.11.1-0.20240613172512-9e6ac9861470/go.mod h1:/xElCFPGgTMHBhfSjGBlITMQ/qe6CLTinWdxSbc4oDE= github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA= -github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/clbanning/anyxml v1.2.2 h1:EqBXr26KEC7tuGLDH7ZSeFmaE3yft0h386m5uydpNbU= +github.com/clbanning/anyxml v1.2.2/go.mod h1:m8+zXuK8aS9lnXzfpSLUUjXoqcZ41osGX+JXr09eOjY= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be h1:J5BL2kskAlV9ckgEsNQXscjIaLiOYiZ75d4e94E6dcQ= github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be/go.mod h1:mk5IQ+Y0ZeO87b858TlA645sVcEcbiX6YqP98kt+7+w= github.com/creack/goselect v0.1.3-0.20221130125424-8eac7f782437 h1:vR0VDJLclKuJceyXLWbj8ZFPvUvOkSSQ6fJ63R4pzRY= @@ -69,8 +75,8 @@ github.com/gin-contrib/gzip v0.0.6/go.mod h1:QOJlmV2xmayAjkNS2Y8NQsMneuRShOU/kjo github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk= -github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8= -github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/glebarez/go-sqlite v1.22.0 h1:uAcMJhaA6r3LHMTFgP0SifzgXg46yJkgxqyuyec+ruQ= github.com/glebarez/go-sqlite v1.22.0/go.mod h1:PlBIdHe0+aUEFn+r2/uthrWq4FxbzugL0L8Li6yQJbc= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -100,13 +106,13 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos= -github.com/go-playground/validator/v10 v10.19.0 h1:ol+5Fu+cSq9JD7SoSqe04GMI92cbn0+wvQ3bZ8b/AU4= -github.com/go-playground/validator/v10 v10.19.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= @@ -160,6 +166,7 @@ github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSg github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -201,8 +208,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= -github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= -github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -229,6 +236,7 @@ github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERA github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -254,8 +262,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= -github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU= -github.com/ugorji/go/codec v1.2.9/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/vardius/message-bus v1.1.5 h1:YSAC2WB4HRlwc4neFPTmT88kzzoiQ+9WRRbej/E/LZc= github.com/vardius/message-bus v1.1.5/go.mod h1:6xladCV2lMkUAE4bzzS85qKOiB5miV7aBVRafiTJGqw= github.com/vektah/gqlparser/v2 v2.5.16 h1:1gcmLTvs3JLKXckwCwlUagVn/IlV2bwqle0vJ0vy5p8= @@ -271,8 +279,9 @@ go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -404,4 +413,5 @@ modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/services/archiver/start.go b/services/archiver/start.go index 69378a69..a84fbfba 100644 --- a/services/archiver/start.go +++ b/services/archiver/start.go @@ -40,7 +40,8 @@ func (a *ArchiverService) Start(options *services.Options, waitGroup *sync.WaitG explorerDriver.Subscribe(explorerDeps, a.GetServiceName(), a.handleExplorerEvent) logger.GetLogger(a.GetServiceName()).Infoln("service has been started") + defer logger.GetLogger(a.GetServiceName()).Infoln("service has been stopped") + <-options.CancelToken.Done() explorerDriver.Unsubscribe(explorerDeps, a.GetServiceName()) - logger.GetLogger(a.GetServiceName()).Infoln("service has been stopped") } diff --git a/services/forwarder/checksum.go b/services/forwarder/checksum.go new file mode 100644 index 00000000..f691990b --- /dev/null +++ b/services/forwarder/checksum.go @@ -0,0 +1,17 @@ +package forwarder + +import "unsafe" + +func (a *ForwarderService) getChecksum(arr []int32) uint8 { + checksum := uint8(0) + + for i := 0; i < len(arr); i++ { + bytes := (*[4]byte)(unsafe.Pointer(&arr[i]))[:] + + for j := 0; j < int(unsafe.Sizeof(int32(0))); j++ { + checksum ^= bytes[j] + } + } + + return checksum +} diff --git a/services/forwarder/handler.go b/services/forwarder/handler.go new file mode 100644 index 00000000..923fd0ef --- /dev/null +++ b/services/forwarder/handler.go @@ -0,0 +1,38 @@ +package forwarder + +import ( + "fmt" + "net" + "strings" + + "github.com/anyshake/observer/drivers/explorer" + "github.com/anyshake/observer/utils/logger" +) + +func (a *ForwarderService) handleConnection(conn net.Conn) { + defer a.unsubscribe(conn.RemoteAddr().String()) + defer conn.Close() + + logger.GetLogger(a.GetServiceName()).Infof("accepted connection from %s", conn.RemoteAddr().String()) + defer logger.GetLogger(a.GetServiceName()).Infof("closed connection from %s", conn.RemoteAddr().String()) + + a.subscribe(conn.RemoteAddr().String(), func(data *explorer.ExplorerData) { + dataBytes := []byte(fmt.Sprintf("$%s,%s,%s,%s,%d,%d,%s,*%02X\r\n$%s,%s,%s,%s,%d,%d,%s,*%02X\r\n$%s,%s,%s,%s,%d,%d,%s,*%02X\r\n", + a.networkCode, a.stationCode, a.locationCode, fmt.Sprintf("%sZ", a.channelPrefix), data.Timestamp, data.SampleRate, strings.Trim(strings.Replace(fmt.Sprint(data.Z_Axis), " ", ",", -1), "[]"), a.getChecksum(data.Z_Axis), + a.networkCode, a.stationCode, a.locationCode, fmt.Sprintf("%sE", a.channelPrefix), data.Timestamp, data.SampleRate, strings.Trim(strings.Replace(fmt.Sprint(data.E_Axis), " ", ",", -1), "[]"), a.getChecksum(data.E_Axis), + a.networkCode, a.stationCode, a.locationCode, fmt.Sprintf("%sN", a.channelPrefix), data.Timestamp, data.SampleRate, strings.Trim(strings.Replace(fmt.Sprint(data.N_Axis), " ", ",", -1), "[]"), a.getChecksum(data.N_Axis), + )) + _, err := conn.Write(dataBytes) + if err != nil { + logger.GetLogger(a.GetServiceName()).Errorln(err) + return + } + }) + + for { + _, err := conn.Read(make([]byte, 1)) + if err != nil { + return + } + } +} diff --git a/services/forwarder/name.go b/services/forwarder/name.go new file mode 100644 index 00000000..9e691b2a --- /dev/null +++ b/services/forwarder/name.go @@ -0,0 +1,5 @@ +package forwarder + +func (s *ForwarderService) GetServiceName() string { + return "forwarder" +} diff --git a/services/forwarder/start.go b/services/forwarder/start.go new file mode 100644 index 00000000..338658dc --- /dev/null +++ b/services/forwarder/start.go @@ -0,0 +1,78 @@ +package forwarder + +import ( + "fmt" + "net" + "sync" + + "github.com/anyshake/observer/drivers/explorer" + "github.com/anyshake/observer/services" + "github.com/anyshake/observer/utils/logger" + cmap "github.com/orcaman/concurrent-map/v2" + messagebus "github.com/vardius/message-bus" +) + +func (a *ForwarderService) Start(options *services.Options, waitGroup *sync.WaitGroup) { + defer waitGroup.Done() + + serviceConfig, ok := options.Config.Services[a.GetServiceName()] + if !ok { + logger.GetLogger(a.GetServiceName()).Errorln("service configuration not found") + return + } + if !serviceConfig.(map[string]any)["enable"].(bool) { + logger.GetLogger(a.GetServiceName()).Infoln("service has been disabled") + return + } + serverHost := serviceConfig.(map[string]any)["host"].(string) + serverPort := int(serviceConfig.(map[string]any)["port"].(float64)) + a.stationCode = options.Config.Stream.Station + a.networkCode = options.Config.Stream.Network + a.locationCode = options.Config.Stream.Location + a.channelPrefix = options.Config.Stream.Channel + + a.subscribers = cmap.New[explorer.ExplorerEventHandler]() + a.messageBus = messagebus.New(65535) + + // Forward events to internal message bus + var explorerDeps *explorer.ExplorerDependency + err := options.Dependency.Invoke(func(deps *explorer.ExplorerDependency) error { + explorerDeps = deps + return nil + }) + if err != nil { + logger.GetLogger(a.GetServiceName()).Errorln(err) + return + } + explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{}) + explorerDriver.Subscribe( + explorerDeps, + a.GetServiceName(), + func(data *explorer.ExplorerData) { a.messageBus.Publish(a.GetServiceName(), data) }, + ) + + // Create TCP server to forward events + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", serverHost, serverPort)) + if err != nil { + logger.GetLogger(a.GetServiceName()).Errorln(err) + return + } + defer listener.Close() + logger.GetLogger(a.GetServiceName()).Infof("forwarder is listening on %s:%d", serverHost, serverPort) + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + continue + } + go a.handleConnection(conn) + } + }() + + logger.GetLogger(a.GetServiceName()).Infoln("service has been started") + defer logger.GetLogger(a.GetServiceName()).Infoln("service has been stopped") + + <-options.CancelToken.Done() + explorerDriver.Unsubscribe(explorerDeps, a.GetServiceName()) +} diff --git a/services/forwarder/subscribe.go b/services/forwarder/subscribe.go new file mode 100644 index 00000000..637bef5e --- /dev/null +++ b/services/forwarder/subscribe.go @@ -0,0 +1,16 @@ +package forwarder + +import ( + "errors" + + "github.com/anyshake/observer/drivers/explorer" +) + +func (s *ForwarderService) subscribe(clientId string, handler explorer.ExplorerEventHandler) error { + if _, ok := s.subscribers.Get(clientId); ok { + return errors.New("this client has already subscribed") + } + s.subscribers.Set(clientId, handler) + s.messageBus.Subscribe(s.GetServiceName(), handler) + return nil +} diff --git a/services/forwarder/types.go b/services/forwarder/types.go new file mode 100644 index 00000000..f289dda1 --- /dev/null +++ b/services/forwarder/types.go @@ -0,0 +1,16 @@ +package forwarder + +import ( + "github.com/anyshake/observer/drivers/explorer" + cmap "github.com/orcaman/concurrent-map/v2" + messagebus "github.com/vardius/message-bus" +) + +type ForwarderService struct { + messageBus messagebus.MessageBus // An independent message bus for the socket module + subscribers cmap.ConcurrentMap[string, explorer.ExplorerEventHandler] + stationCode string + networkCode string + locationCode string + channelPrefix string +} diff --git a/services/forwarder/unsubscribe.go b/services/forwarder/unsubscribe.go new file mode 100644 index 00000000..e6288634 --- /dev/null +++ b/services/forwarder/unsubscribe.go @@ -0,0 +1,15 @@ +package forwarder + +import ( + "errors" +) + +func (s *ForwarderService) unsubscribe(clientId string) error { + fn, ok := s.subscribers.Get(clientId) + if !ok { + return errors.New("this client has not subscribed") + } + s.messageBus.Unsubscribe(s.GetServiceName(), fn) + s.subscribers.Remove(clientId) + return nil +} diff --git a/services/miniseed/start.go b/services/miniseed/start.go index b1f57c86..5ea01386 100644 --- a/services/miniseed/start.go +++ b/services/miniseed/start.go @@ -58,7 +58,7 @@ func (m *MiniSeedService) Start(options *services.Options, waitGroup *sync.WaitG ) _, err := os.Stat(filePath) if err == nil { - logger.GetLogger(m.GetServiceName()).Infof("starting %s from last record", channelName) + logger.GetLogger(m.GetServiceName()).Infof("reading existing record from %s", filePath) // Get last sequence number from file var ms mseedio.MiniSeedData @@ -78,6 +78,7 @@ func (m *MiniSeedService) Start(options *services.Options, waitGroup *sync.WaitG // Set current sequence number m.miniseedSequence[channelCode] = n + logger.GetLogger(m.GetServiceName()).Infof("starting %s from last record, sequence %d", channelName, n) } } } @@ -96,7 +97,8 @@ func (m *MiniSeedService) Start(options *services.Options, waitGroup *sync.WaitG explorerDriver.Subscribe(explorerDeps, m.GetServiceName(), m.handleExplorerEvent) logger.GetLogger(m.GetServiceName()).Infoln("service has been started") + defer logger.GetLogger(m.GetServiceName()).Infoln("service has been stopped") + <-options.CancelToken.Done() explorerDriver.Unsubscribe(explorerDeps, m.GetServiceName()) - logger.GetLogger(m.GetServiceName()).Infoln("service has been stopped") } diff --git a/services/miniseed/write.go b/services/miniseed/write.go index 88cafa95..cda289df 100644 --- a/services/miniseed/write.go +++ b/services/miniseed/write.go @@ -16,7 +16,7 @@ func (m *MiniSeedService) handleWrite() error { ) for i := 1; i < len(m.miniseedBuffer); i++ { - // Make sure timestamp is continuous + // Make sure timestamp is increasing by 1000 ms with allowed jitter if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { return fmt.Errorf( "timestamp is not within allowed jitter %d ms, expected %d, got %d", @@ -25,11 +25,6 @@ func (m *MiniSeedService) handleWrite() error { m.miniseedBuffer[i].Timestamp, ) } - - // Make sure sample rate is the same - if m.miniseedBuffer[i].SampleRate != startSampleRate { - return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate) - } } // Write data to file by channels diff --git a/services/seedlink/consumer.go b/services/seedlink/consumer.go new file mode 100644 index 00000000..7e14e65d --- /dev/null +++ b/services/seedlink/consumer.go @@ -0,0 +1,64 @@ +package seedlink + +import ( + "errors" + "fmt" + + "github.com/anyshake/observer/drivers/explorer" + "github.com/bclswl0827/slgo/handlers" + cmap "github.com/orcaman/concurrent-map/v2" + messagebus "github.com/vardius/message-bus" +) + +type consumer struct { + channelPrefix string + serviceName string + messageBus messagebus.MessageBus // An independent message bus for the socket module + subscribers cmap.ConcurrentMap[string, explorer.ExplorerEventHandler] +} + +func (c *consumer) Subscribe(clientId string, channels []string, eventHandler func(handlers.SeedLinkDataPacket)) error { + if _, ok := c.subscribers.Get(clientId); ok { + return errors.New("this client has already subscribed") + } + handler := func(data *explorer.ExplorerData) { + for _, channel := range channels { + switch channel { + case fmt.Sprintf("%sZ", c.channelPrefix): + eventHandler(handlers.SeedLinkDataPacket{ + Timestamp: data.Timestamp, + SampleRate: data.SampleRate, + Channel: channel, + DataArr: data.Z_Axis, + }) + case fmt.Sprintf("%sE", c.channelPrefix): + eventHandler(handlers.SeedLinkDataPacket{ + Timestamp: data.Timestamp, + SampleRate: data.SampleRate, + Channel: channel, + DataArr: data.E_Axis, + }) + case fmt.Sprintf("%sN", c.channelPrefix): + eventHandler(handlers.SeedLinkDataPacket{ + Timestamp: data.Timestamp, + SampleRate: data.SampleRate, + Channel: channel, + DataArr: data.N_Axis, + }) + } + } + } + c.subscribers.Set(clientId, handler) + c.messageBus.Subscribe(c.serviceName, handler) + return nil +} + +func (c *consumer) Unsubscribe(clientId string) error { + fn, ok := c.subscribers.Get(clientId) + if !ok { + return errors.New("this client has not subscribed") + } + c.messageBus.Unsubscribe(c.serviceName, fn) + c.subscribers.Remove(clientId) + return nil +} diff --git a/services/seedlink/hooks.go b/services/seedlink/hooks.go new file mode 100644 index 00000000..76765efc --- /dev/null +++ b/services/seedlink/hooks.go @@ -0,0 +1,24 @@ +package seedlink + +import ( + "github.com/anyshake/observer/utils/logger" + "github.com/bclswl0827/slgo/handlers" +) + +type hooks struct { + serviceName string +} + +func (h *hooks) OnConnection(client *handlers.SeedLinkClient) { + logger.GetLogger(h.serviceName).Infof("client %v connected", client.RemoteAddr()) +} + +func (h *hooks) OnData(client *handlers.SeedLinkClient, data []byte) {} + +func (h *hooks) OnClose(client *handlers.SeedLinkClient) { + logger.GetLogger(h.serviceName).Infof("client %v disconnected", client.RemoteAddr()) +} + +func (h *hooks) OnCommand(client *handlers.SeedLinkClient, command []string) { + logger.GetLogger(h.serviceName).Infof("client %v issued command %v", client.RemoteAddr(), command) +} diff --git a/services/seedlink/name.go b/services/seedlink/name.go new file mode 100644 index 00000000..36e83ae7 --- /dev/null +++ b/services/seedlink/name.go @@ -0,0 +1,5 @@ +package seedlink + +func (w *SeedLinkService) GetServiceName() string { + return "seedlink" +} diff --git a/services/seedlink/provider.go b/services/seedlink/provider.go new file mode 100644 index 00000000..6df9edc9 --- /dev/null +++ b/services/seedlink/provider.go @@ -0,0 +1,140 @@ +package seedlink + +import ( + "fmt" + "time" + + "github.com/anyshake/observer/drivers/dao/tables" + "github.com/anyshake/observer/utils/timesource" + "github.com/bclswl0827/slgo/handlers" + "gorm.io/gorm" +) + +type provider struct { + timeSource *timesource.Source + database *gorm.DB + startTime time.Time + stationCode string + networkCode string + locationCode string + channelPrefix string +} + +func (p *provider) GetSoftware() string { + return "anyshake_observer" +} + +func (p *provider) GetStartTime() time.Time { + return p.startTime +} + +func (p *provider) GetCurrentTime() time.Time { + currentTime, _ := p.timeSource.Get() + return currentTime +} + +func (p *provider) GetOrganization() string { + return "anyshake.org" +} + +func (p *provider) GetStations() []handlers.SeedLinkStation { + return []handlers.SeedLinkStation{ + { + BeginSequence: "000000", + EndSequence: "FFFFFF", + Station: p.stationCode, + Network: p.networkCode, + Description: "AnyShake Observer station", + }, + } +} + +func (p *provider) GetStreams() []handlers.SeedLinkStream { + return []handlers.SeedLinkStream{ + { + BeginTime: p.GetStartTime().Format("2006-01-02 15:04:01"), + EndTime: "9999-12-31 23:59:59", + SeedName: fmt.Sprintf("%sZ", p.channelPrefix), + Location: p.locationCode, + Type: "D", + Station: p.stationCode, + }, + { + BeginTime: p.GetStartTime().Format("2006-01-02 15:04:01"), + EndTime: "9999-12-31 23:59:59", + SeedName: fmt.Sprintf("%sE", p.channelPrefix), + Location: p.locationCode, + Type: "D", + Station: p.stationCode, + }, + { + BeginTime: p.GetStartTime().Format("2006-01-02 15:04:01"), + EndTime: "9999-12-31 23:59:59", + SeedName: fmt.Sprintf("%sN", p.channelPrefix), + Location: p.locationCode, + Type: "D", + Station: p.stationCode, + }, + } +} + +func (p *provider) GetCapabilities() []handlers.SeedLinkCapability { + return []handlers.SeedLinkCapability{ + {Name: "info:all"}, {Name: "info:gaps"}, {Name: "info:streams"}, + {Name: "dialup"}, {Name: "info:id"}, {Name: "multistation"}, + {Name: "window-extraction"}, {Name: "info:connections"}, + {Name: "info:capabilities"}, {Name: "info:stations"}, + } +} + +func (p *provider) QueryHistory(startTime, endTime time.Time, channels []string) ([]handlers.SeedLinkDataPacket, error) { + var ( + adcCountModel tables.AdcCount + adcCountData []tables.AdcCount + ) + err := p.database. + Table(adcCountModel.GetName()). + Where("timestamp >= ? AND timestamp <= ?", startTime.UnixMilli(), endTime.UnixMilli()). + Order("timestamp ASC"). + Find(&adcCountData). + Error + if err != nil { + return nil, err + } + + // Convert ADC count data to SeedLink data packets + var dataPacketArr []handlers.SeedLinkDataPacket + for _, channel := range channels { + switch channel { + case fmt.Sprintf("%sZ", p.channelPrefix): + for _, adcCount := range adcCountData { + dataPacketArr = append(dataPacketArr, handlers.SeedLinkDataPacket{ + Timestamp: adcCount.Timestamp, + SampleRate: adcCount.SampleRate, + Channel: channel, + DataArr: adcCount.Z_Axis, + }) + } + case fmt.Sprintf("%sE", p.channelPrefix): + for _, adcCount := range adcCountData { + dataPacketArr = append(dataPacketArr, handlers.SeedLinkDataPacket{ + Timestamp: adcCount.Timestamp, + SampleRate: adcCount.SampleRate, + Channel: channel, + DataArr: adcCount.E_Axis, + }) + } + case fmt.Sprintf("%sN", p.channelPrefix): + for _, adcCount := range adcCountData { + dataPacketArr = append(dataPacketArr, handlers.SeedLinkDataPacket{ + Timestamp: adcCount.Timestamp, + SampleRate: adcCount.SampleRate, + Channel: channel, + DataArr: adcCount.N_Axis, + }) + } + } + } + + return dataPacketArr, nil +} diff --git a/services/seedlink/start.go b/services/seedlink/start.go new file mode 100644 index 00000000..1aa2be6a --- /dev/null +++ b/services/seedlink/start.go @@ -0,0 +1,87 @@ +package seedlink + +import ( + "sync" + + "github.com/anyshake/observer/drivers/explorer" + "github.com/anyshake/observer/services" + "github.com/anyshake/observer/utils/logger" + "github.com/bclswl0827/slgo" + cmap "github.com/orcaman/concurrent-map/v2" + messagebus "github.com/vardius/message-bus" +) + +func (s *SeedLinkService) Start(options *services.Options, waitGroup *sync.WaitGroup) { + defer waitGroup.Done() + + serviceConfig, ok := options.Config.Services[s.GetServiceName()] + if !ok { + logger.GetLogger(s.GetServiceName()).Errorln("service configuration not found") + return + } + if !serviceConfig.(map[string]any)["enable"].(bool) { + logger.GetLogger(s.GetServiceName()).Infoln("service has been disabled") + return + } + serverHost := serviceConfig.(map[string]any)["host"].(string) + serverPort := int(serviceConfig.(map[string]any)["port"].(float64)) + currentTime, _ := options.TimeSource.Get() + messageBus := messagebus.New(65535) + + // Subscribe to Explorer events + var explorerDeps *explorer.ExplorerDependency + err := options.Dependency.Invoke(func(deps *explorer.ExplorerDependency) error { + explorerDeps = deps + return nil + }) + if err != nil { + logger.GetLogger(s.GetServiceName()).Errorln(err) + return + } + explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{}) + explorerDriver.Subscribe( + explorerDeps, + s.GetServiceName(), + func(data *explorer.ExplorerData) { + if s.prevSampleRate == 0 { + s.prevSampleRate = data.SampleRate + } + if s.prevSampleRate == data.SampleRate { + messageBus.Publish(s.GetServiceName(), data) + } else { + logger.GetLogger(s.GetServiceName()).Warnf("sample rate is not the same, expected %d, got %d", s.prevSampleRate, data.SampleRate) + } + s.prevSampleRate = data.SampleRate + }, + ) + + // Start SeedLink server + server := slgo.New( + &provider{ + timeSource: options.TimeSource, + database: options.Database, + startTime: currentTime, + stationCode: options.Config.Stream.Station, + networkCode: options.Config.Stream.Network, + locationCode: options.Config.Stream.Location, + channelPrefix: options.Config.Stream.Channel, + }, + &consumer{ + channelPrefix: options.Config.Stream.Channel, + serviceName: s.GetServiceName(), + messageBus: messageBus, + subscribers: cmap.New[explorer.ExplorerEventHandler](), + }, + &hooks{ + serviceName: s.GetServiceName(), + }, + ) + go server.Start(serverHost, serverPort) + logger.GetLogger(s.GetServiceName()).Infof("seedlink is listening on %s:%d", serverHost, serverPort) + + logger.GetLogger(s.GetServiceName()).Infoln("service has been started") + defer logger.GetLogger(s.GetServiceName()).Infoln("service has been stopped") + + <-options.CancelToken.Done() + explorerDriver.Unsubscribe(explorerDeps, s.GetServiceName()) +} diff --git a/services/seedlink/types.go b/services/seedlink/types.go new file mode 100644 index 00000000..f84525ac --- /dev/null +++ b/services/seedlink/types.go @@ -0,0 +1,5 @@ +package seedlink + +type SeedLinkService struct { + prevSampleRate int +} diff --git a/startups/explorer/execute.go b/startups/explorer/execute.go index c6627e88..bdccf2ab 100644 --- a/startups/explorer/execute.go +++ b/startups/explorer/execute.go @@ -35,7 +35,7 @@ func (t *ExplorerStartupTask) Execute(depsContainer *dig.Container, options *sta if !explorerDeps.Config.LegacyMode { logger.GetLogger(t.GetTaskName()).Infof("handshake successful, device ID: %08X", explorerDeps.Config.DeviceId) } else { - logger.GetLogger(t.GetTaskName()).Warnln("device is in legacy mode, keep an eye on the CPU usage") + logger.GetLogger(t.GetTaskName()).Warnln("device is in legacy mode, this is for backward compatibility only") } return nil }