Skip to content

Commit

Permalink
first init
Browse files Browse the repository at this point in the history
  • Loading branch information
hujm2023 committed Aug 18, 2024
0 parents commit 4470680
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 0 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<!-- @format -->

# go-cmpp-server

This GitHub repository contains a CMPP 2.0 service implemented in Go language. The service is built on top of the [cloudwego/netpoll](https://github.com/cloudwego/netpoll). It has the following features:

- Event-driven: The service uses an event-driven architecture to efficiently handle a large number of concurrent connections.
- High-performance: By taking advantage of Go's concurrency capabilities, the service provides high-performance CMPP 2.0 service.
- Quick start: The simple and easy-to-use design allows developers to quickly get started and deploy the service.
Key Features
28 changes: 28 additions & 0 deletions cmd/demo/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"fmt"
"os"
"os/signal"
"syscall"

cmppserver "github.com/hujm2023/go-cmpp-server"
)

func main() {
s := cmppserver.NewCMPPServer()
errChan := make(chan error)
go func() {
errChan <- s.Listen("tcp", "[::]:8899")
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)

select {
case s := <-signals:
fmt.Printf("receive signal: %d\n", s)
return
case err := <-errChan:
fmt.Printf("run server error: %v\n", err)
}
}
37 changes: 37 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cmppserver

import (
"github.com/cloudwego/netpoll"
)

type Connection struct {
conn netpoll.Reader
}

func (c *Connection) Peek(n int) ([]byte, error) {
return c.conn.Peek(n)
}

func (c *Connection) Discard(n int) (int, error) {
return n, c.conn.Skip(n)
}

func (c *Connection) Size() int {
return c.conn.Len()
}

// Read .
// Do not use.
func (c *Connection) Read(p []byte) (n int, err error) {
// make a copy
r, err := c.conn.Slice(n)
if err != nil {
return 0, err
}
data, err := r.Next(-1)
if err != nil {
return 0, err
}
copy(p, data)
return len(data), nil
}
17 changes: 17 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module github.com/hujm2023/go-cmpp-server

go 1.23.0

require (
github.com/cloudwego/netpoll v0.6.3
github.com/hujm2023/go-sms-protocol v0.6.0
github.com/hujm2023/hlog v0.2.0
github.com/samber/lo v1.47.0
)

require (
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
)
38 changes: 38 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/cloudwego/netpoll v0.6.3 h1:t+ndlwBFjQZimUj3ul31DwI45t18eOr2pcK3juZZm+E=
github.com/cloudwego/netpoll v0.6.3/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hujm2023/go-sms-protocol v0.6.0 h1:OqI9n1HWTtJvrdorzy39aXzzilbYmQCQWg9/M+jxSss=
github.com/hujm2023/go-sms-protocol v0.6.0/go.mod h1:U9Ji8qlk9sBWHPvgtCXGsUmEGX/Wcuhwmre6uIXVrH4=
github.com/hujm2023/hlog v0.2.0 h1:4yqL60WqhwHUi4RPvwDrY7z6KXFKBwGzKdC9e4qpl8U=
github.com/hujm2023/hlog v0.2.0/go.mod h1:VfRfBpprTbLXJ2Y8xmCIQ/uZosNCvYCgQ/Ty8O2E08Q=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
101 changes: 101 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package cmppserver

import (
"context"
"errors"
"fmt"

protocol "github.com/hujm2023/go-sms-protocol"
"github.com/hujm2023/go-sms-protocol/cmpp"
"github.com/hujm2023/go-sms-protocol/cmpp/cmpp20"
"github.com/hujm2023/hlog"
)

var ErrInvalidPDUAssert = errors.New("invalid pdu assert")

type CommandHandler func(ctx context.Context, pdu protocol.PDU) (resp []byte, err error)

type Dispatcher struct {
handlers map[protocol.ICommander]CommandHandler
}

func newDisPatcher() *Dispatcher {
return &Dispatcher{
handlers: make(map[protocol.ICommander]CommandHandler),
}
}

func (d *Dispatcher) Register(cmd protocol.ICommander, handler CommandHandler) {
if _, ok := d.handlers[cmd]; ok {
panic(fmt.Sprintf("%s has been registered", cmd.String()))
}
d.handlers[cmd] = handler
}

func (d *Dispatcher) Dispatch(ctx context.Context, data []byte) (resp []byte, err error) {
pdu, err := cmpp20.DecodeCMPP20(data)
if err != nil {
return nil, fmt.Errorf("decode cmpp20 error: %w", err)
}
cmd := pdu.GetCommand()
handler, ok := d.handlers[cmd]
if !ok {
return nil, fmt.Errorf("%s not implemented", cmd.String())
}
return handler(ctx, pdu)
}

var cmpp20Dispatcher = newDisPatcher()

func init() {
cmpp20Dispatcher.Register(cmpp.CommandConnect, cmpp20Connect)
cmpp20Dispatcher.Register(cmpp.CommandSubmit, cmpp20Submit)
cmpp20Dispatcher.Register(cmpp.CommandActiveTest, cmpp20ActiveTest)
cmpp20Dispatcher.Register(cmpp.CommandActiveTestResp, cmpp20ActiveTestResp)
cmpp20Dispatcher.Register(cmpp.CommandDeliverResp, cmpp20DeliveyResp)
}

func cmpp20Connect(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) {
connect, ok := pdu.(*cmpp20.PduConnect)
if !ok {
return nil, ErrInvalidPDUAssert
}
hlog.CtxInfo(ctx, "[cmpp20Connect] user:%s", connect.SourceAddr)

// TODO: handle auth

return connect.GenEmptyResponse().IEncode()
}

func cmpp20Submit(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) {
// handle sumit content
submit, ok := pdu.(*cmpp20.PduSubmit)
if !ok {
return nil, ErrInvalidPDUAssert
}

content, err := protocol.DecodeCMPPCContent(ctx, submit.MsgContent, submit.MsgFmt)
if err != nil {
hlog.CtxWarn(ctx, "[cmpp20Submit] decode content error: %v", err)
return nil, nil
}
submitResp := submit.GenEmptyResponse().(*cmpp20.PduSubmitResp)
submitResp.MsgID = GenMsgID()
submitResp.Result = 0

hlog.CtxInfo(ctx, "[cmpp20Submit] content:%s, msgID:%d", content, submitResp.MsgID)
return submitResp.IEncode()
}

func cmpp20ActiveTest(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) {
return pdu.GenEmptyResponse().IEncode()
}

func cmpp20ActiveTestResp(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) {
hlog.CtxInfo(ctx, "[cmpp20ActiveTestResp] received an active test resp")
return nil, nil
}

func cmpp20DeliveyResp(ctx context.Context, pdu protocol.PDU) (resp []byte, err error) {
return nil, nil
}
20 changes: 20 additions & 0 deletions msgid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cmppserver

import (
"sync/atomic"
"time"

"github.com/bytedance/gopkg/lang/fastrand"
"github.com/hujm2023/go-sms-protocol/cmpp"
)

var (
msgID uint64
gateID = fastrand.Uint64n(1024)
)

func GenMsgID() uint64 {
now := time.Now()
sequenceID := atomic.AddUint64(&msgID, 1)
return cmpp.CombineMsgID(uint64(now.Month()), uint64(now.Day()), uint64(now.Hour()), uint64(now.Minute()), uint64(now.Day()), gateID, sequenceID)
}
108 changes: 108 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cmppserver

import (
"context"
"errors"
"net"
"time"

"github.com/cloudwego/netpoll"
"github.com/hujm2023/go-sms-protocol/codec"
"github.com/hujm2023/hlog"
"github.com/samber/lo"
)

type CMPPServer struct {
*codec.CMPPCodec
handler *Dispatcher
eventLoop netpoll.EventLoop
}

func NewCMPPServer() *CMPPServer {
c := &CMPPServer{
CMPPCodec: codec.NewCMPPCodec(),
handler: cmpp20Dispatcher,
}
c.eventLoop = lo.Must(
netpoll.NewEventLoop(
c.onRequest,
netpoll.WithOnPrepare(c.onPrepare),
netpoll.WithOnConnect(c.onConnect),
netpoll.WithReadTimeout(time.Second),
netpoll.WithOnDisconnect(c.onDisConnect),
),
)

return c
}

// Listen 阻塞监听.
func (c *CMPPServer) Listen(network, address string) error {
listener, err := net.Listen(network, address)
if err != nil {
return err
}
hlog.Noticef("===> cmpp2.0 server listened at %s://%s", network, address)
return c.eventLoop.Serve(listener)
}

func (c *CMPPServer) Shutdown(waitFor time.Duration) error {
ctx, cancel := context.WithTimeout(context.TODO(), waitFor)
defer cancel()
return c.eventLoop.Shutdown(ctx)
}

// onPrepare means connection Connected but not initialized.
// connection is not registered into poller.
func (c *CMPPServer) onPrepare(connection netpoll.Connection) context.Context {
hlog.Noticef("[onPrepare] remote address: %s", connection.RemoteAddr().String())
return context.Background()
}

// onConnect means connection has Connected and been initialized.
// This connection is ready for read and write.
func (c *CMPPServer) onConnect(ctx context.Context, connection netpoll.Connection) context.Context {
hlog.Noticef("[onConnect] remote address: %s", connection.RemoteAddr().String())
// 注意:这里的connection里面没数据
return nil
}

// onRequest means the first byte has beed sent to this side.
func (c *CMPPServer) onRequest(ctx context.Context, connection netpoll.Connection) error {
reader, writer := connection.Reader(), connection.Writer()
defer func() {
writer.Flush()
reader.Release()
}()

conn := &Connection{conn: reader}

// 水平触发,在一个for中读全部的数据
for {
data, err := c.CMPPCodec.Decode(conn)
if err != nil {
if errors.Is(err, codec.ErrPacketNotComplete) {
break
}
return err
}

// TODO: handle pdu
hlog.CtxInfo(ctx, "[onRequest] read data: %+v", data)
resp, err := c.handler.Dispatch(ctx, data)
if err != nil {
hlog.CtxError(ctx, "[onRequest] dispatch data error: %v", err)
return err
}

if len(resp) > 0 {
_, _ = writer.WriteBinary(resp)
}
}

return nil
}

func (c *CMPPServer) onDisConnect(ctx context.Context, connection netpoll.Connection) {
hlog.Noticef("[onDisConnect] remote address: %s", connection.RemoteAddr().String())
}
27 changes: 27 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cmppserver

import (
"fmt"
"os"
"os/signal"
"syscall"
"testing"
)

func TestServer(t *testing.T) {
s := NewCMPPServer()
errChan := make(chan error)
go func() {
errChan <- s.Listen("tcp", "0.0.0.0:8899")
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)

select {
case s := <-signals:
fmt.Printf("receive signal: %d", s)
return
case err := <-errChan:
fmt.Printf("run server error: %v", err)
}
}

0 comments on commit 4470680

Please sign in to comment.