Skip to content

Commit

Permalink
jobspec: add support for destination partition to upstream block (#…
Browse files Browse the repository at this point in the history
…20167)

Adds support for specifying a destination Consul admin partition in the
`upstream` block.

Fixes: #19785
  • Loading branch information
tgross authored Mar 22, 2024
1 parent de218d1 commit bdf3ff3
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .changelog/20167.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
consul/connect: Added support for destination partition in `upstream` block
```
19 changes: 6 additions & 13 deletions api/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ type ConsulUpstream struct {
DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"`
DestinationNamespace string `mapstructure:"destination_namespace" hcl:"destination_namespace,optional"`
DestinationPeer string `mapstructure:"destination_peer" hcl:"destination_peer,optional"`
DestinationPartition string `mapstructure:"destination_partition" hcl:"destination_partition,optional"`
DestinationType string `mapstructure:"destination_type" hcl:"destination_type,optional"`
LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"`
Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"`
Expand All @@ -239,19 +240,11 @@ func (cu *ConsulUpstream) Copy() *ConsulUpstream {
if cu == nil {
return nil
}
return &ConsulUpstream{
DestinationName: cu.DestinationName,
DestinationNamespace: cu.DestinationNamespace,
DestinationPeer: cu.DestinationPeer,
DestinationType: cu.DestinationType,
LocalBindPort: cu.LocalBindPort,
Datacenter: cu.Datacenter,
LocalBindAddress: cu.LocalBindAddress,
LocalBindSocketPath: cu.LocalBindSocketPath,
LocalBindSocketMode: cu.LocalBindSocketMode,
MeshGateway: cu.MeshGateway.Copy(),
Config: maps.Clone(cu.Config),
}
up := new(ConsulUpstream)
*up = *cu
up.MeshGateway = cu.MeshGateway.Copy()
up.Config = maps.Clone(cu.Config)
return up
}

func (cu *ConsulUpstream) Canonicalize() {
Expand Down
1 change: 1 addition & 0 deletions api/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func TestConsulUpstream_Copy(t *testing.T) {
DestinationName: "dest1",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
Datacenter: "dc2",
LocalBindPort: 2000,
Expand Down
1 change: 1 addition & 0 deletions command/agent/consul/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func connectUpstreams(in []structs.ConsulUpstream) []api.Upstream {
DestinationName: upstream.DestinationName,
DestinationNamespace: upstream.DestinationNamespace,
DestinationType: api.UpstreamDestType(upstream.DestinationType),
DestinationPartition: upstream.DestinationPartition,
DestinationPeer: upstream.DestinationPeer,
LocalBindPort: upstream.LocalBindPort,
LocalBindSocketPath: upstream.LocalBindSocketPath,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/consul/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func TestConnect_connectUpstreams(t *testing.T) {
}, {
DestinationName: "bar",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
DestinationNamespace: "ns2",
LocalBindPort: 9000,
Expand All @@ -391,6 +392,7 @@ func TestConnect_connectUpstreams(t *testing.T) {
DestinationName: "bar",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 9000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand Down
2 changes: 2 additions & 0 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func proxyUpstreamsDifferent(wanted *api.AgentServiceConnect, sidecar *api.Agent
return true
case A.DestinationPeer != B.DestinationPeer:
return true
case A.DestinationPartition != B.DestinationPartition:
return true
case A.DestinationType != B.DestinationType:
return true
case A.LocalBindSocketPath != B.LocalBindSocketPath:
Expand Down
9 changes: 9 additions & 0 deletions command/agent/consul/service_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,15 @@ func TestSyncLogic_proxyUpstreamsDifferent(t *testing.T) {
}
})

try(t, "different destination partition", func(p proxy) {
diff := upstream1()
diff.DestinationPartition = "foo"
p.Upstreams = []api.Upstream{
diff,
upstream2(),
}
})

try(t, "different destination type", func(p proxy) {
diff := upstream1()
diff.DestinationType = "service"
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,7 @@ func apiUpstreamsToStructs(in []*api.ConsulUpstream) []structs.ConsulUpstream {
DestinationName: upstream.DestinationName,
DestinationNamespace: upstream.DestinationNamespace,
DestinationPeer: upstream.DestinationPeer,
DestinationPartition: upstream.DestinationPartition,
DestinationType: upstream.DestinationType,
LocalBindPort: upstream.LocalBindPort,
LocalBindSocketPath: upstream.LocalBindSocketPath,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4027,6 +4027,7 @@ func TestConversion_apiUpstreamsToStructs(t *testing.T) {
DestinationName: "upstream",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 8000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand All @@ -4038,6 +4039,7 @@ func TestConversion_apiUpstreamsToStructs(t *testing.T) {
DestinationName: "upstream",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 8000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,7 @@ func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
valid := []string{
"destination_name",
"destination_peer",
"destination_partition",
"destination_type",
"local_bind_port",
"local_bind_address",
Expand Down
17 changes: 9 additions & 8 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,14 +1210,15 @@ func TestParse(t *testing.T) {
LocalServicePort: 8080,
Upstreams: []*api.ConsulUpstream{
{
DestinationName: "other-service",
DestinationPeer: "10.0.0.1:6379",
DestinationType: "tcp",
LocalBindPort: 4567,
LocalBindAddress: "0.0.0.0",
LocalBindSocketPath: "/var/run/testsocket.sock",
LocalBindSocketMode: "0666",
Datacenter: "dc1",
DestinationName: "other-service",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 4567,
LocalBindAddress: "0.0.0.0",
LocalBindSocketPath: "/var/run/testsocket.sock",
LocalBindSocketMode: "0666",
Datacenter: "dc1",

MeshGateway: &api.ConsulMeshGateway{
Mode: "local",
Expand Down
1 change: 1 addition & 0 deletions jobspec/test-fixtures/tg-network.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ job "foo" {
upstreams {
destination_name = "other-service"
destination_peer = "10.0.0.1:6379"
destination_partition = "infra"
destination_type = "tcp"
local_bind_port = 4567
local_bind_address = "0.0.0.0"
Expand Down
30 changes: 22 additions & 8 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3830,6 +3830,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "ns2",
},
{
Type: DiffTypeNone,
Name: "DestinationPartition",
Old: "",
New: "",
},
{
Type: DiffTypeNone,
Name: "DestinationPeer",
Expand Down Expand Up @@ -9940,14 +9946,15 @@ func TestServicesDiff(t *testing.T) {
LocalServicePort: 8080,
Upstreams: []ConsulUpstream{
{
DestinationName: "count-api",
LocalBindPort: 8080,
Datacenter: "dc2",
LocalBindAddress: "127.0.0.1",
LocalBindSocketMode: "0700",
LocalBindSocketPath: "/tmp/redis_5678.sock",
DestinationPeer: "cloud-services",
DestinationType: "service",
DestinationName: "count-api",
LocalBindPort: 8080,
Datacenter: "dc2",
LocalBindAddress: "127.0.0.1",
LocalBindSocketMode: "0700",
LocalBindSocketPath: "/tmp/redis_5678.sock",
DestinationPeer: "cloud-services",
DestinationPartition: "infra",
DestinationType: "service",
MeshGateway: ConsulMeshGateway{
Mode: "remote",
},
Expand Down Expand Up @@ -9979,6 +9986,13 @@ func TestServicesDiff(t *testing.T) {
Type: DiffTypeEdited,
Name: "ConsulUpstreams",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "DestinationPartition",
Old: "",
New: "infra",
Annotations: nil,
},
{
Type: DiffTypeAdded,
Name: "DestinationPeer",
Expand Down
6 changes: 6 additions & 0 deletions nomad/structs/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ func hashConnect(h hash.Hash, connect *ConsulConnect) {
hashStringIfNonEmpty(h, upstream.Datacenter)
hashStringIfNonEmpty(h, upstream.LocalBindAddress)
hashString(h, upstream.DestinationPeer)
hashString(h, upstream.DestinationPartition)
hashString(h, upstream.DestinationType)
hashString(h, upstream.LocalBindSocketPath)
hashString(h, upstream.LocalBindSocketMode)
Expand Down Expand Up @@ -1608,6 +1609,9 @@ type ConsulUpstream struct {
// DestinationNamespace is the namespace of the upstream service.
DestinationNamespace string

// DestinationNamespace is the admin partition of the upstream service.
DestinationPartition string

// DestinationPeer the destination service address
DestinationPeer string

Expand Down Expand Up @@ -1654,6 +1658,8 @@ func (u *ConsulUpstream) Equal(o *ConsulUpstream) bool {
return false
case u.DestinationPeer != o.DestinationPeer:
return false
case u.DestinationPartition != o.DestinationPartition:
return false
case u.DestinationType != o.DestinationType:
return false
case u.LocalBindPort != o.LocalBindPort:
Expand Down
34 changes: 23 additions & 11 deletions nomad/structs/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ func TestServiceCheck_validate_FailingTypes(t *testing.T) {

t.Run("invalid", func(t *testing.T) {
err := (&ServiceCheck{
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
SuccessBeforePassing: 0,
FailuresBeforeWarning: 3,
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
SuccessBeforePassing: 0,
FailuresBeforeWarning: 3,
}).validateConsul()
require.EqualError(t, err, `failures_before_warning not supported for check of type "script"`)
})
Expand Down Expand Up @@ -298,10 +298,10 @@ func TestServiceCheck_validateNomad(t *testing.T) {
{
name: "failures_before_warning",
sc: &ServiceCheck{
Type: ServiceCheckTCP,
FailuresBeforeWarning: 3, // consul only
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
Type: ServiceCheckTCP,
FailuresBeforeWarning: 3, // consul only
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
},
exp: `failures_before_warning may only be set for Consul service checks`,
},
Expand Down Expand Up @@ -786,6 +786,16 @@ func TestConsulUpstream_upstreamEqual(t *testing.T) {
must.False(t, upstreamsEquals(a, b))
})

t.Run("different dest partition", func(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000)}
a[0].DestinationPeer = "infra"

b := []ConsulUpstream{up("foo", 8000)}
b[0].DestinationPeer = "dev"

must.False(t, upstreamsEquals(a, b))
})

t.Run("different dest type", func(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000)}
a[0].DestinationType = "tcp"
Expand Down Expand Up @@ -832,10 +842,12 @@ func TestConsulUpstream_upstreamEqual(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000), up("bar", 9000)}
b := []ConsulUpstream{up("foo", 8000), up("bar", 9000)}
a[0].DestinationPeer = "10.0.0.1:6379"
a[0].DestinationPartition = "infra"
a[0].DestinationType = "tcp"
a[0].LocalBindSocketPath = "/var/run/mysocket.sock"
a[0].LocalBindSocketMode = "0666"
b[0].DestinationPeer = "10.0.0.1:6379"
b[0].DestinationPartition = "infra"
b[0].DestinationType = "tcp"
b[0].LocalBindSocketPath = "/var/run/mysocket.sock"
b[0].LocalBindSocketMode = "0666"
Expand Down
1 change: 1 addition & 0 deletions website/content/docs/job-specification/upstreams.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ job "countdash" {
for details. Keys and values support [runtime variable interpolation][interpolation].
- `destination_name` `(string: <required>)` - Name of the upstream service.
- `destination_namespace` `(string: <required>)` - Name of the upstream Consul namespace.
- `destination_partition` `(string: "")` - Name of the Cluster admin partition containing the upstream service.
- `destination_peer` `(string: "")` - Name of the peer cluster containing the upstream service.
- `destination_type` - `(string: "service")` - The type of discovery query the proxy should use for finding service mesh instances.
- `local_bind_port` - `(int: <required>)` - The port the proxy will receive
Expand Down

0 comments on commit bdf3ff3

Please sign in to comment.