Skip to content

Commit

Permalink
improve auto remove logic
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianceding committed Oct 21, 2021
1 parent 9d4a5b5 commit 63b684a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/binance-proxy/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

func NewHandler(
ctx context.Context, class service.Class,
enableFakeKline, startTickerWithKline, startOrderbookWithKline bool,
enableFakeKline, startTickerWithKline, startDepthWithKline bool,
) func(w http.ResponseWriter, r *http.Request) {
handler := &Handler{
srv: service.NewService(ctx, class, startTickerWithKline, startOrderbookWithKline),
srv: service.NewService(ctx, class, startTickerWithKline, startDepthWithKline),
class: class,
enableFakeKline: enableFakeKline,
}
Expand Down
6 changes: 3 additions & 3 deletions src/binance-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func startProxy(ctx context.Context, address string, class service.Class) {
mux := http.NewServeMux()
mux.HandleFunc("/", handler.NewHandler(ctx, class, flagEnableFakeKline, flagStartTickerWithKline, flagStartOrderbookWithKline))
mux.HandleFunc("/", handler.NewHandler(ctx, class, flagEnableFakeKline, flagStartTickerWithKline, flagStartDepthWithKline))

log.Infof("Start %s proxy !Address: %s", class, address)
if err := http.ListenAndServe(address, mux); err != nil {
Expand All @@ -42,14 +42,14 @@ var flagFuturesAddress string
var flagDebug bool
var flagEnableFakeKline bool
var flagStartTickerWithKline bool
var flagStartOrderbookWithKline bool
var flagStartDepthWithKline bool

func main() {
flag.StringVar(&flagSpotAddress, "s", ":8090", "spot bind address.")
flag.StringVar(&flagFuturesAddress, "f", ":8091", "futures bind address.")
flag.BoolVar(&flagEnableFakeKline, "fakekline", false, "enable fake kline.")
flag.BoolVar(&flagStartTickerWithKline, "at", false, "auto monitor ticker with kline request.")
flag.BoolVar(&flagStartOrderbookWithKline, "ao", false, "auto monitor orderbook with kline request.")
flag.BoolVar(&flagStartDepthWithKline, "ao", false, "auto monitor depth with kline request.")
flag.BoolVar(&flagDebug, "v", false, "print debug log.")
flag.Parse()

Expand Down
78 changes: 53 additions & 25 deletions src/binance-proxy/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,25 @@ type Service struct {
ctx context.Context
cancel context.CancelFunc

startTickerWithKline bool
startOrderbookWithKline bool
startTickerWithKline bool
startDepthWithKline bool

class Class
exchangeInfoSrv *ExchangeInfoSrv
klinesSrv sync.Map // map[symbolInterval]*Klines
depthSrv sync.Map // map[symbolInterval]*Depth
tickerSrv sync.Map // map[symbolInterval]*Ticker

klineIntervalMap sync.Map // map[string]string

lastGetKlines sync.Map // map[symbolInterval]time.Time
lastGetDepth sync.Map // map[symbolInterval]time.Time
lastGetTicker sync.Map // map[symbolInterval]time.Time
}

func NewService(ctx context.Context, class Class, startTickerWithKline, startOrderbookWithKline bool) *Service {
func NewService(ctx context.Context, class Class, startTickerWithKline, startDepthWithKline bool) *Service {
s := &Service{
class: class,
startTickerWithKline: startTickerWithKline,
startOrderbookWithKline: startOrderbookWithKline,
class: class,
startTickerWithKline: startTickerWithKline,
startDepthWithKline: startDepthWithKline,
}
s.ctx, s.cancel = context.WithCancel(ctx)
s.exchangeInfoSrv = NewExchangeInfoSrv(s.ctx, NewSymbolInterval(s.class, "", ""))
Expand All @@ -59,10 +57,13 @@ func NewService(ctx context.Context, class Class, startTickerWithKline, startOrd
}

func (s *Service) autoRemoveExpired() {
var aliveKlines = make(map[string]struct{})
s.klinesSrv.Range(func(k, v interface{}) bool {
si := k.(symbolInterval)
srv := v.(*KlinesSrv)

aliveKlines[si.Symbol] = struct{}{}

if t, ok := s.lastGetKlines.Load(si); ok {
if time.Now().Sub(t.(time.Time)) > 2*INTERVAL_2_DURATION[si.Interval] {
log.Debugf("%s.Kline srv expired!Removed", si)
Expand All @@ -82,7 +83,9 @@ func (s *Service) autoRemoveExpired() {
srv := v.(*DepthSrv)

if t, ok := s.lastGetDepth.Load(si); ok {
if time.Now().Sub(t.(time.Time)) > 2*time.Minute {
_, isKlineAlive := aliveKlines[si.Symbol]

if ((s.startDepthWithKline && !isKlineAlive) || !s.startDepthWithKline) && time.Now().Sub(t.(time.Time)) > 2*time.Minute {
log.Debugf("%s.Depth srv expired!Removed", si)
s.lastGetDepth.Delete(si)

Expand All @@ -100,7 +103,9 @@ func (s *Service) autoRemoveExpired() {
srv := v.(*TickerSrv)

if t, ok := s.lastGetTicker.Load(si); ok {
if time.Now().Sub(t.(time.Time)) > 2*time.Minute {
_, isKlineAlive := aliveKlines[si.Symbol]

if ((s.startTickerWithKline && !isKlineAlive) || !s.startTickerWithKline) && time.Now().Sub(t.(time.Time)) > 2*time.Minute {
log.Debugf("%s.Ticker srv expired!Removed", si)
s.lastGetTicker.Delete(si)

Expand All @@ -117,15 +122,11 @@ func (s *Service) autoRemoveExpired() {

func (s *Service) Ticker(symbol string) *Ticker24hr {
si := NewSymbolInterval(s.class, symbol, "")
srv, loaded := s.tickerSrv.Load(*si)
if !loaded {
if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, si)); loaded == false {
srv.(*TickerSrv).Start()
}
}
srv := s.StartTickerSrv(si)

s.lastGetTicker.Store(*si, time.Now())

return srv.(*TickerSrv).GetTicker()
return srv.GetTicker()
}

func (s *Service) ExchangeInfo() []byte {
Expand All @@ -134,27 +135,54 @@ func (s *Service) ExchangeInfo() []byte {

func (s *Service) Klines(symbol, interval string) []*Kline {
si := NewSymbolInterval(s.class, symbol, interval)
srv, loaded := s.klinesSrv.Load(*si)
if !loaded {
if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, si)); loaded == false {
srv.(*KlinesSrv).Start()
}
srv := s.StartKlineSrv(si)
if s.startTickerWithKline {
s.StartTickerSrv(NewSymbolInterval(s.class, symbol, ""))
}
if s.startDepthWithKline {
s.StartDepthSrv(NewSymbolInterval(s.class, symbol, ""))
}

s.lastGetKlines.Store(*si, time.Now())

return srv.(*KlinesSrv).GetKlines()
return srv.GetKlines()
}

func (s *Service) Depth(symbol string) *Depth {
si := NewSymbolInterval(s.class, symbol, "")
srv := s.StartDepthSrv(si)

s.lastGetDepth.Store(*si, time.Now())

return srv.GetDepth()
}

func (s *Service) StartKlineSrv(si *symbolInterval) *KlinesSrv {
srv, loaded := s.klinesSrv.Load(*si)
if !loaded {
if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, si)); loaded == false {
srv.(*KlinesSrv).Start()
}
}
return srv.(*KlinesSrv)
}

func (s *Service) StartDepthSrv(si *symbolInterval) *DepthSrv {
srv, loaded := s.depthSrv.Load(*si)
if !loaded {
if srv, loaded = s.depthSrv.LoadOrStore(*si, NewDepthSrv(s.ctx, si)); loaded == false {
srv.(*DepthSrv).Start()
}
}
s.lastGetDepth.Store(*si, time.Now())
return srv.(*DepthSrv)
}

return srv.(*DepthSrv).GetDepth()
func (s *Service) StartTickerSrv(si *symbolInterval) *TickerSrv {
srv, loaded := s.tickerSrv.Load(*si)
if !loaded {
if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, si)); loaded == false {
srv.(*TickerSrv).Start()
}
}
return srv.(*TickerSrv)
}
10 changes: 5 additions & 5 deletions src/binance-proxy/service/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ func (s *TickerSrv) Start() {
s.bookTicker = nil
s.rw.Unlock()

ticker24hrDoneC, ticker24hrstopC, err := s.connectTicker24hr()
bookDoneC, bookStopC, err := s.connectTickerBook()
if err != nil {
log.Errorf("%s.Websocket 24hr ticker connect error!Error:%s", s.si, err)
bookStopC <- struct{}{}
log.Errorf("%s.Websocket book ticker connect error!Error:%s", s.si, err)
continue
}

bookDoneC, bookStopC, err := s.connectTickerBook()
ticker24hrDoneC, ticker24hrstopC, err := s.connectTicker24hr()
if err != nil {
bookStopC <- struct{}{}
log.Errorf("%s.Websocket book ticker connect error!Error:%s", s.si, err)
log.Errorf("%s.Websocket 24hr ticker connect error!Error:%s", s.si, err)
continue
}

Expand Down

0 comments on commit 63b684a

Please sign in to comment.