diff --git a/go.mod b/go.mod index 4be7842..f77694c 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,11 @@ go 1.17 require ( github.com/urfave/cli/v2 v2.3.0 - github.com/v8platform/protos v0.1.4 + github.com/v8platform/protos v0.2.0 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 ) -//replace github.com/v8platform/protos v0.1.2 => ../protos -//replace github.com/v8platform/encoder v0.0.3 => ../../khorevaa/encoder - require ( github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect diff --git a/go.sum b/go.sum index 5575b65..8094207 100644 --- a/go.sum +++ b/go.sum @@ -97,8 +97,8 @@ github.com/v8platform/encoder v0.0.3 h1:iqNmisoePgWUKNij7FmH6u2nY0StrjGjuqAINeeD github.com/v8platform/encoder v0.0.3/go.mod h1:/Ki7kVtFEtydZaIOZ7lbV+L20r4ikPRDrUE+2qZlFPA= github.com/v8platform/protoc-gen-go-ras v0.0.0-20210902165457-013367855358 h1:0F2yUabLX4vpkmIYXcDb+qoQnlqcJaEL25H/8/betKw= github.com/v8platform/protoc-gen-go-ras v0.0.0-20210902165457-013367855358/go.mod h1:1CEQnN/e7zOjnlO8o+ZkwFvyrGUYb4JCDns3ovp923w= -github.com/v8platform/protos v0.1.4 h1:n2BRwwmqWQFYr+qRW4LtMqvfOWqrQIegjoOadbr1EM4= -github.com/v8platform/protos v0.1.4/go.mod h1:8JbrMbSBBP7xsA2bMOSljgejVHgVClPNZ1oPQTP8Cdk= +github.com/v8platform/protos v0.2.0 h1:dwcwXBnIKNsZulxmzLkVubd5WZuaL++k0MxXPFF4guU= +github.com/v8platform/protos v0.2.0/go.mod h1:8JbrMbSBBP7xsA2bMOSljgejVHgVClPNZ1oPQTP8Cdk= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.buf.build/v8platform/go-gen-ras/v8platform/encodingapis v1.2.2/go.mod h1:M7qxAD3sf3aKWMnTTqSFKJYFahMjnf0EdhGpQeoXuUY= go.buf.build/v8platform/go-gen-ras/v8platform/rasapis v1.2.1/go.mod h1:y0b6WuLLqVK8EI2bS8CeGUypLqpWkZ8D1J97yPQpSWE= diff --git a/pkg/client/client.go b/pkg/client/client.go new file mode 100644 index 0000000..bbb9458 --- /dev/null +++ b/pkg/client/client.go @@ -0,0 +1,367 @@ +package client + +import ( + "context" + "github.com/spf13/cast" + clientv1 "github.com/v8platform/protos/gen/ras/client/v1" + protocolv1 "github.com/v8platform/protos/gen/ras/protocol/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "io" + "log" + "net" + "sync" + "sync/atomic" + "time" +) + +var defaultVersion = "10.0" + +var _ clientv1.ClientImpl = (*ClientConn)(nil) +var _ clientv1.ClientServiceImpl = (*ClientConn)(nil) + +func NewClientConn(host string, opts ...Options) *ClientConn { + + opt := defaultClientOptions + if len(opts) > 0 { + opt = opts[0] + } + + client := &ClientConn{ + host: host, + Options: opt, + mu: &sync.Mutex{}, + connMu: &sync.Mutex{}, + version: defaultVersion, + endpoints: &sync.Map{}, + } + + client.ClientServiceImpl = clientv1.NewClientService(client) + + return client +} + +type ClientConn struct { + Options + + host string + conn net.Conn + usedAt uint32 // atomic + _closed uint32 // atomic + _connected uint32 // atomic + _locked uint32 // atomic + + stats Stats + mu *sync.Mutex // Блокировка всего клиента + connMu *sync.Mutex // Блокировка только соединения + endpoints *sync.Map + version string + + clientv1.ClientServiceImpl +} + +type Stats struct { + Recv uint32 + Send uint32 + Wrong uint32 + Ping uint32 +} + +type Options struct { + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + Timeout time.Duration + NegotiateMessage *protocolv1.NegotiateMessage + ConnectMessage *protocolv1.ConnectMessage + OpenEndpoint *protocolv1.EndpointOpen +} + +var defaultClientOptions = Options{ + IdleTimeout: 30 * time.Minute, + IdleCheckFrequency: 5 * time.Minute, + Timeout: 3 * time.Second, + NegotiateMessage: protocolv1.NewNegotiateMessage(), + ConnectMessage: &protocolv1.ConnectMessage{}, + OpenEndpoint: &protocolv1.EndpointOpen{ + Service: "v8.service.Admin.Cluster", + Version: defaultVersion, + }, +} + +func (c *ClientConn) GetEndpoint(ctx context.Context) (clientv1.EndpointServiceImpl, error) { + + md, ok := metadata.FromIncomingContext(ctx) + + if !ok { + return nil, status.Errorf(codes.DataLoss, "Client: failed to get metadata") + } + + if t, ok := md["endpoint_id"]; ok { + + for _, e := range t { + if endpoint, ok := c.getEndpoint(e); ok { + return clientv1.NewEndpointService(c, endpoint), nil + + } + } + } + + endpoint, err := c.turnEndpoint(ctx) + if err != nil { + return nil, err + } + + return clientv1.NewEndpointService(c, endpoint), nil + +} + +func (c *ClientConn) getEndpoint(id string) (*protocolv1.Endpoint, bool) { + + if val, ok := c.endpoints.Load(id); ok { + return val.(*protocolv1.Endpoint), ok + } + return nil, false +} + +func (c *ClientConn) addEndpoint(endpoint *protocolv1.Endpoint) { + + id := cast.ToString(endpoint.GetId()) + log.Println(id) + c.endpoints.Store(id, endpoint) + +} + +func (c *ClientConn) turnEndpoint(ctx context.Context) (*protocolv1.Endpoint, error) { + + EndpointOpenAck, err := c.EndpointOpen(ctx, &protocolv1.EndpointOpen{ + Service: "v8.service.Admin.Cluster", + Version: c.version, + }) + + if err != nil { + var version string + + if version = clientv1.DetectSupportedVersion(err); len(version) == 0 { + return nil, err + } + if EndpointOpenAck, err = c.EndpointOpen(ctx, &protocolv1.EndpointOpen{ + Service: "v8.service.Admin.Cluster", + Version: version, + }); err != nil { + return nil, err + } + + c.version = version + } + + end, err := c.NewEndpoint(ctx, EndpointOpenAck) + if err != nil { + return nil, err + } + + c.addEndpoint(end) + + return end, nil +} + +func (c *ClientConn) EndpointMessage(ctx context.Context, req *protocolv1.EndpointMessage) (*protocolv1.EndpointMessage, error) { + defer func() { + header := metadata.New(map[string]string{ + "endpoint_id": cast.ToString(req.GetEndpointId()), + "endpoint_format": cast.ToString(req.GetFormat()), + }) + + _ = grpc.SendHeader(ctx, header) + + }() + + return c.ClientServiceImpl.EndpointMessage(ctx, req) + +} + +func (c *ClientConn) Read(p []byte) (n int, err error) { + + if c.closed() { + if err := c.reconnect(); err != nil { + return 0, err + } + } + + err = c.conn.SetReadDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return 0, err + } + defer func() { + c.SetUsedAt(time.Now()) + }() + + return c.conn.Read(p) + +} + +func (c *ClientConn) Write(p []byte) (n int, err error) { + + if c.closed() { + if err := c.reconnect(); err != nil { + return 0, err + } + } + + err = c.conn.SetWriteDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return 0, err + } + defer func() { + c.SetUsedAt(time.Now()) + }() + + return c.conn.Write(p) +} + +func (c *ClientConn) UsedAt() time.Time { + unix := atomic.LoadUint32(&c.usedAt) + return time.Unix(int64(unix), 0) +} + +func (c *ClientConn) SetUsedAt(tm time.Time) { + atomic.StoreUint32(&c.usedAt, uint32(tm.Unix())) +} + +func (c *ClientConn) Close() error { + + if !atomic.CompareAndSwapUint32(&c._closed, 0, 1) { + return nil + } + + ctx := context.Background() + var err error + // c.endpoints.Range(func(key, value interface{}) bool { + // + // // err = c.request(ctx, &protocolv1.EndpointClose{EndpointId: key.(int32)}, nil) + // // if err != nil { + // // return false + // // } + // // + // return true + // }) + + if atomic.CompareAndSwapUint32(&c._connected, 0, 1) { + + _, err = c.Disconnect(ctx, &protocolv1.DisconnectMessage{}) + if err != nil { + return err + } + + } + + if c.closed() { + return nil + } + + return c.conn.Close() +} + +func (c *ClientConn) Lock() { + c.connMu.Lock() +} + +func (c *ClientConn) Unlock() { + c.connMu.Unlock() +} + +func (c *ClientConn) reconnect() (err error) { + + c.mu.Lock() + defer c.mu.Unlock() + + if c.connected() { + return nil + } + + c.endpoints = &sync.Map{} + + ctx := context.Background() + + err = c.populateConn() + if err != nil { + return err + } + + err = c.NegotiateMessage.Formatter(c.conn, 0) + if err != nil { + return + } + + _, err = c.connect(ctx, c.ConnectMessage) + if err != nil { + return + } + + atomic.StoreUint32(&c._connected, 1) + + return err + +} + +func (x *ClientConn) connect(ctx context.Context, req *protocolv1.ConnectMessage) (*protocolv1.ConnectMessageAck, error) { + + // Check context + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + packet, err := protocolv1.NewPacket(req) + if err != nil { + return nil, err + } + if _, err := packet.WriteTo(x.conn); err != nil { + return nil, err + } + ackPacket, err := protocolv1.NewPacket(x.conn) + if err != nil { + return nil, err + } + resp := new(protocolv1.ConnectMessageAck) + return resp, ackPacket.Unpack(resp) +} + +func (c *ClientConn) connected() bool { + return atomic.LoadUint32(&c._connected) == 1 +} + +func (c *ClientConn) populateConn() (err error) { + + conn, err := net.Dial("tcp", c.host) + if err != nil { + return err + } + + c.conn = conn + return nil +} + +func (c *ClientConn) closed() bool { + + if atomic.LoadUint32(&c._closed) == 1 || c.conn == nil { + return true + } + _ = c.conn.SetReadDeadline(time.Now()) + _, err := c.conn.Read(make([]byte, 0)) + var zero time.Time + _ = c.conn.SetReadDeadline(zero) + + if err == nil { + return false + } + + netErr, _ := err.(net.Error) + if err != io.EOF && !netErr.Timeout() { + atomic.StoreUint32(&c._closed, 1) + return true + } + return false +} diff --git a/pkg/server/server.go b/pkg/server/server.go index b468838..c935720 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -5,13 +5,12 @@ import ( "fmt" clientv1 "github.com/v8platform/protos/gen/ras/client/v1" messagesv1 "github.com/v8platform/protos/gen/ras/messages/v1" - protocolv1 "github.com/v8platform/protos/gen/ras/protocol/v1" ras_service "github.com/v8platform/protos/gen/ras/service/api/v1" + "github.com/v8platform/ras-grpc-gw/pkg/client" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "log" "net" - "sync" ) func NewRASServer(rasAddr string) *RASServer { @@ -48,137 +47,89 @@ func (s *RASServer) Serve(host string) error { func NewRasClientServiceServer(rasAddr string) ras_service.RASServiceServer { return &rasClientServiceServer{ - Host: rasAddr, - ClientServiceImpl: clientv1.NewClientService(rasAddr), - once: &sync.Once{}, + client: client.NewClientConn(rasAddr), } } type rasClientServiceServer struct { ras_service.UnimplementedRASServiceServer - clientv1.ClientServiceImpl - rasService clientv1.RasServiceImpl - endpoint clientv1.EndpointServiceImpl - Host string - once *sync.Once + client *client.ClientConn } func (s *rasClientServiceServer) AuthenticateCluster(ctx context.Context, request *messagesv1.ClusterAuthenticateRequest) (*emptypb.Empty, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } + auth := clientv1.NewAuthService(endpoint) - auth := clientv1.NewAuthService(s.endpoint) - return auth.AuthenticateCluster(request) + return auth.AuthenticateCluster(ctx, request) } func (s *rasClientServiceServer) AuthenticateInfobase(ctx context.Context, request *messagesv1.AuthenticateInfobaseRequest) (*emptypb.Empty, error) { - if err := s.initOnce(); err != nil { + + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } + auth := clientv1.NewAuthService(endpoint) - auth := clientv1.NewAuthService(s.endpoint) - return auth.AuthenticateInfobase(request) + return auth.AuthenticateInfobase(ctx, request) } func (s *rasClientServiceServer) AuthenticateAgent(ctx context.Context, request *messagesv1.AuthenticateAgentRequest) (*emptypb.Empty, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } + auth := clientv1.NewAuthService(endpoint) - auth := clientv1.NewAuthService(s.endpoint) - return auth.AuthenticateAgent(request) + return auth.AuthenticateAgent(ctx, request) } func (s *rasClientServiceServer) GetClusters(ctx context.Context, request *messagesv1.GetClustersRequest) (*messagesv1.GetClustersResponse, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } - - service := clientv1.NewClustersService(s.endpoint) - return service.GetClusters(request) + service := clientv1.NewClustersService(endpoint) + return service.GetClusters(ctx, request) } func (s *rasClientServiceServer) GetClusterInfo(ctx context.Context, request *messagesv1.GetClusterInfoRequest) (*messagesv1.GetClusterInfoResponse, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } - - service := clientv1.NewClustersService(s.endpoint) - return service.GetClusterInfo(request) + service := clientv1.NewClustersService(endpoint) + return service.GetClusterInfo(ctx, request) } func (s *rasClientServiceServer) GetSessions(ctx context.Context, request *messagesv1.GetSessionsRequest) (*messagesv1.GetSessionsResponse, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } - service := clientv1.NewSessionsService(s.endpoint) - return service.GetSessions(request) + service := clientv1.NewSessionsService(endpoint) + return service.GetSessions(ctx, request) } func (s *rasClientServiceServer) GetShortInfobases(ctx context.Context, request *messagesv1.GetInfobasesShortRequest) (*messagesv1.GetInfobasesShortResponse, error) { - if err := s.initOnce(); err != nil { + endpoint, err := s.client.GetEndpoint(ctx) + if err != nil { return nil, err } - - service := clientv1.NewInfobasesService(s.endpoint) - return service.GetShortInfobases(request) + service := clientv1.NewInfobasesService(endpoint) + return service.GetShortInfobases(ctx, request) } func (s *rasClientServiceServer) GetInfobaseSessions(ctx context.Context, request *messagesv1.GetInfobaseSessionsRequest) (*messagesv1.GetInfobaseSessionsResponse, error) { - service := clientv1.NewInfobasesService(s.endpoint) - return service.GetSessions(request) -} - -func (s *rasClientServiceServer) initOnce() (err error) { - s.once.Do(func() { - err = s.init() - }) - - return err -} - -func (s *rasClientServiceServer) init() error { - - _, err := s.Negotiate(protocolv1.NewNegotiateMessage()) - if err != nil { - return err - } - - _, err = s.Connect(&protocolv1.ConnectMessage{}) - if err != nil { - return err - } - - endpointOpenAck, err := s.EndpointOpen(&protocolv1.EndpointOpen{ - Service: "v8.service.Admin.Cluster", - Version: "10.0", - }) - + endpoint, err := s.client.GetEndpoint(ctx) if err != nil { - supporsedVarsion := s.DetectSupportedVersion(err) - if len(supporsedVarsion) > 0 { - endpointOpenAck, err = s.EndpointOpen(&protocolv1.EndpointOpen{ - Service: "v8.service.Admin.Cluster", - Version: supporsedVarsion, - }) - if err != nil { - return err - } - } else { - return err - } - } - - endpoint, err := s.NewEndpoint(endpointOpenAck) - if err != nil { - return err + return nil, err } - - s.endpoint = clientv1.NewEndpointService(s, endpoint) - - return nil - + service := clientv1.NewInfobasesService(endpoint) + return service.GetSessions(ctx, request) } diff --git a/protos/protoset.bin b/protos/protoset.bin index 3ba6db4..7581c12 100644 Binary files a/protos/protoset.bin and b/protos/protoset.bin differ diff --git a/readme.md b/readme.md index 5402bc4..a80f643 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,7 @@ # ras-grpc-gw [![Release](https://img.shields.io/github/release/v8platform/ras-grpc-gw.svg?style=for-the-badge)](https://github.com/v8platform/ras-grpc-gw/releases/latest) -[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=for-the-badge)](/LICENSE.md) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=for-the-badge)](LICENSE) [![Build status](https://img.shields.io/github/workflow/status/v8platform/ras-grpc-gw/goreleaser?style=for-the-badge)](https://github.com/v8platform/ras-grpc-gw/actions?workflow=goreleaser) [![Codecov branch](https://img.shields.io/codecov/c/github/v8platform/ras-grpc-gw/master.svg?style=for-the-badge)](https://codecov.io/gh/v8platform/ras-grpc-gw) [![Go Doc](https://img.shields.io/badge/godoc-reference-blue.svg?style=for-the-badge)](http://godoc.org/github.com/v8platform/ras-grpc-gw) @@ -21,6 +21,11 @@ RAS GRPC gateway (`ras-grpc-gw`) - прокси сервер для службы Предполагаемое использование, разворачивание рядом со службой RAS в виде docker-контейнера или отдельной службы +Особенности: + +* Подключение к RAS только при первом запросе +* Если соединение было закрыто то делается `одна` попытка переподключения, при этом все точки сбрасываются + ### Реализованная функциональность * Сервис авторизации `AuthService` @@ -47,6 +52,17 @@ RAS GRPC gateway (`ras-grpc-gw`) - прокси сервер для службы ## Как использовать +### Работа с `endpoint` + +Для работы с точками обмена используются заголовки (метаданные) сообщений + +Если в заголовке (или метаданные) не передать ключе `endpoint_id` тогда операция выполниться в новой точке, +И в ответном сообщении будет указан `endpoint_id` - новой открытой точки, для дальнейшей работы с этом отрытой точкой обмена + +Для `grpcurl` указание происходить через флаг `-H`. Например, `-H endpoint_id:1` + +Для других клиентов надо передавать заголовок (метаданные) + ### Запуск локально Запуск сервера для службы RAS локального кластера 1С по адресу `localhost` @@ -93,7 +109,7 @@ go install github.com/fullstorydev/grpcurl/cmd/grpcurl `CLI` ```shell -grpcurl -protoset ./protos/protoset.bin -plaintext -d '{}' localhost:3002 ras.service.api.v1.ClustersService/GetClusters +grpcurl -protoset ./protos/protoset.bin -plaintext -H endpoint_id:1 -d '{}' localhost:3002 ras.service.api.v1.ClustersService/GetClusters ``` or @@ -104,17 +120,17 @@ docker run -it -v $PWD/protos/protoset.bin:/protos/protoset.bin fullstorydev/grp *Установка авторизации на кластере* ```shell -grpcurl -protoset ./protos/protoset.bin -plaintext -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.AuthService/AuthenticateCluster +grpcurl -protoset ./protos/protoset.bin -plaintext -H endpoint_id:1 -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.AuthService/AuthenticateCluster ``` *Получение списка сессий кластера* ```shell -grpcurl -protoset ./protos/protoset.bin -plaintext -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.SessionsService/GetSessions +grpcurl -protoset ./protos/protoset.bin -plaintext -H endpoint_id:1 -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.SessionsService/GetSessions ``` *Получение списка информационных баз* ```shell -grpcurl -protoset ./protos/protoset.bin -plaintext -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.InfobasesService/GetShortInfobases +grpcurl -protoset ./protos/protoset.bin -plaintext -H endpoint_id:1 -d '{\"cluster_id\": \"e9261ed1-c9d0-40e5-8222-c7996493d507\"}' localhost:3002 ras.service.api.v1.InfobasesService/GetShortInfobases ``` ## License