Skip to content

Commit

Permalink
refactor: replace thrift with grpc (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
Singularity0909 committed Aug 18, 2021
1 parent 167e829 commit 5512a06
Show file tree
Hide file tree
Showing 22 changed files with 2,309 additions and 9,933 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# paster_core

Paster 服务端核心模块,使用字节跳动开源的微服务 RPC 框架 [KiteX](https://github.com/cloudwego/kitex)
通过 [Thrift](https://github.com/apache/thrift) 协议与上游门面模块 [paster_facade](https://github.com/ameidance/paster_facade) 通信。
通过 [gRPC](https://github.com/grpc/grpc-go) 协议与上游门面模块 [paster_facade](https://github.com/ameidance/paster_facade) 通信。

<details>
<summary><b>时序图</b></summary>
Expand All @@ -21,7 +21,7 @@ Paster 服务端核心模块,使用字节跳动开源的微服务 RPC 框架 [

- `kitex.yml` 为 KiteX 框架服务启动配置,默认端口号为 8888。
- `mysql.yml` 为 MySQL 连接配置,其中 `name` 为数据库名,**需要预先建立数据库**
- `consul.yml` 为 Consul 连接配置,其中 `port` 为注册中心端口号`check_port` 为当前服务用以响应健康检查的端口号
- `consul.yml` 为 Consul 连接配置,其中 `port` 为注册中心端口号。

配置完成后在项目根目录下执行以下命令来构建、运行项目。

Expand All @@ -31,3 +31,11 @@ sh output/bootstrap.sh
```

> ⚠️ 请确保在运行项目前已经按照配置信息启动 MySQL 和 Consul 服务端。
### Todo

- [x] 实现 KiteX 服务注册扩展接口,支持 Consul 服务注册
- [x] 使用 gRPC 替换 Thrift 消息协议,支持 Consul 健康检查
- [ ] 优化 frame 层中间件完善请求响应日志(依赖下一版本 KiteX)
- [ ] 将项目打包成 Dokcer 镜像
- [ ] ...
25 changes: 5 additions & 20 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package client
import (
"fmt"
"io/ioutil"
"net/http"
"sync/atomic"

"github.com/ameidance/paster_core/constant"
"github.com/ameidance/paster_core/frame"
Expand All @@ -16,15 +14,13 @@ import (
)

var (
consulCheckCounter int64
consulConf *_ConsulConf
consulClient *api.Client
consulConf *_ConsulConf
consulClient *api.Client
)

type _ConsulConf struct {
Hostname string `yaml:"hostname"`
Port int `yaml:"port"`
CheckPort int `yaml:"check_port"`
Hostname string `yaml:"hostname"`
Port int `yaml:"port"`
}

func InitConsul() {
Expand All @@ -39,7 +35,6 @@ func InitConsul() {
if consulClient == nil || err != nil {
panic(err)
}
go healthCheckHandler(consulConf.CheckPort)
}

type ConsulRegistry struct {
Expand Down Expand Up @@ -68,7 +63,7 @@ func (m *ConsulRegistry) Register(info *registry.Info) (err error) {
}

registration.Check = new(api.AgentServiceCheck)
registration.Check.HTTP = fmt.Sprintf("http://%s:%d/health", registration.Address, consulConf.CheckPort)
registration.Check.GRPC = fmt.Sprintf("%s:%d/%s", registration.Address, registration.Port, registration.Name)
registration.Check.Timeout = "5s"
registration.Check.Interval = "5s"
registration.Check.DeregisterCriticalServiceAfter = "10s"
Expand Down Expand Up @@ -99,13 +94,3 @@ func getConsulConfig() (*_ConsulConf, error) {
}
return conf, nil
}

func healthCheckHandler(port int) {
http.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) {
atomic.AddInt64(&consulCheckCounter, 1)
//klog.Debugf("[healthCheckHandler] counter:%v", atomic.LoadInt64(&consulCheckCounter))
})
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
klog.Errorf("[healthCheckHandler] serve failed. err:%v", err)
}
}
3 changes: 1 addition & 2 deletions conf/consul.example.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
hostname: "127.0.0.1"
port: 8500
check_port: 8800
port: 8500
14 changes: 3 additions & 11 deletions frame/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,14 @@ import (

var _ endpoint.Middleware = LogMiddleware

type args interface {
GetFirstArgument() interface{}
}

type result interface {
GetResult() interface{}
}

func LogMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
method := rpcinfo.GetRPCInfo(ctx).To().Method()
klog.Debugf("[LogMiddleware] rpc method:%v, request:%v", method, util.GetJsonString(req.(args).GetFirstArgument()))
method := rpcinfo.GetRPCInfo(ctx).Invocation().MethodName()
klog.Debugf("[LogMiddleware] rpc method:%v, request:%v", method, util.GetJsonString(req))
if err = next(ctx, req, resp); err != nil {
return
}
klog.Debugf("[LogMiddleware] rpc method:%v, response:%v", method, util.GetJsonString(resp.(result).GetResult()))
klog.Debugf("[LogMiddleware] rpc method:%v, response:%v", method, util.GetJsonString(resp))
return nil
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module github.com/ameidance/paster_core
go 1.15

require (
github.com/apache/thrift v0.13.0
github.com/cloudwego/kitex v0.0.3
github.com/cloudwego/kitex v0.0.4-0.20210818083940-6f39863ea550
github.com/hashicorp/consul/api v1.9.1
github.com/json-iterator/go v1.1.11
google.golang.org/protobuf v1.26.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gorm.io/driver/mysql v1.1.1
gorm.io/gorm v1.21.12
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/kitex v0.0.3 h1:bZN8rxh5uArwFbO2W5jx6Ie7l53/gzS4b22roNjnMgY=
github.com/cloudwego/kitex v0.0.3/go.mod h1:NfZ3Zj8MA+DklZd+z8i6/PEFmrnJAtHrz3AofZAKhKE=
github.com/cloudwego/kitex v0.0.4-0.20210818083940-6f39863ea550 h1:dDIIbih6HxC4STL6S/ZuJWiHATprHUATvviQwR3Pako=
github.com/cloudwego/kitex v0.0.4-0.20210818083940-6f39863ea550/go.mod h1:EIjPJ4Dom2ornk7xDCdKpUpOnf4Tulevimh4Tn05OGc=
github.com/cloudwego/netpoll v0.0.2 h1:YIFcR1aN59/nof3KGqOnfjNWCiWr98xfD19J3QYpCPU=
github.com/cloudwego/netpoll v0.0.2/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
github.com/cloudwego/netpoll v0.0.3 h1:LemwD6LQDhy3GNz/evTyh7DlxGnBua0Jl+5KmeVHXxA=
github.com/cloudwego/netpoll v0.0.3/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
github.com/cloudwego/netpoll-http2 v0.0.4 h1:pN4uqjklPdvuALd3nFH5UFlmxB7vy7t8MkvKaV8HavU=
github.com/cloudwego/netpoll-http2 v0.0.4/go.mod h1:iFr5SzJCXIYgBg0ubL0fZiCQ6W36s9p0KjXpV04lmoY=
github.com/cloudwego/thriftgo v0.0.2-0.20210726073420-0145861fcd04 h1:xrb9zM079RW/DgxuOWF9/MdD0b3sM1W76BN7SxJW+vo=
github.com/cloudwego/thriftgo v0.0.2-0.20210726073420-0145861fcd04/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.1.2 h1:AXpGJiWE3VggfiRHwA6raRJUIcjxliEIfJfGlvRiYUA=
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -244,6 +246,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
35 changes: 22 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,43 @@ package main
import (
"context"

"github.com/ameidance/paster_core/model/dto/kitex_gen/ameidance/paster/core"
"github.com/ameidance/paster_core/model/dto/kitex_gen/core"
"github.com/ameidance/paster_core/service"
)

// PasterCoreServiceImpl implements the last service interface defined in the IDL.
type PasterCoreServiceImpl struct{}
// PasterCoreImpl implements the last service interface defined in the IDL.
type PasterCoreImpl struct{}

// GetPost implements the PasterCoreServiceImpl interface.
func (s *PasterCoreServiceImpl) GetPost(ctx context.Context, req *core.GetPostRequest) (resp *core.GetPostResponse, err error) {
// GetPost implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) GetPost(ctx context.Context, req *core.GetPostRequest) (resp *core.GetPostResponse, err error) {
return service.GetPost(ctx, req), nil
}

// SavePost implements the PasterCoreServiceImpl interface.
func (s *PasterCoreServiceImpl) SavePost(ctx context.Context, req *core.SavePostRequest) (resp *core.SavePostResponse, err error) {
// SavePost implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) SavePost(ctx context.Context, req *core.SavePostRequest) (resp *core.SavePostResponse, err error) {
return service.SavePost(ctx, req), nil
}

// DeletePost implements the PasterCoreServiceImpl interface.
func (s *PasterCoreServiceImpl) DeletePost(ctx context.Context, req *core.DeletePostRequest) (resp *core.DeletePostResponse, err error) {
// DeletePost implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) DeletePost(ctx context.Context, req *core.DeletePostRequest) (resp *core.DeletePostResponse, err error) {
return service.DeletePost(ctx, req), nil
}

// GetComments implements the PasterCoreServiceImpl interface.
func (s *PasterCoreServiceImpl) GetComments(ctx context.Context, req *core.GetCommentsRequest) (resp *core.GetCommentsResponse, err error) {
// GetComments implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) GetComments(ctx context.Context, req *core.GetCommentsRequest) (resp *core.GetCommentsResponse, err error) {
return service.GetComments(ctx, req), nil
}

// SaveComment implements the PasterCoreServiceImpl interface.
func (s *PasterCoreServiceImpl) SaveComment(ctx context.Context, req *core.SaveCommentRequest) (resp *core.SaveCommentResponse, err error) {
// SaveComment implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) SaveComment(ctx context.Context, req *core.SaveCommentRequest) (resp *core.SaveCommentResponse, err error) {
return service.SaveComment(ctx, req), nil
}

// Check implements the PasterCoreImpl interface.
func (s *PasterCoreImpl) Check(ctx context.Context, req *core.HealthCheckRequest) (resp *core.HealthCheckResponse, err error) {
return &core.HealthCheckResponse{Status: core.HealthCheckResponse_SERVING}, nil
}

func (s *PasterCoreImpl) Watch(req *core.HealthCheckRequest, stream core.PasterCore_WatchServer) (err error) {
return stream.Close()
}
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"github.com/ameidance/paster_core/client"
"github.com/ameidance/paster_core/frame"
"github.com/ameidance/paster_core/model/dto/kitex_gen/ameidance/paster/core/pastercoreservice"
"github.com/ameidance/paster_core/model/dto/kitex_gen/core/pastercore"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/server"
)
Expand All @@ -12,8 +12,7 @@ func main() {
client.InitDB()
client.InitConsul()

srv := pastercoreservice.NewServer(new(PasterCoreServiceImpl), server.WithServerBasicInfo(frame.EBI),
server.WithRegistry(client.NewConsulRegistry()), server.WithMiddleware(frame.LogMiddleware))
srv := pastercore.NewServer(new(PasterCoreImpl), server.WithServerBasicInfo(frame.EBI), server.WithRegistry(client.NewConsulRegistry()))
if err := srv.Run(); err != nil {
klog.Errorf("[main] server stopped with error. err:%v", err)
panic(err)
Expand Down
Loading

0 comments on commit 5512a06

Please sign in to comment.