Skip to content

Commit

Permalink
Merge pull request #9 from ycvk/issue#8
Browse files Browse the repository at this point in the history
Issue#8
  • Loading branch information
ycvk authored May 17, 2024
2 parents fe04856 + 98cdb19 commit 6def10d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 72 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/gin-gonic/gin v1.10.0
github.com/imroc/req/v3 v3.43.4
github.com/imroc/req/v3 v3.43.5
github.com/samber/lo v1.39.0
github.com/sourcegraph/conc v0.3.0
github.com/ycvk/endless v0.0.0-20240425132555-71b31d16be07
Expand All @@ -24,7 +24,7 @@ require (
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/pprof v0.0.0-20240507183855-6f11f98ebb1c // indirect
github.com/google/pprof v0.0.0-20240509144519-723abb6459b7 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,22 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240507183855-6f11f98ebb1c h1:GCixZ7sgey01Kjw8pxBzCD0uVrubxl8SRzRgI0jwP+A=
github.com/google/pprof v0.0.0-20240507183855-6f11f98ebb1c/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/pprof v0.0.0-20240509144519-723abb6459b7 h1:velgFPYr1X9TDwLIfkV7fWqsFlf7TeP11M/7kPd/dVI=
github.com/google/pprof v0.0.0-20240509144519-723abb6459b7/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imroc/req/v3 v3.43.3 h1:WdZhpUev9THtuwEZsW2LOYacl12fm7IkB7OgACv40+k=
github.com/imroc/req/v3 v3.43.3/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA=
github.com/imroc/req/v3 v3.43.4 h1:NSXlB5dELZuxzGEFRWLWEQ9dQmh8d9pUMPa7MevK1K4=
github.com/imroc/req/v3 v3.43.4/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA=
github.com/imroc/req/v3 v3.43.5 h1:fL7dOEfld+iEv1rwnIxseJz2/Y7JZ/HgbAURLZkat80=
github.com/imroc/req/v3 v3.43.5/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
Expand Down
23 changes: 3 additions & 20 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"deeplx-local/channel"
"deeplx-local/cron"
"deeplx-local/domain"
"deeplx-local/pkg"
"deeplx-local/service"
"errors"
"fmt"
Expand All @@ -24,12 +24,7 @@ import (
)

var (
client = req.NewClient().SetTimeout(3 * time.Second)
validReq = domain.TranslateRequest{
Text: "I love you",
SourceLang: "EN",
TargetLang: "ZH",
}
client = req.NewClient().SetTimeout(3 * time.Second)
hunterKey = os.Getenv("hunter_api_key")
quakeKey = os.Getenv("360_api_key")
scanService service.ScanService
Expand Down Expand Up @@ -87,7 +82,7 @@ func getValidURLs() []string {
p := pool.New().WithMaxGoroutines(30)
for _, url := range urls {
p.Go(func() {
if availability, err := checkURLAvailability(url); err == nil && availability {
if availability, err := pkg.CheckURLAvailability(client, url); err == nil && availability {
validList = append(validList, url)
}
})
Expand Down Expand Up @@ -138,18 +133,6 @@ func distinctURLs(urls *[]string) {
}
}

// checkURLAvailability 检查URL是否可用
func checkURLAvailability(url string) (bool, error) {
var result domain.TranslateResponse
response, err := client.R().SetBody(&validReq).SetSuccessResult(&result).Post(url)
if err != nil {
//log.Printf("error: url:[%s] %s\n", url, err)
return false, err
}
defer response.Body.Close()
return "我爱你" == result.Data, nil
}

// 监听退出
func exit(engine *http.Server) {
osSig := make(chan os.Signal, 1)
Expand Down
25 changes: 25 additions & 0 deletions pkg/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package pkg

import (
"deeplx-local/domain"
"github.com/imroc/req/v3"
)

const validResp = "我爱你"

var validReq = domain.TranslateRequest{
Text: "I love you",
SourceLang: "EN",
TargetLang: "ZH",
}

// CheckURLAvailability 检查URL是否可用
func CheckURLAvailability(client *req.Client, url string) (bool, error) {
var result domain.TranslateResponse
response, err := client.R().SetBody(&validReq).SetSuccessResult(&result).Post(url)
if err != nil {
return false, err
}
defer response.Body.Close()
return validResp == result.Data, nil
}
3 changes: 1 addition & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

通过并发请求存在`url.txt`内的 deeplx 的翻译接口,来获取低延迟、可用的url。


初步实现了负载均衡,延迟越低响应越快的接口会被优先使用。
翻译超大文本时,会自动做拆分并行翻译合并处理。

### 一键启动
`docker run -itd -p 8080:62155 neccen/deeplx-local:latest`
Expand Down
96 changes: 55 additions & 41 deletions service/balancer_deeplx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,54 @@ package service
import (
"context"
"deeplx-local/domain"
"deeplx-local/pkg"
"github.com/imroc/req/v3"
lop "github.com/samber/lo/parallel"
"github.com/sourcegraph/conc/pool"
"github.com/sourcegraph/conc/stream"
"log"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
)

const maxLength = 4096
const (
maxLength = 4096
maxFailures = 3 //最大健康检查错误次数
)

type Server struct {
URL string
Weight int
CurrentWeight int
ResponseTime time.Duration
Weight int64
CurrentWeight int64
isAvailable bool
failureCount int
}

type LoadBalancer struct {
Servers []*Server
mutex sync.Mutex
re *regexp.Regexp
client *req.Client
Servers []*Server
re *regexp.Regexp
client *req.Client
index uint32
unavailableServers []*Server // 不可用的服务器
healthCheck *time.Ticker // 健康检查定时器
}

// NewLoadBalancer 负载均衡
func NewLoadBalancer(vlist *[]string) TranslateService {
servers := lop.Map(*vlist, func(item string, index int) *Server {
return &Server{URL: item, Weight: 1, CurrentWeight: 1}
return &Server{URL: item, Weight: 1, CurrentWeight: 1, isAvailable: true}
})
return &LoadBalancer{
Servers: servers,
client: req.NewClient().SetTimeout(2 * time.Second),
re: regexp.MustCompile(`[^.!?]+[.!?]`),
lb := &LoadBalancer{
Servers: servers,
client: req.NewClient().SetTimeout(2 * time.Second),
re: regexp.MustCompile(`[^.!?。!?]+[.!?。!?]`), //还有一种方式是 [^.!?。!?\s]+[.!?。!?]?\s* 这样能分割得更细小,但感觉没必要
unavailableServers: make([]*Server, 0),
healthCheck: time.NewTicker(time.Minute),
}
go lb.startHealthCheck() // 开启定时健康检查
return lb
}

func (lb *LoadBalancer) GetTranslateData(trReq domain.TranslateRequest) domain.TranslateResponse {
Expand Down Expand Up @@ -103,14 +114,11 @@ func (lb *LoadBalancer) sendRequest(trReq domain.TranslateRequest) domain.Transl
contextPool.Go(func(ctx context.Context) error {
server := lb.getServer()
var trResult domain.TranslateResponse
start := time.Now()
response, err := lb.client.R().
SetContext(ctx).
SetBody(trReq).
SetSuccessResult(&trResult).
Post(server.URL)
elapsed := time.Since(start)
lb.updateResponseTime(server, elapsed)

if err != nil {
return err
Expand All @@ -120,6 +128,9 @@ func (lb *LoadBalancer) sendRequest(trReq domain.TranslateRequest) domain.Transl
if trResult.Code == 200 && len(trResult.Data) > 0 {
resultChan <- trResult
cancelFunc()
} else {
server.isAvailable = false
lb.unavailableServers = append(lb.unavailableServers, server)
}
return nil
})
Expand Down Expand Up @@ -148,32 +159,35 @@ func (lb *LoadBalancer) sendRequest(trReq domain.TranslateRequest) domain.Transl
}

func (lb *LoadBalancer) getServer() *Server {
lb.mutex.Lock()
defer lb.mutex.Unlock()

var bestServer *Server
total := 0

for _, server := range lb.Servers {
server.CurrentWeight += server.Weight
total += server.Weight

if bestServer == nil || server.CurrentWeight > bestServer.CurrentWeight {
bestServer = server
}
}
index := atomic.AddUint32(&lb.index, 1) - 1
server := lb.Servers[index%uint32(len(lb.Servers))]

if bestServer != nil {
bestServer.CurrentWeight -= total
for !server.isAvailable {
index = atomic.AddUint32(&lb.index, 1) - 1
server = lb.Servers[index%uint32(len(lb.Servers))]
}

return bestServer
return server
}

func (lb *LoadBalancer) updateResponseTime(server *Server, responseTime time.Duration) {
lb.mutex.Lock()
defer lb.mutex.Unlock()

server.ResponseTime = responseTime
server.Weight = int(time.Second / (responseTime + 1))
func (lb *LoadBalancer) startHealthCheck() {
for range lb.healthCheck.C {
for i := 0; i < len(lb.unavailableServers); i++ {
server := lb.unavailableServers[i]
flag, _ := pkg.CheckURLAvailability(lb.client, server.URL)
if flag {
server.isAvailable = true
server.failureCount = 0
copy(lb.unavailableServers[i:], lb.unavailableServers[i+1:])
lb.unavailableServers = lb.unavailableServers[:len(lb.unavailableServers)-1]
i--
} else {
server.failureCount++
if server.failureCount >= maxFailures {
copy(lb.unavailableServers[i:], lb.unavailableServers[i+1:])
lb.unavailableServers = lb.unavailableServers[:len(lb.unavailableServers)-1]
i--
}
}
}
}
}

0 comments on commit 6def10d

Please sign in to comment.