Skip to content

Commit 0156a6b

Browse files
committed
Merge branch 'master' into mh/discovery
2 parents 3a43ba1 + 0b54a8e commit 0156a6b

File tree

5 files changed

+134
-25
lines changed

5 files changed

+134
-25
lines changed

CHANGELOG_PENDING.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,9 @@
1616

1717
### Bug Fixes 🐞
1818

19+
#### General
20+
21+
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
22+
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
23+
1924
#### CLI

ai/worker/docker.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const containerPort = "8000/tcp"
3434
const pollingInterval = 500 * time.Millisecond
3535
const externalContainerTimeout = 2 * time.Minute
3636
const optFlagsContainerTimeout = 5 * time.Minute
37+
const containerStopTimeout = 8 * time.Second
3738
const containerRemoveTimeout = 30 * time.Second
3839
const containerCreatorLabel = "creator"
3940
const containerCreator = "ai-worker"
@@ -411,12 +412,9 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
411412
}
412413

413414
restartPolicy := container.RestartPolicy{
414-
Name: "on-failure",
415+
Name: container.RestartPolicyOnFailure,
415416
MaximumRetryCount: 3,
416417
}
417-
if keepWarm {
418-
restartPolicy = container.RestartPolicy{Name: "always"}
419-
}
420418

421419
hostConfig := &container.HostConfig{
422420
Resources: container.Resources{
@@ -736,7 +734,8 @@ func dockerRemoveContainer(client DockerClient, containerID string) error {
736734
ctx, cancel := context.WithTimeout(context.Background(), containerRemoveTimeout)
737735
defer cancel()
738736

739-
err := client.ContainerStop(ctx, containerID, container.StopOptions{})
737+
timeoutSec := int(containerStopTimeout.Seconds())
738+
err := client.ContainerStop(ctx, containerID, container.StopOptions{Timeout: &timeoutSec})
740739
// Ignore "not found" or "already stopped" errors
741740
if err != nil && !docker.IsErrNotFound(err) && !errdefs.IsNotModified(err) {
742741
return err
@@ -780,9 +779,26 @@ tickerLoop:
780779
return err
781780
}
782781

782+
// If the container is running, we're done.
783783
if json.State.Running {
784784
break tickerLoop
785785
}
786+
787+
// Fail fast on states that won't become running after startup.
788+
if json.State != nil {
789+
status := strings.ToLower(json.State.Status)
790+
// Consider exited/dead as terminal. "removing" will surface via
791+
// inspect error or transition to exited/dead shortly.
792+
if status == "exited" || status == "dead" {
793+
return fmt.Errorf("container entered terminal state before running: %s (exitCode=%d)", json.State.Status, json.State.ExitCode)
794+
}
795+
if !json.State.Restarting && json.State.ExitCode != 0 {
796+
return fmt.Errorf("container exited before running (status=%s, exitCode=%d)", json.State.Status, json.State.ExitCode)
797+
}
798+
if !json.State.Restarting && json.State.Error != "" {
799+
return fmt.Errorf("container error before running: %s", json.State.Error)
800+
}
801+
}
786802
}
787803
}
788804

ai/worker/docker_test.go

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
"github.com/stretchr/testify/require"
2121
)
2222

23+
var stopTimeout = 8
24+
var expectedContainerStopOptions = container.StopOptions{Timeout: &stopTimeout}
25+
2326
type MockDockerClient struct {
2427
mock.Mock
2528
}
@@ -136,15 +139,15 @@ func TestNewDockerManager(t *testing.T) {
136139
{ID: "container2", Names: []string{"/container2"}, Labels: map[string]string{containerCreatorLabel: containerCreator}},
137140
}
138141
mockDockerClient.On("ContainerList", mock.Anything, mock.Anything).Return(existingContainers, nil)
139-
mockDockerClient.On("ContainerStop", mock.Anything, "container1", mock.Anything).Return(nil)
140-
mockDockerClient.On("ContainerStop", mock.Anything, "container2", mock.Anything).Return(nil)
142+
mockDockerClient.On("ContainerStop", mock.Anything, "container1", expectedContainerStopOptions).Return(nil)
143+
mockDockerClient.On("ContainerStop", mock.Anything, "container2", expectedContainerStopOptions).Return(nil)
141144
mockDockerClient.On("ContainerRemove", mock.Anything, "container1", mock.Anything).Return(nil)
142145
mockDockerClient.On("ContainerRemove", mock.Anything, "container2", mock.Anything).Return(nil)
143146

144147
// Verify that existing containers were stopped and removed.
145148
createAndVerifyManager()
146-
mockDockerClient.AssertCalled(t, "ContainerStop", mock.Anything, "container1", mock.Anything)
147-
mockDockerClient.AssertCalled(t, "ContainerStop", mock.Anything, "container2", mock.Anything)
149+
mockDockerClient.AssertCalled(t, "ContainerStop", mock.Anything, "container1", expectedContainerStopOptions)
150+
mockDockerClient.AssertCalled(t, "ContainerStop", mock.Anything, "container2", expectedContainerStopOptions)
148151
mockDockerClient.AssertCalled(t, "ContainerRemove", mock.Anything, "container1", mock.Anything)
149152
mockDockerClient.AssertCalled(t, "ContainerRemove", mock.Anything, "container2", mock.Anything)
150153
mockDockerClient.AssertExpectations(t)
@@ -236,7 +239,7 @@ func TestDockerManager_Stop(t *testing.T) {
236239
},
237240
}
238241

239-
MockDockerClient.On("ContainerStop", mock.Anything, containerID, container.StopOptions{Timeout: nil}).Return(nil)
242+
MockDockerClient.On("ContainerStop", mock.Anything, containerID, expectedContainerStopOptions).Return(nil)
240243
MockDockerClient.On("ContainerRemove", mock.Anything, containerID, container.RemoveOptions{}).Return(nil)
241244
err := dockerManager.Stop(ctx)
242245
require.NoError(t, err)
@@ -650,7 +653,7 @@ func TestDockerManager_allocGPU(t *testing.T) {
650653
dockerManager.gpuContainers[rc.GPU] = rc
651654
dockerManager.containers[rc.Name] = rc
652655
// Mock client methods to simulate the removal of the warm container.
653-
mockDockerClient.On("ContainerStop", mock.Anything, "container1", container.StopOptions{}).Return(nil)
656+
mockDockerClient.On("ContainerStop", mock.Anything, "container1", expectedContainerStopOptions).Return(nil)
654657
mockDockerClient.On("ContainerRemove", mock.Anything, "container1", container.RemoveOptions{}).Return(nil)
655658
},
656659
expectedAllocatedGPU: "gpu0",
@@ -694,7 +697,7 @@ func TestDockerManager_destroyContainer(t *testing.T) {
694697
dockerManager.gpuContainers[gpu] = rc
695698
dockerManager.containers[containerID] = rc
696699

697-
mockDockerClient.On("ContainerStop", mock.Anything, containerID, container.StopOptions{}).Return(nil)
700+
mockDockerClient.On("ContainerStop", mock.Anything, containerID, expectedContainerStopOptions).Return(nil)
698701
mockDockerClient.On("ContainerRemove", mock.Anything, containerID, container.RemoveOptions{}).Return(nil)
699702

700703
err := dockerManager.destroyContainer(rc, true)
@@ -844,7 +847,7 @@ func TestDockerManager_watchContainer(t *testing.T) {
844847
tt.mockServerSetup(mockServer)
845848

846849
// Mock destroyContainer to verify it is called.
847-
mockDockerClient.On("ContainerStop", mock.Anything, rc.Name, mock.Anything).Return(nil).Once()
850+
mockDockerClient.On("ContainerStop", mock.Anything, rc.Name, expectedContainerStopOptions).Return(nil).Once()
848851
mockDockerClient.On("ContainerRemove", mock.Anything, rc.Name, mock.Anything).Return(nil).Once()
849852

850853
done := make(chan struct{})
@@ -906,7 +909,7 @@ func TestDockerManager_watchContainer(t *testing.T) {
906909
mockDockerClient.AssertNotCalled(t, "ContainerRemove", mock.Anything, rc.Name, mock.Anything)
907910

908911
// Mock destroyContainer to verify it is called.
909-
mockDockerClient.On("ContainerStop", mock.Anything, rc.Name, mock.Anything).Return(nil).Once()
912+
mockDockerClient.On("ContainerStop", mock.Anything, rc.Name, expectedContainerStopOptions).Return(nil).Once()
910913
mockDockerClient.On("ContainerRemove", mock.Anything, rc.Name, mock.Anything).Return(nil).Once()
911914

912915
// after the first failure, there should be only 1 more healthcheck for the container to be stopped
@@ -993,8 +996,8 @@ func TestRemoveExistingContainers(t *testing.T) {
993996
{ID: "container2", Names: []string{"/container2"}, Labels: map[string]string{containerCreatorLabel: containerCreator}},
994997
}
995998
mockDockerClient.On("ContainerList", mock.Anything, mock.Anything).Return(existingContainers, nil)
996-
mockDockerClient.On("ContainerStop", mock.Anything, "container1", mock.Anything).Return(nil)
997-
mockDockerClient.On("ContainerStop", mock.Anything, "container2", mock.Anything).Return(nil)
999+
mockDockerClient.On("ContainerStop", mock.Anything, "container1", expectedContainerStopOptions).Return(nil)
1000+
mockDockerClient.On("ContainerStop", mock.Anything, "container2", expectedContainerStopOptions).Return(nil)
9981001
mockDockerClient.On("ContainerRemove", mock.Anything, "container1", mock.Anything).Return(nil)
9991002
mockDockerClient.On("ContainerRemove", mock.Anything, "container2", mock.Anything).Return(nil)
10001003

@@ -1036,9 +1039,9 @@ func TestRemoveExistingContainers_InMemoryFilterLegacyAndOwnerID(t *testing.T) {
10361039
{ID: "mine-1", Names: []string{"/mine-1"}, Labels: map[string]string{containerCreatorLabel: containerCreator, containerCreatorIDLabel: "owner-A"}}, // match -> remove
10371040
}, nil).
10381041
Once()
1039-
mockDockerClient.On("ContainerStop", mock.Anything, "legacy-1", mock.Anything).Return(nil).Once()
1042+
mockDockerClient.On("ContainerStop", mock.Anything, "legacy-1", expectedContainerStopOptions).Return(nil).Once()
10401043
mockDockerClient.On("ContainerRemove", mock.Anything, "legacy-1", mock.Anything).Return(nil).Once()
1041-
mockDockerClient.On("ContainerStop", mock.Anything, "mine-1", mock.Anything).Return(nil).Once()
1044+
mockDockerClient.On("ContainerStop", mock.Anything, "mine-1", expectedContainerStopOptions).Return(nil).Once()
10421045
mockDockerClient.On("ContainerRemove", mock.Anything, "mine-1", mock.Anything).Return(nil).Once()
10431046

10441047
removed, err := RemoveExistingContainers(ctx, mockDockerClient, "owner-A")
@@ -1095,7 +1098,7 @@ func TestDockerContainerName(t *testing.T) {
10951098
func TestDockerRemoveContainer(t *testing.T) {
10961099
mockDockerClient := new(MockDockerClient)
10971100

1098-
mockDockerClient.On("ContainerStop", mock.Anything, "container1", container.StopOptions{}).Return(nil)
1101+
mockDockerClient.On("ContainerStop", mock.Anything, "container1", expectedContainerStopOptions).Return(nil)
10991102
mockDockerClient.On("ContainerRemove", mock.Anything, "container1", container.RemoveOptions{}).Return(nil)
11001103

11011104
err := dockerRemoveContainer(mockDockerClient, "container1")
@@ -1165,6 +1168,63 @@ func TestDockerWaitUntilRunning(t *testing.T) {
11651168
require.Contains(t, err.Error(), "timed out waiting for managed container")
11661169
mockDockerClient.AssertExpectations(t)
11671170
})
1171+
1172+
t.Run("FailFastOnExited", func(t *testing.T) {
1173+
// If the container is immediately exited, we should fail fast instead of waiting.
1174+
mockDockerClient := new(MockDockerClient)
1175+
// Always return non-running, exited state
1176+
mockDockerClient.On("ContainerInspect", mock.Anything, containerID).Return(types.ContainerJSON{
1177+
ContainerJSONBase: &types.ContainerJSONBase{
1178+
State: &types.ContainerState{
1179+
Status: "exited",
1180+
Running: false,
1181+
ExitCode: 137,
1182+
},
1183+
},
1184+
}, nil)
1185+
1186+
err := dockerWaitUntilRunning(ctx, mockDockerClient, containerID, pollingInterval)
1187+
require.Error(t, err)
1188+
require.Contains(t, err.Error(), "terminal state")
1189+
mockDockerClient.AssertExpectations(t)
1190+
})
1191+
1192+
t.Run("FailFastOnDead", func(t *testing.T) {
1193+
mockDockerClient := new(MockDockerClient)
1194+
mockDockerClient.On("ContainerInspect", mock.Anything, containerID).Return(types.ContainerJSON{
1195+
ContainerJSONBase: &types.ContainerJSONBase{
1196+
State: &types.ContainerState{
1197+
Status: "dead",
1198+
Running: false,
1199+
Error: "killed",
1200+
},
1201+
},
1202+
}, nil)
1203+
1204+
err := dockerWaitUntilRunning(ctx, mockDockerClient, containerID, pollingInterval)
1205+
require.Error(t, err)
1206+
require.Contains(t, err.Error(), "container entered terminal state")
1207+
mockDockerClient.AssertExpectations(t)
1208+
})
1209+
1210+
t.Run("FailFastOnExitCodeNonZeroWithoutRestarting", func(t *testing.T) {
1211+
mockDockerClient := new(MockDockerClient)
1212+
mockDockerClient.On("ContainerInspect", mock.Anything, containerID).Return(types.ContainerJSON{
1213+
ContainerJSONBase: &types.ContainerJSONBase{
1214+
State: &types.ContainerState{
1215+
Status: "created",
1216+
Running: false,
1217+
Restarting: false,
1218+
ExitCode: 1,
1219+
},
1220+
},
1221+
}, nil)
1222+
1223+
err := dockerWaitUntilRunning(ctx, mockDockerClient, containerID, pollingInterval)
1224+
require.Error(t, err)
1225+
require.Contains(t, err.Error(), "exited before running")
1226+
mockDockerClient.AssertExpectations(t)
1227+
})
11681228
}
11691229

11701230
func TestHwGPU(t *testing.T) {

box/box.sh

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,25 @@
11
#!/bin/bash
22
set -e
33

4+
DOCKER=${DOCKER:-false}
45
FRONTEND=${FRONTEND:-false}
56

7+
# Refuse to start the frontend stack unless the services run in Docker.
# `&&` cannot appear inside a single `[ ... ]` test (the shell splits the command there and `[` fails with a missing `]`), so two tests are chained instead.
if [ "$FRONTEND" = "true" ] && [ "$DOCKER" = "false" ]; then
8+
echo "Running the box with FRONTEND=true requires DOCKER=true"
9+
exit 1
10+
fi
11+
12+
# Ensure backgrounded services are stopped gracefully when this script is interrupted or exits.
13+
cleanup() {
# Stops the orchestrator, gateway and mediamtx containers; does nothing unless DOCKER is "true".
14+
if [ "$DOCKER" = "true" ]; then
15+
echo "Stopping dockerized services..."
16+
# Each stop is best-effort: output is discarded and `|| true` swallows failures so they cannot trip `set -e`.
# NOTE(review): `--time 15` is taken to be docker's grace period in seconds before a forced kill — confirm against the docker CLI docs.
docker stop orchestrator --time 15 >/dev/null 2>&1 || true
17+
docker stop gateway --time 15 >/dev/null 2>&1 || true
18+
docker stop mediamtx --time 15 >/dev/null 2>&1 || true
19+
fi
20+
}
21+
trap cleanup INT TERM HUP EXIT
22+
623
# Start multiple processes and output their logs to the console
724
gateway() {
825
echo "Starting Gateway..."
@@ -34,9 +51,9 @@ gateway &
3451
orchestrator &
3552
mediamtx &
3653

37-
if [ "$DOCKER" = "true" ]; then
54+
if [ "$FRONTEND" = "true" ]; then
3855
supabase &
39-
mediamtx &
56+
frontend &
4057
fi
4158

4259
# Wait for all background processes to finish

cmd/livepeer/livepeer.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"os/signal"
1212
"runtime"
13+
"syscall"
1314
"time"
1415

1516
"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
@@ -20,6 +21,8 @@ import (
2021
"github.com/livepeer/go-livepeer/core"
2122
)
2223

24+
const shutdownTimeout = 10 * time.Second
25+
2326
func main() {
2427
// Override the default flag set since there are dependencies that
2528
// incorrectly add their own flags (specifically, due to the 'testing'
@@ -76,17 +79,25 @@ func main() {
7679
lc := make(chan struct{})
7780

7881
go func() {
82+
defer close(lc)
7983
starter.StartLivepeer(ctx, cfg)
80-
lc <- struct{}{}
8184
}()
8285

83-
c := make(chan os.Signal)
84-
signal.Notify(c, os.Interrupt)
86+
c := make(chan os.Signal, 1)
87+
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
88+
8589
select {
8690
case sig := <-c:
8791
glog.Infof("Exiting Livepeer: %v", sig)
8892
cancel()
89-
time.Sleep(time.Second * 2) //Give time for other processes to shut down completely
9093
case <-lc:
94+
// fallthrough to normal shutdown below
95+
}
96+
select {
97+
case <-lc:
98+
glog.Infof("Graceful shutdown complete")
99+
case <-time.After(shutdownTimeout):
100+
glog.Infof("Shutdown timed out, forcing exit")
101+
os.Exit(1)
91102
}
92103
}

0 commit comments

Comments
 (0)