Skip to content

Commit

Permalink
networking: add ignore_collision for static port{} (#23956)
Browse files Browse the repository at this point in the history
so more than one copy of a program can run
at a time on the same port with SO_REUSEPORT.

requires host network mode.

some task drivers (like docker) may also need
config {
  network_mode = "host"
}
but this is not validated prior to placement.
  • Loading branch information
gulducat authored Sep 17, 2024
1 parent 603a747 commit ec81e7c
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 43 deletions.
3 changes: 3 additions & 0 deletions .changelog/23956.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
networking: Added an option to ignore static port collisions when scheduling, for programs that use the SO_REUSEPORT unix socket option
```
6 changes: 3 additions & 3 deletions api/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestCompose(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
ReservedPorts: []Port{{Label: "", Value: 80}, {Label: "", Value: 443}},
},
},
})
Expand Down Expand Up @@ -116,8 +116,8 @@ func TestCompose(t *testing.T) {
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{
{"", 80, 0, ""},
{"", 443, 0, ""},
{Label: "", Value: 80},
{Label: "", Value: 443},
},
},
},
Expand Down
9 changes: 5 additions & 4 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ func (n *NUMAResource) Canonicalize() {
}

type Port struct {
Label string `hcl:",label"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
Label string `hcl:",label"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
IgnoreCollision bool `hcl:"ignore_collision,optional"`
}

type DNSConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion api/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestTask_Require(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
ReservedPorts: []Port{{Label: "", Value: 80}, {Label: "", Value: 443}},
},
},
}
Expand Down
9 changes: 5 additions & 4 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,10 +1576,11 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe

func ApiPortToStructs(in api.Port) structs.Port {
return structs.Port{
Label: in.Label,
Value: in.Value,
To: in.To,
HostNetwork: in.HostNetwork,
Label: in.Label,
Value: in.Value,
To: in.To,
HostNetwork: in.HostNetwork,
IgnoreCollision: in.IgnoreCollision,
}
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/structs/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2812,7 +2812,7 @@ func portDiffs(old, new []Port, dynamic bool, contextual bool) []*ObjectDiff {
filter := []string{"_struct"}
name := "Static Port"
if dynamic {
filter = []string{"_struct", "Value"}
filter = []string{"_struct", "Value", "IgnoreCollision"}
name = "Dynamic Port"
}

Expand Down
18 changes: 18 additions & 0 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4614,6 +4614,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "IgnoreCollision",
Old: "false",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Label",
Expand Down Expand Up @@ -6486,6 +6492,12 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeAdded,
Name: "Static Port",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "IgnoreCollision",
Old: "",
New: "false",
},
{
Type: DiffTypeAdded,
Name: "Label",
Expand Down Expand Up @@ -6534,6 +6546,12 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeDeleted,
Name: "Static Port",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "IgnoreCollision",
Old: "false",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Label",
Expand Down
29 changes: 17 additions & 12 deletions nomad/structs/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,16 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool, reasons

func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, reasons []string) {
for _, port := range ports {
used := idx.getUsedPortsFor(port.HostIP)
if port.Value < 0 || port.Value >= MaxValidPort {
return true, []string{fmt.Sprintf("invalid port %d", port.Value)}
}
used := idx.getUsedPortsFor(port.HostIP)
if used.Check(uint(port.Value)) {
collide = true
reason := fmt.Sprintf("port %d already in use", port.Value)
reasons = append(reasons, reason)
if !port.IgnoreCollision {
collide = true
reason := fmt.Sprintf("port %d already in use", port.Value)
reasons = append(reasons, reason)
}
} else {
used.Set(uint(port.Value))
}
Expand Down Expand Up @@ -518,23 +520,26 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.HostNetworks[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Guard against invalid port
if port.Value < 0 || port.Value >= MaxValidPort {
return nil, fmt.Errorf("invalid port %d (out of range)", port.Value)
}

// Check if in use
if used != nil && used.Check(uint(port.Value)) {
addrErr = fmt.Errorf("reserved port collision %s=%d", port.Label, port.Value)
continue
if !port.IgnoreCollision {
used := idx.getUsedPortsFor(addr.Address)
if used != nil && used.Check(uint(port.Value)) {
addrErr = fmt.Errorf("reserved port collision %s=%d", port.Label, port.Value)
continue
}
}

allocPort = &AllocatedPortMapping{
Label: port.Label,
Value: port.Value,
To: port.To,
HostIP: addr.Address,
Label: port.Label,
Value: port.Value,
To: port.To,
HostIP: addr.Address,
IgnoreCollision: port.IgnoreCollision,
}
break
}
Expand Down
76 changes: 76 additions & 0 deletions nomad/structs/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,82 @@ func TestNetworkIndex_AssignPorts_TwoIp(t *testing.T) {

}

// TestNetworkIndex_IgnorePortCollision exercises Port.IgnoreCollision through
// both NetworkIndex.AddReservedPorts and NetworkIndex.AssignPorts: a port that
// is already reserved must still be reported as a collision for a regular
// request, and must be accepted for a request flagged with IgnoreCollision.
func TestNetworkIndex_IgnorePortCollision(t *testing.T) {
	ci.Parallel(t)

	// Fixture: a node with a single address published under a host network
	// alias that the port requests below refer to.
	const (
		hostIP  = "127.3.2.1"
		hostNet = "test-ignore-port-collision"
	)
	node := &Node{
		NodeResources: &NodeResources{
			NodeNetworks: []*NodeNetworkResource{{
				Addresses: []NodeNetworkAddress{{
					Alias:   hostNet,
					Address: hostIP,
				}},
			}},
		},
	}

	// Every mapping/port describes the same static port (10); the only thing
	// that varies is whether collisions are to be ignored.
	mappingsFor := func(ignore bool) []AllocatedPortMapping {
		mapping := AllocatedPortMapping{
			HostIP:          hostIP,
			Label:           "test-port",
			Value:           10,
			To:              10,
			IgnoreCollision: ignore,
		}
		return []AllocatedPortMapping{mapping}
	}
	portsFor := func(ignore bool) []Port {
		port := Port{
			HostNetwork:     hostNet,
			Label:           "test-port",
			Value:           10,
			To:              10,
			IgnoreCollision: ignore,
		}
		return []Port{port}
	}
	strictMappings, lenientMappings := mappingsFor(false), mappingsFor(true)
	strictPorts, lenientPorts := portsFor(false), portsFor(true)

	idx := NewNetworkIndex()
	idx.SetNode(node)

	// Seed the index as if another job already holds the port. The outcome of
	// the subtests does not depend on whether this first reservation itself
	// ignores collisions.
	collide, reasons := idx.AddReservedPorts(strictMappings)
	must.False(t, collide, must.Sprint("expect no collision in first reservation"))
	must.Len(t, 0, reasons, must.Sprint("expect no reasons in first reservation"))

	t.Run("AddReservedPorts", func(t *testing.T) {
		// Same port again without the flag: rejected, with a reason.
		hit, why := idx.AddReservedPorts(strictMappings)
		must.True(t, hit, must.Sprint("expect collision"))
		must.Eq(t, []string{"port 10 already in use"}, why, must.Sprint("expect collision reasons"))

		// Same port again with the flag: accepted silently.
		hit, why = idx.AddReservedPorts(lenientMappings)
		must.False(t, hit, must.Sprint("expect no collision"))
		must.Len(t, 0, why, must.Sprint("expect no collision reasons"))
	})

	t.Run("AssignPorts", func(t *testing.T) {
		// Without the flag the ask fails and yields no ports.
		allocated, err := idx.AssignPorts(&NetworkResource{ReservedPorts: strictPorts})
		must.ErrorContains(t, err, "reserved port collision test-port=10")
		must.Nil(t, allocated, must.Sprint("expect no ports on AssignPorts error"))

		// With the flag the ask succeeds and the flag is carried onto the
		// resulting mapping.
		allocated, err = idx.AssignPorts(&NetworkResource{ReservedPorts: lenientPorts})
		must.NoError(t, err)
		must.Eq(t, lenientMappings, allocated)
	})
}

func TestNetworkIndex_AssignTaskNetwork(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
Expand Down
45 changes: 31 additions & 14 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,18 +2805,20 @@ type AllocatedPortMapping struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck

Label string
Value int
To int
HostIP string
Label string
Value int
To int
HostIP string
IgnoreCollision bool
}

func (m *AllocatedPortMapping) Copy() *AllocatedPortMapping {
return &AllocatedPortMapping{
Label: m.Label,
Value: m.Value,
To: m.To,
HostIP: m.HostIP,
Label: m.Label,
Value: m.Value,
To: m.To,
HostIP: m.HostIP,
IgnoreCollision: m.IgnoreCollision,
}
}

Expand All @@ -2833,6 +2835,8 @@ func (m *AllocatedPortMapping) Equal(o *AllocatedPortMapping) bool {
return false
case m.HostIP != o.HostIP:
return false
case m.IgnoreCollision != o.IgnoreCollision:
return false
}
return true
}
Expand Down Expand Up @@ -2875,6 +2879,11 @@ type Port struct {
// to. Jobs with a HostNetwork set can only be placed on nodes with
// that host network available.
HostNetwork string

// IgnoreCollision ignores port collisions, so the port can be used more
// than one time on a single network, for tasks that support SO_REUSEPORT.
// Should be used only with static ports.
IgnoreCollision bool
}

type DNSConfig struct {
Expand Down Expand Up @@ -3044,10 +3053,11 @@ func (ns Networks) Port(label string) AllocatedPortMapping {
for _, p := range n.ReservedPorts {
if p.Label == label {
return AllocatedPortMapping{
Label: label,
Value: p.Value,
To: p.To,
HostIP: n.IP,
Label: label,
Value: p.Value,
To: p.To,
HostIP: n.IP,
IgnoreCollision: p.IgnoreCollision,
}
}
}
Expand Down Expand Up @@ -7267,8 +7277,10 @@ func (tg *TaskGroup) validateNetworks() error {
}
// static port
if other, ok := staticPorts[port.Value]; ok {
err := fmt.Errorf("Static port %d already reserved by %s", port.Value, other)
mErr.Errors = append(mErr.Errors, err)
if !port.IgnoreCollision {
err := fmt.Errorf("Static port %d already reserved by %s", port.Value, other)
mErr.Errors = append(mErr.Errors, err)
}
} else if port.Value > math.MaxUint16 {
err := fmt.Errorf("Port %s (%d) cannot be greater than %d", port.Label, port.Value, math.MaxUint16)
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -7285,6 +7297,11 @@ func (tg *TaskGroup) validateNetworks() error {
err := fmt.Errorf("Port %q cannot be mapped to a port (%d) greater than %d", port.Label, port.To, math.MaxUint16)
mErr.Errors = append(mErr.Errors, err)
}

if port.IgnoreCollision && !(net.Mode == "" || net.Mode == "host") {
err := fmt.Errorf("Port %q collision may not be ignored on non-host network mode %q", port.Label, net.Mode)
mErr.Errors = append(mErr.Errors, err)
}
}
// Validate the cniArgs in each network resource. Make sure there are no duplicate Args in
// different network resources or invalid characters (;) in key or value ;)
Expand Down
25 changes: 25 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,31 @@ func TestTaskGroupNetwork_Validate(t *testing.T) {
},
ErrContains: "invalid ';' character in CNI arg value \"first_value;",
},
{
TG: &TaskGroup{
Name: "testing-port-ignore-collision-ok",
Networks: []*NetworkResource{{
Mode: "host",
ReservedPorts: []Port{
{Label: "one", Value: 10, IgnoreCollision: true},
{Label: "two", Value: 10, IgnoreCollision: true},
},
}},
},
},
{
TG: &TaskGroup{
Name: "testing-port-ignore-collision-non-host-network-mode",
Networks: []*NetworkResource{{
Mode: "not-host",
ReservedPorts: []Port{
{Label: "one", Value: 10, IgnoreCollision: true},
{Label: "two", Value: 10, IgnoreCollision: true},
},
}},
},
ErrContains: "collision may not be ignored on non-host network mode",
},
}

for i := range cases {
Expand Down
Loading

0 comments on commit ec81e7c

Please sign in to comment.