Skip to content

Commit

Permalink
Resolved/Balancer: Pass a serverID down to the balancer to avoid loop…
Browse files Browse the repository at this point in the history
…-redirection.
  • Loading branch information
cdujeu committed Jul 11, 2023
1 parent da4cc0c commit 2c3e311
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 20 deletions.
22 changes: 16 additions & 6 deletions common/client/http/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Balancer interface {
PickEndpoint(path string) (*httputil.ReverseProxy, error)
}

func NewBalancer() Balancer {
func NewBalancer(excludeID string) Balancer {
var clusterConfig *client.ClusterConfig
config.Get("cluster").Default(&client.ClusterConfig{}).Scan(&clusterConfig)
clientConfig := clusterConfig.GetClientConfig("http")
Expand All @@ -54,12 +54,14 @@ func NewBalancer() Balancer {
return &balancer{
readyProxies: map[string]*reverseProxy{},
options: opts,
excludeID: excludeID,
}
}

type balancer struct {
readyProxies map[string]*reverseProxy
options *client.BalancerOptions
excludeID string
}

type reverseProxy struct {
Expand All @@ -84,7 +86,10 @@ func (p *proxyBalancerTarget) Attributes() *attributes.Attributes {

func (b *balancer) Build(m map[string]*client.ServerAttributes) error {
usedAddr := map[string]struct{}{}
for _, mm := range m {
for srvID, mm := range m {
if b.excludeID != "" && srvID == b.excludeID {
continue
}
for _, addr := range mm.Addresses {
usedAddr[addr] = struct{}{}
proxy, ok := b.readyProxies[addr]
Expand Down Expand Up @@ -162,23 +167,28 @@ func (b *balancer) PickService(name string) (*httputil.ReverseProxy, error) {
}

func (b *balancer) PickEndpoint(path string) (*httputil.ReverseProxy, error) {
var targets []*proxyBalancerTarget
dedup := map[string]*proxyBalancerTarget{}

for addr, proxy := range b.readyProxies {
for _, endpoint := range proxy.Endpoints {
if endpoint == "/" {
continue
}
if strings.HasPrefix(path, endpoint) {
targets = append(targets, &proxyBalancerTarget{
dedup[addr] = &proxyBalancerTarget{
proxy: proxy,
address: addr,
})
}
}
}
}
if len(targets) == 0 {
if len(dedup) == 0 {
return nil, fmt.Errorf("no proxy found for endpoint %s", path)
}
var targets []*proxyBalancerTarget
for _, pbt := range dedup {
targets = append(targets, pbt)
}
if b.options != nil && len(b.options.Filters) > 0 {
for _, f := range b.options.Filters {
targets = b.applyFilter(f, targets)
Expand Down
6 changes: 3 additions & 3 deletions common/client/http/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var grpcTransport = &http.Transport{

type Resolver interface {
ServeHTTP(w http.ResponseWriter, r *http.Request) (bool, error)
Init(ctx context.Context, s server.HttpMux)
Init(ctx context.Context, serverID string, s server.HttpMux)
Stop()
}

Expand All @@ -49,12 +49,12 @@ type resolver struct {
userReady bool
}

func (m *resolver) Init(ctx context.Context, s server.HttpMux) {
func (m *resolver) Init(ctx context.Context, serverID string, s server.HttpMux) {

conn := clientcontext.GetClientConn(ctx)
reg := servercontext.GetRegistry(ctx)
rc, _ := client.NewResolverCallback(reg)
bal := NewBalancer()
bal := NewBalancer(serverID)
rc.Add(bal.Build)

m.c = conn
Expand Down
5 changes: 3 additions & 2 deletions common/server/caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func New(ctx context.Context, dir string) (server.Server, error) {
})

srvMUX := server.NewListableMux()
mux.RegisterServerMux(ctx, srvMUX)
srvID := "caddy-" + uuid.New()
mux.RegisterServerMux(ctx, srvID, srvMUX)

caddyStorePath := filepath.Join(runtime.ApplicationWorkingDir(), "caddy")
_ = os.MkdirAll(caddyStorePath, 0755)
Expand All @@ -170,7 +171,7 @@ func New(ctx context.Context, dir string) (server.Server, error) {
}

srv := &Server{
id: "caddy-" + uuid.New(),
id: srvID,
name: "caddy",
meta: make(map[string]string),

Expand Down
10 changes: 5 additions & 5 deletions common/server/caddy/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ var (
module *Middleware
)

func RegisterServerMux(ctx context.Context, s server.HttpMux) {
func RegisterServerMux(ctx context.Context, serverID string, s server.HttpMux) {
if module != nil {
module.Stop()
module.Init(ctx, s)
module.Init(ctx, serverID, s)
return
}
module = &Middleware{Resolver: clienthttp.NewResolver()}
module.Init(ctx, s)
module.Init(ctx, serverID, s)
caddy.RegisterModule(module)
httpcaddyfile.RegisterHandlerDirective("mux", parseCaddyfile)
}
Expand All @@ -53,8 +53,8 @@ type Middleware struct {
clienthttp.Resolver
}

func (m *Middleware) Init(ctx context.Context, s server.HttpMux) {
m.Resolver.Init(ctx, s)
func (m *Middleware) Init(ctx context.Context, serverID string, s server.HttpMux) {
m.Resolver.Init(ctx, serverID, s)
}

func (m *Middleware) Stop() {
Expand Down
5 changes: 3 additions & 2 deletions common/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ func New(ctx context.Context) server.Server {
*/

srv := &http.Server{}
srv.Handler = mux.NewMiddleware(ctx, lMux)
srvID := "http-" + uuid.New()
srv.Handler = mux.NewMiddleware(ctx, srvID, lMux)
srv.Handler = ContextMiddlewareHandler(middleware.ClientConnIncomingContext(ctx))(srv.Handler)
srv.Handler = ContextMiddlewareHandler(middleware.RegistryIncomingContext(ctx))(srv.Handler)

ctx, cancel := context.WithCancel(ctx)

return server.NewServer(ctx, &Server{
id: "http-" + uuid.New(),
id: srvID,
name: "http",
meta: make(map[string]string),

Expand Down
4 changes: 2 additions & 2 deletions common/server/http/mux/registrymux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ type Middleware struct {
clienthttp.Resolver
}

func NewMiddleware(ctx context.Context, s server.HttpMux) Middleware {
func NewMiddleware(ctx context.Context, serverID string, s server.HttpMux) Middleware {
m := Middleware{
Resolver: clienthttp.NewResolver(),
}
m.Resolver.Init(ctx, s)
m.Resolver.Init(ctx, serverID, s)
return m
}

Expand Down

0 comments on commit 2c3e311

Please sign in to comment.