From 447068038ed00dc98d84c05bda698cebf8f07af5 Mon Sep 17 00:00:00 2001 From: hujm2023 Date: Mon, 19 Aug 2024 01:21:46 +0800 Subject: [PATCH] first init --- README.md | 10 +++++ cmd/demo/server.go | 28 ++++++++++++ connection.go | 37 ++++++++++++++++ go.mod | 17 +++++++ go.sum | 38 ++++++++++++++++ handler.go | 101 ++++++++++++++++++++++++++++++++++++++++++ msgid.go | 20 +++++++++ server.go | 108 +++++++++++++++++++++++++++++++++++++++++++++ server_test.go | 27 ++++++++++++ 9 files changed, 386 insertions(+) create mode 100644 README.md create mode 100644 cmd/demo/server.go create mode 100644 connection.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handler.go create mode 100644 msgid.go create mode 100644 server.go create mode 100644 server_test.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..8272973 --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ + + +# go-cmpp-server + +This GitHub repository contains a CMPP 2.0 service implemented in Go language. The service is built on top of the [cloudwego/netpoll](https://github.com/cloudwego/netpoll). It has the following features: + +- Event-driven: The service uses an event-driven architecture to efficiently handle a large number of concurrent connections. +- High-performance: By taking advantage of Go's concurrency capabilities, the service provides high-performance CMPP 2.0 service. +- Quick start: The simple and easy-to-use design allows developers to quickly get started and deploy the service. + Key Features diff --git a/cmd/demo/server.go b/cmd/demo/server.go new file mode 100644 index 0000000..e802d06 --- /dev/null +++ b/cmd/demo/server.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + cmppserver "github.com/hujm2023/go-cmpp-server" +) + +func main() { + s := cmppserver.NewCMPPServer() + errChan := make(chan error) + go func() { + errChan <- s.Listen("tcp", "[::]:8899") + }() + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM) + + select { + case s := <-signals: + fmt.Printf("receive signal: %d\n", s) + return + case err := <-errChan: + fmt.Printf("run server error: %v\n", err) + } +} diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..e346482 --- /dev/null +++ b/connection.go @@ -0,0 +1,37 @@ +package cmppserver + +import ( + "github.com/cloudwego/netpoll" +) + +type Connection struct { + conn netpoll.Reader +} + +func (c *Connection) Peek(n int) ([]byte, error) { + return c.conn.Peek(n) +} + +func (c *Connection) Discard(n int) (int, error) { + return n, c.conn.Skip(n) +} + +func (c *Connection) Size() int { + return c.conn.Len() +} + +// Read . +// Do not use. +func (c *Connection) Read(p []byte) (n int, err error) { + // make a copy + r, err := c.conn.Slice(n) + if err != nil { + return 0, err + } + data, err := r.Next(-1) + if err != nil { + return 0, err + } + copy(p, data) + return len(data), nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b66f88f --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/hujm2023/go-cmpp-server + +go 1.23.0 + +require ( + github.com/cloudwego/netpoll v0.6.3 + github.com/hujm2023/go-sms-protocol v0.6.0 + github.com/hujm2023/hlog v0.2.0 + github.com/samber/lo v1.47.0 +) + +require ( + github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/text v0.16.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3e7c1ce --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY= +github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= +github.com/cloudwego/netpoll v0.6.3 h1:t+ndlwBFjQZimUj3ul31DwI45t18eOr2pcK3juZZm+E= +github.com/cloudwego/netpoll v0.6.3/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/hujm2023/go-sms-protocol v0.6.0 h1:OqI9n1HWTtJvrdorzy39aXzzilbYmQCQWg9/M+jxSss= +github.com/hujm2023/go-sms-protocol v0.6.0/go.mod h1:U9Ji8qlk9sBWHPvgtCXGsUmEGX/Wcuhwmre6uIXVrH4= +github.com/hujm2023/hlog v0.2.0 h1:4yqL60WqhwHUi4RPvwDrY7z6KXFKBwGzKdC9e4qpl8U= +github.com/hujm2023/hlog v0.2.0/go.mod h1:VfRfBpprTbLXJ2Y8xmCIQ/uZosNCvYCgQ/Ty8O2E08Q= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..d31a01d --- /dev/null +++ b/handler.go @@ -0,0 +1,101 @@ +package cmppserver + +import ( + "context" + "errors" + "fmt" + + protocol "github.com/hujm2023/go-sms-protocol" + "github.com/hujm2023/go-sms-protocol/cmpp" + "github.com/hujm2023/go-sms-protocol/cmpp/cmpp20" + "github.com/hujm2023/hlog" +) + +var ErrInvalidPDUAssert = errors.New("invalid pdu assert") + +type CommandHandler func(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) + +type Dispatcher struct { + handlers map[protocol.ICommander]CommandHandler +} + +func newDisPatcher() *Dispatcher { + return &Dispatcher{ + handlers: make(map[protocol.ICommander]CommandHandler), + } +} + +func (d *Dispatcher) Register(cmd protocol.ICommander, handler CommandHandler) { + if _, ok := d.handlers[cmd]; ok { + panic(fmt.Sprintf("%s has been registered", cmd.String())) + } + d.handlers[cmd] = handler +} + +func (d *Dispatcher) Dispatch(ctx context.Context, data []byte) (resp []byte, err error) { + pdu, err := cmpp20.DecodeCMPP20(data) + if err != nil { + return nil, fmt.Errorf("decode cmpp20 error: %w", err) + } + cmd := pdu.GetCommand() + handler, ok := d.handlers[cmd] + if !ok { + return nil, fmt.Errorf("%s not implemented", cmd.String()) + } + return handler(ctx, pdu) +} + +var cmpp20Dispatcher = newDisPatcher() + +func init() { + cmpp20Dispatcher.Register(cmpp.CommandConnect, cmpp20Connect) + cmpp20Dispatcher.Register(cmpp.CommandSubmit, cmpp20Submit) + cmpp20Dispatcher.Register(cmpp.CommandActiveTest, cmpp20ActiveTest) + cmpp20Dispatcher.Register(cmpp.CommandActiveTestResp, cmpp20ActiveTestResp) + cmpp20Dispatcher.Register(cmpp.CommandDeliverResp, cmpp20DeliveyResp) +} + +func cmpp20Connect(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) { + connect, ok := pdu.(*cmpp20.PduConnect) + if !ok { + return nil, ErrInvalidPDUAssert + } + hlog.CtxInfo(ctx, "[cmpp20Connect] user:%s", connect.SourceAddr) + + // TODO: handle auth + + return connect.GenEmptyResponse().IEncode() +} + +func cmpp20Submit(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) { + // handle sumit content + submit, ok := pdu.(*cmpp20.PduSubmit) + if !ok { + return nil, ErrInvalidPDUAssert + } + + content, err := protocol.DecodeCMPPCContent(ctx, submit.MsgContent, submit.MsgFmt) + if err != nil { + hlog.CtxWarn(ctx, "[cmpp20Submit] decode content error: %v", err) + return nil, nil + } + submitResp := submit.GenEmptyResponse().(*cmpp20.PduSubmitResp) + submitResp.MsgID = GenMsgID() + submitResp.Result = 0 + + hlog.CtxInfo(ctx, "[cmpp20Submit] content:%s, msgID:%d", content, submitResp.MsgID) + return submitResp.IEncode() +} + +func cmpp20ActiveTest(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) { + return pdu.GenEmptyResponse().IEncode() +} + +func cmpp20ActiveTestResp(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) { + hlog.CtxInfo(ctx, "[cmpp20ActiveTestResp] received an active test resp") + return nil, nil +} + +func cmpp20DeliveyResp(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) { + return nil, nil +} diff --git a/msgid.go b/msgid.go new file mode 100644 index 0000000..43cbcee --- /dev/null +++ b/msgid.go @@ -0,0 +1,20 @@ +package cmppserver + +import ( + "sync/atomic" + "time" + + "github.com/bytedance/gopkg/lang/fastrand" + "github.com/hujm2023/go-sms-protocol/cmpp" +) + +var ( + msgID uint64 + gateID = fastrand.Uint64n(1024) +) + +func GenMsgID() uint64 { + now := time.Now() + sequenceID := atomic.AddUint64(&msgID, 1) + return cmpp.CombineMsgID(uint64(now.Month()), uint64(now.Day()), uint64(now.Hour()), uint64(now.Minute()), uint64(now.Day()), gateID, sequenceID) +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..0de333d --- /dev/null +++ b/server.go @@ -0,0 +1,108 @@ +package cmppserver + +import ( + "context" + "errors" + "net" + "time" + + "github.com/cloudwego/netpoll" + "github.com/hujm2023/go-sms-protocol/codec" + "github.com/hujm2023/hlog" + "github.com/samber/lo" +) + +type CMPPServer struct { + *codec.CMPPCodec + handler *Dispatcher + eventLoop netpoll.EventLoop +} + +func NewCMPPServer() *CMPPServer { + c := &CMPPServer{ + CMPPCodec: codec.NewCMPPCodec(), + handler: cmpp20Dispatcher, + } + c.eventLoop = lo.Must( + netpoll.NewEventLoop( + c.onRequest, + netpoll.WithOnPrepare(c.onPrepare), + netpoll.WithOnConnect(c.onConnect), + netpoll.WithReadTimeout(time.Second), + netpoll.WithOnDisconnect(c.onDisConnect), + ), + ) + + return c +} + +// Listen 阻塞监听. +func (c *CMPPServer) Listen(network, address string) error { + listener, err := net.Listen(network, address) + if err != nil { + return err + } + hlog.Noticef("===> cmpp2.0 server listened at %s://%s", network, address) + return c.eventLoop.Serve(listener) +} + +func (c *CMPPServer) Shutdown(waitFor time.Duration) error { + ctx, cancel := context.WithTimeout(context.TODO(), waitFor) + defer cancel() + return c.eventLoop.Shutdown(ctx) +} + +// onPrepare means connection Connected but not initialized. +// connection is not registered into poller. +func (c *CMPPServer) onPrepare(connection netpoll.Connection) context.Context { + hlog.Noticef("[onPrepare] remote address: %s", connection.RemoteAddr().String()) + return context.Background() +} + +// onConnect means connection has Connected and been initialized. +// This connection is ready for read and write. +func (c *CMPPServer) onConnect(ctx context.Context, connection netpoll.Connection) context.Context { + hlog.Noticef("[onConnect] remote address: %s", connection.RemoteAddr().String()) + // 注意:这里的connection里面没数据 + return nil +} + +// onRequest means the first byte has beed sent to this side. +func (c *CMPPServer) onRequest(ctx context.Context, connection netpoll.Connection) error { + reader, writer := connection.Reader(), connection.Writer() + defer func() { + writer.Flush() + reader.Release() + }() + + conn := &Connection{conn: reader} + + // 水平触发,在一个for中读全部的数据 + for { + data, err := c.CMPPCodec.Decode(conn) + if err != nil { + if errors.Is(err, codec.ErrPacketNotComplete) { + break + } + return err + } + + // TODO: handle pdu + hlog.CtxInfo(ctx, "[onRequest] read data: %+v", data) + resp, err := c.handler.Dispatch(ctx, data) + if err != nil { + hlog.CtxError(ctx, "[onRequest] dispatch data error: %v", err) + return err + } + + if len(resp) > 0 { + _, _ = writer.WriteBinary(resp) + } + } + + return nil +} + +func (c *CMPPServer) onDisConnect(ctx context.Context, connection netpoll.Connection) { + hlog.Noticef("[onDisConnect] remote address: %s", connection.RemoteAddr().String()) +} diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000..720d424 --- /dev/null +++ b/server_test.go @@ -0,0 +1,27 @@ +package cmppserver + +import ( + "fmt" + "os" + "os/signal" + "syscall" + "testing" +) + +func TestServer(t *testing.T) { + s := NewCMPPServer() + errChan := make(chan error) + go func() { + errChan <- s.Listen("tcp", "0.0.0.0:8899") + }() + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM) + + select { + case s := <-signals: + fmt.Printf("receive signal: %d", s) + return + case err := <-errChan: + fmt.Printf("run server error: %v", err) + } +}