feat(shard-distributor): add SpectatorPeerChooser for shard-aware routing #7478
Conversation
Force-pushed from 642ad52 to 1597904
Force-pushed from 1597904 to f4bb653
Implement a YARPC peer chooser that routes requests to the correct
executor based on shard ownership. This is the shard distributor
equivalent of Cadence's RingpopPeerChooser.
Flow:
1. Client calls RPC with yarpc.WithShardKey("shard-key")
2. Chooser queries Spectator for shard owner
3. Extracts grpc_address from owner metadata
4. Creates/reuses peer for that address
5. Returns peer to YARPC for connection
The peer chooser maintains a cache of peers and handles concurrent
access safely. It uses the x-shard-distributor-namespace header to
determine which namespace's spectator to query.
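For illustration, this is what the calling side of that flow looks like. A minimal sketch: executorClient, ProcessShard, and the request type are hypothetical stand-ins, while yarpc.WithShardKey and yarpc.WithHeader are standard YARPC call options.

```go
// Illustrative caller (not from this PR): route the RPC to whichever
// executor currently owns "shard-42".
resp, err := executorClient.ProcessShard(
	ctx,
	&executorpb.ProcessShardRequest{ShardID: "shard-42"},
	// The shard key the chooser resolves to an owning executor.
	yarpc.WithShardKey("shard-42"),
	// The chooser reads this header to pick the namespace's spectator.
	yarpc.WithHeader("x-shard-distributor-namespace", "my-namespace"),
)
```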
Dependencies:
- Requires spectator GetShardOwner to return metadata (see previous commit)
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
The tests have dependency issues with mock generation that need to be resolved separately. The peer chooser implementation is complete and functional.
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
Tests cover:
- Success path with peer creation
- Peer reuse on subsequent calls
- Error cases (missing shard key, namespace header, spectator not found)
- Lifecycle methods (Start, Stop, IsRunning)
- SetSpectators method
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
Force-pushed from f4bb653 to 85017c4
```go
logger    log.Logger
namespace string

mu sync.RWMutex
```
nit: maybe add a comment that mu exists exactly to cover changes to peers (and not, say, spectators).
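A sketch of what that comment could look like (the type of peers is assumed from the diff; the wording is illustrative, not PR code):

```go
// mu guards peers only; spectators is managed separately via
// SetSpectators and is not covered by this mutex.
mu    sync.RWMutex
peers map[string]peer.Peer
```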
```go
//
// The ShardKey in the request is the actual shard key (e.g., workflow ID, shard ID),
// NOT the ip:port address. This is the key distinction from directPeerChooser.
func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
```
maybe it is already explained in the interface, but since the comments are here, I think the returned func(error) deserves an explanation, especially since it's always either nil or a no-op func(error) {}.
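A sketch of the kind of doc comment being asked for (illustrative wording, not from the PR):

```go
// Choose returns the selected peer plus an onFinish callback that YARPC
// invokes when the call completes. This chooser keeps no per-call state,
// so on success the callback is a no-op func(error) {}, and on error it
// is nil.
```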
```go
}

// Extract GRPC address from owner metadata
grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
```
nit: a bit unexpected to see such specifics as grpcAddressMetadataKey here.
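One way to address this, as a sketch rather than PR code: let the spectator package own the metadata key and expose a typed accessor, so the chooser never touches grpcAddressMetadataKey directly (ShardOwner and GRPCAddress are hypothetical names here):

```go
// GRPCAddress hides the metadata key behind the spectator package.
func (o *ShardOwner) GRPCAddress() (string, bool) {
	addr, ok := o.Metadata[grpcAddressMetadataKey]
	return addr, ok
}
```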
```go
// Check if we already have a peer for this address
c.mu.RLock()
p, ok := c.peers[grpcAddress]
if ok {
	c.mu.RUnlock()
	return p, func(error) {}, nil
}
c.mu.RUnlock()

// Create new peer for this address
p, err = c.addPeer(grpcAddress)
if err != nil {
	return nil, nil, yarpcerrors.InternalErrorf("failed to add peer for address %s: %v", grpcAddress, err)
}
```
I suggest moving this logic into a getOrCreatePeer sort of thing.
Then there will be a single place dealing with c.mu, and the optimized get path covered by RLock becomes more obvious.
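A minimal sketch of that refactor, assuming addPeer inserts into c.peers and is safe to call with c.mu held (getOrCreatePeer is the hypothetical helper, not PR code):

```go
func (c *SpectatorPeerChooser) getOrCreatePeer(grpcAddress string) (peer.Peer, error) {
	// Fast path: most calls hit an existing peer under the read lock.
	c.mu.RLock()
	if p, ok := c.peers[grpcAddress]; ok {
		c.mu.RUnlock()
		return p, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check after upgrading the lock: another goroutine may have
	// created the peer in the meantime.
	if p, ok := c.peers[grpcAddress]; ok {
		return p, nil
	}
	return c.addPeer(grpcAddress)
}
```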
This PR depends on #7476 (spectator returning metadata) being merged first.
What changed?
Added SpectatorPeerChooser, which implements YARPC's peer.Chooser interface to route requests to the correct executor based on shard ownership.

Why?
Enable executor-to-executor communication in the shard distributor canary system. The peer chooser queries the Spectator to find which executor owns a shard, then routes requests to that executor's gRPC address.
How did you test it?
Unit tests
Potential risks
None
Release notes
Documentation Changes