diff --git a/src/binance-proxy/handler/handler.go b/src/binance-proxy/handler/handler.go index 6a3f143..a21563e 100644 --- a/src/binance-proxy/handler/handler.go +++ b/src/binance-proxy/handler/handler.go @@ -12,10 +12,10 @@ import ( func NewHandler( ctx context.Context, class service.Class, - enableFakeKline, startTickerWithKline, startOrderbookWithKline bool, + enableFakeKline, startTickerWithKline, startDepthWithKline bool, ) func(w http.ResponseWriter, r *http.Request) { handler := &Handler{ - srv: service.NewService(ctx, class, startTickerWithKline, startOrderbookWithKline), + srv: service.NewService(ctx, class, startTickerWithKline, startDepthWithKline), class: class, enableFakeKline: enableFakeKline, } diff --git a/src/binance-proxy/main.go b/src/binance-proxy/main.go index 039452a..952f79b 100644 --- a/src/binance-proxy/main.go +++ b/src/binance-proxy/main.go @@ -17,7 +17,7 @@ import ( func startProxy(ctx context.Context, address string, class service.Class) { mux := http.NewServeMux() - mux.HandleFunc("/", handler.NewHandler(ctx, class, flagEnableFakeKline, flagStartTickerWithKline, flagStartOrderbookWithKline)) + mux.HandleFunc("/", handler.NewHandler(ctx, class, flagEnableFakeKline, flagStartTickerWithKline, flagStartDepthWithKline)) log.Infof("Start %s proxy !Address: %s", class, address) if err := http.ListenAndServe(address, mux); err != nil { @@ -42,14 +42,14 @@ var flagFuturesAddress string var flagDebug bool var flagEnableFakeKline bool var flagStartTickerWithKline bool -var flagStartOrderbookWithKline bool +var flagStartDepthWithKline bool func main() { flag.StringVar(&flagSpotAddress, "s", ":8090", "spot bind address.") flag.StringVar(&flagFuturesAddress, "f", ":8091", "futures bind address.") flag.BoolVar(&flagEnableFakeKline, "fakekline", false, "enable fake kline.") flag.BoolVar(&flagStartTickerWithKline, "at", false, "auto monitor ticker with kline request.") - flag.BoolVar(&flagStartOrderbookWithKline, "ao", false, "auto monitor orderbook with kline request.") + flag.BoolVar(&flagStartDepthWithKline, "ao", false, "auto monitor depth with kline request.") flag.BoolVar(&flagDebug, "v", false, "print debug log.") flag.Parse() diff --git a/src/binance-proxy/service/service.go b/src/binance-proxy/service/service.go index ff4c313..ddc691d 100644 --- a/src/binance-proxy/service/service.go +++ b/src/binance-proxy/service/service.go @@ -12,8 +12,8 @@ type Service struct { ctx context.Context cancel context.CancelFunc - startTickerWithKline bool - startOrderbookWithKline bool + startTickerWithKline bool + startDepthWithKline bool class Class exchangeInfoSrv *ExchangeInfoSrv @@ -21,18 +21,16 @@ type Service struct { depthSrv sync.Map // map[symbolInterval]*Depth tickerSrv sync.Map // map[symbolInterval]*Ticker - klineIntervalMap sync.Map // map[string]string - lastGetKlines sync.Map // map[symbolInterval]time.Time lastGetDepth sync.Map // map[symbolInterval]time.Time lastGetTicker sync.Map // map[symbolInterval]time.Time } -func NewService(ctx context.Context, class Class, startTickerWithKline, startOrderbookWithKline bool) *Service { +func NewService(ctx context.Context, class Class, startTickerWithKline, startDepthWithKline bool) *Service { s := &Service{ - class: class, - startTickerWithKline: startTickerWithKline, - startOrderbookWithKline: startOrderbookWithKline, + class: class, + startTickerWithKline: startTickerWithKline, + startDepthWithKline: startDepthWithKline, } s.ctx, s.cancel = context.WithCancel(ctx) s.exchangeInfoSrv = NewExchangeInfoSrv(s.ctx, NewSymbolInterval(s.class, "", "")) @@ -59,10 +57,13 @@ func NewService(ctx context.Context, class Class, startTickerWithKline, startOrd } func (s *Service) autoRemoveExpired() { + var aliveKlines = make(map[string]struct{}) s.klinesSrv.Range(func(k, v interface{}) bool { si := k.(symbolInterval) srv := v.(*KlinesSrv) + aliveKlines[si.Symbol] = struct{}{} + if t, ok := s.lastGetKlines.Load(si); ok { if time.Now().Sub(t.(time.Time)) > 2*INTERVAL_2_DURATION[si.Interval] { log.Debugf("%s.Kline srv expired!Removed", si) @@ -82,7 +83,9 @@ func (s *Service) autoRemoveExpired() { srv := v.(*DepthSrv) if t, ok := s.lastGetDepth.Load(si); ok { - if time.Now().Sub(t.(time.Time)) > 2*time.Minute { + _, isKlineAlive := aliveKlines[si.Symbol] + + if ((s.startDepthWithKline && !isKlineAlive) || !s.startDepthWithKline) && time.Now().Sub(t.(time.Time)) > 2*time.Minute { log.Debugf("%s.Depth srv expired!Removed", si) s.lastGetDepth.Delete(si) @@ -100,7 +103,9 @@ func (s *Service) autoRemoveExpired() { srv := v.(*TickerSrv) if t, ok := s.lastGetTicker.Load(si); ok { - if time.Now().Sub(t.(time.Time)) > 2*time.Minute { + _, isKlineAlive := aliveKlines[si.Symbol] + + if ((s.startTickerWithKline && !isKlineAlive) || !s.startTickerWithKline) && time.Now().Sub(t.(time.Time)) > 2*time.Minute { log.Debugf("%s.Ticker srv expired!Removed", si) s.lastGetTicker.Delete(si) @@ -117,15 +122,11 @@ func (s *Service) autoRemoveExpired() { func (s *Service) Ticker(symbol string) *Ticker24hr { si := NewSymbolInterval(s.class, symbol, "") - srv, loaded := s.tickerSrv.Load(*si) - if !loaded { - if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, si)); loaded == false { - srv.(*TickerSrv).Start() - } - } + srv := s.StartTickerSrv(si) + s.lastGetTicker.Store(*si, time.Now()) - return srv.(*TickerSrv).GetTicker() + return srv.GetTicker() } func (s *Service) ExchangeInfo() []byte { @@ -134,27 +135,54 @@ func (s *Service) ExchangeInfo() []byte { func (s *Service) Klines(symbol, interval string) []*Kline { si := NewSymbolInterval(s.class, symbol, interval) - srv, loaded := s.klinesSrv.Load(*si) - if !loaded { - if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, si)); loaded == false { - srv.(*KlinesSrv).Start() - } + srv := s.StartKlineSrv(si) + if s.startTickerWithKline { + s.StartTickerSrv(NewSymbolInterval(s.class, symbol, "")) + } + if s.startDepthWithKline { + s.StartDepthSrv(NewSymbolInterval(s.class, symbol, "")) } s.lastGetKlines.Store(*si, time.Now()) - return srv.(*KlinesSrv).GetKlines() + return srv.GetKlines() } func (s *Service) Depth(symbol string) *Depth { si := NewSymbolInterval(s.class, symbol, "") + srv := s.StartDepthSrv(si) + + s.lastGetDepth.Store(*si, time.Now()) + + return srv.GetDepth() +} + +func (s *Service) StartKlineSrv(si *symbolInterval) *KlinesSrv { + srv, loaded := s.klinesSrv.Load(*si) + if !loaded { + if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, si)); loaded == false { + srv.(*KlinesSrv).Start() + } + } + return srv.(*KlinesSrv) +} + +func (s *Service) StartDepthSrv(si *symbolInterval) *DepthSrv { srv, loaded := s.depthSrv.Load(*si) if !loaded { if srv, loaded = s.depthSrv.LoadOrStore(*si, NewDepthSrv(s.ctx, si)); loaded == false { srv.(*DepthSrv).Start() } } - s.lastGetDepth.Store(*si, time.Now()) + return srv.(*DepthSrv) +} - return srv.(*DepthSrv).GetDepth() +func (s *Service) StartTickerSrv(si *symbolInterval) *TickerSrv { + srv, loaded := s.tickerSrv.Load(*si) + if !loaded { + if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, si)); loaded == false { + srv.(*TickerSrv).Start() + } + } + return srv.(*TickerSrv) } diff --git a/src/binance-proxy/service/ticker.go b/src/binance-proxy/service/ticker.go index 1339608..faa5077 100644 --- a/src/binance-proxy/service/ticker.go +++ b/src/binance-proxy/service/ticker.go @@ -71,16 +71,16 @@ func (s *TickerSrv) Start() { s.bookTicker = nil s.rw.Unlock() - ticker24hrDoneC, ticker24hrstopC, err := s.connectTicker24hr() + bookDoneC, bookStopC, err := s.connectTickerBook() if err != nil { - log.Errorf("%s.Websocket 24hr ticker connect error!Error:%s", s.si, err) + bookStopC <- struct{}{} + log.Errorf("%s.Websocket book ticker connect error!Error:%s", s.si, err) continue } - bookDoneC, bookStopC, err := s.connectTickerBook() + ticker24hrDoneC, ticker24hrstopC, err := s.connectTicker24hr() if err != nil { - bookStopC <- struct{}{} - log.Errorf("%s.Websocket book ticker connect error!Error:%s", s.si, err) + log.Errorf("%s.Websocket 24hr ticker connect error!Error:%s", s.si, err) continue }