From 78cce24e6c087e278ecc1e95e07b55453fb305d0 Mon Sep 17 00:00:00 2001 From: rosstimothy <39066650+rosstimothy@users.noreply.github.com> Date: Tue, 4 Jun 2024 12:25:56 -0400 Subject: [PATCH] Disable session recording for non-interactive sessions (#41991) (#42320) These recordings only contain session.start, session.end and session.leave events, all of which are already included in the audit log. Removing these recordings should produce no data loss but will greatly reduce the amount of work performed by the agents, the auth service, and storage costs. The only case where non-interactive sessions are still recording is when BPF is enabled. This is required, for now, because enhanced session recording can generate more events than the audit log has traditionally been able to ingest. --- constants.go | 3 - integration/assist/command_test.go | 20 -- lib/bpf/bpf.go | 4 + lib/bpf/common.go | 7 + .../eventstest/mock_recorder_emitter.go | 16 +- lib/srv/ctx.go | 6 - lib/srv/exec.go | 16 +- lib/srv/forward/sshserver.go | 3 +- lib/srv/mock.go | 25 ++ lib/srv/sess.go | 41 ++-- lib/srv/sess_test.go | 230 +++++++++++++++++- lib/web/command.go | 3 - 12 files changed, 293 insertions(+), 81 deletions(-) diff --git a/constants.go b/constants.go index a7ddfd7b7312f..ffccba09faf37 100644 --- a/constants.go +++ b/constants.go @@ -48,9 +48,6 @@ const ( // SSHSessionID is the UUID of the current session. SSHSessionID = "SSH_SESSION_ID" - - // EnableNonInteractiveSessionRecording can be used to record non-interactive SSH session. - EnableNonInteractiveSessionRecording = "SSH_TELEPORT_RECORD_NON_INTERACTIVE" ) const ( diff --git a/integration/assist/command_test.go b/integration/assist/command_test.go index 95d3a48e5a184..4a3679d26922d 100644 --- a/integration/assist/command_test.go +++ b/integration/assist/command_test.go @@ -40,7 +40,6 @@ import ( "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/sashabaranov/go-openai" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" "google.golang.org/protobuf/types/known/timestamppb" @@ -87,7 +86,6 @@ func TestAssistCommandOpenSSH(t *testing.T) { openAIMock := mockOpenAI(t) rc := setupTeleport(t, testDir, openAIMock.URL) - auth := rc.Process.GetAuthServer() proxyAddr, err := rc.Process.ProxyWebAddr() require.NoError(t, err) @@ -158,24 +156,6 @@ func TestAssistCommandOpenSSH(t *testing.T) { require.NoError(t, err) require.Equal(t, defaults.WebsocketClose, envelope.Type) // Now the execution is finished - - // Waiting for the session recording to be uploaded and available - require.Eventually(t, func() bool { - chunk, err := auth.GetSessionChunk(apidefaults.Namespace, sessionMetadata.Session.ID, 0, 4096) - if err != nil { - if trace.IsNotFound(err) { - return false - } - assert.Fail(t, "error should be nil or not found, is %s", err) - } - assert.NotNil(t, chunk) - return true - }, 10*time.Second, 200*time.Millisecond) - - // Validating the session recording contains the SSH server output - chunk, err := auth.GetSessionChunk(apidefaults.Namespace, sessionMetadata.Session.ID, 0, 4096) - require.NoError(t, err) - require.Equal(t, testCommandOutput, string(chunk)) } // mockOpenAI starts an OpenAI mock server that answers one completion request diff --git a/lib/bpf/bpf.go b/lib/bpf/bpf.go index 47fa109c8d77e..bdaf5b52f6440 100644 --- a/lib/bpf/bpf.go +++ b/lib/bpf/bpf.go @@ -300,6 +300,10 @@ func (s *Service) CloseSession(ctx *SessionContext) error { return trace.NewAggregate(errs...) } +func (s *Service) Enabled() bool { + return true +} + // processAccessEvents pulls events off the perf ring buffer, parses them, and emits them to // the audit log. func (s *Service) processAccessEvents() { diff --git a/lib/bpf/common.go b/lib/bpf/common.go index 0c99b4cd41d3f..0ebd72a2c3156 100644 --- a/lib/bpf/common.go +++ b/lib/bpf/common.go @@ -42,6 +42,9 @@ type BPF interface { // Close will stop any running BPF programs. Close(restarting bool) error + + // Enabled returns whether enhanced recording is active. + Enabled() bool } // SessionContext contains all the information needed to track and emit @@ -101,6 +104,10 @@ func (s *NOP) CloseSession(_ *SessionContext) error { return nil } +func (s *NOP) Enabled() bool { + return false +} + // IsHostCompatible checks that BPF programs can run on this host. func IsHostCompatible() error { minKernel := semver.New(constants.EnhancedRecordingMinKernel) diff --git a/lib/events/eventstest/mock_recorder_emitter.go b/lib/events/eventstest/mock_recorder_emitter.go index 7726ed2c395e2..dcec70c62353d 100644 --- a/lib/events/eventstest/mock_recorder_emitter.go +++ b/lib/events/eventstest/mock_recorder_emitter.go @@ -28,8 +28,9 @@ import ( // MockRecorderEmitter is a recorder and emitter that stores all events. type MockRecorderEmitter struct { - mu sync.RWMutex - events []apievents.AuditEvent + mu sync.RWMutex + events []apievents.AuditEvent + recordedEvents []apievents.PreparedSessionEvent } func (e *MockRecorderEmitter) Write(_ []byte) (int, error) { @@ -48,6 +49,7 @@ func (e *MockRecorderEmitter) EmitAuditEvent(ctx context.Context, event apievent func (e *MockRecorderEmitter) RecordEvent(ctx context.Context, event apievents.PreparedSessionEvent) error { e.mu.Lock() defer e.mu.Unlock() + e.recordedEvents = append(e.recordedEvents, event) e.events = append(e.events, event.GetAuditEvent()) return nil } @@ -72,6 +74,16 @@ func (e *MockRecorderEmitter) Events() []apievents.AuditEvent { return result } +// RecordedEvents returns all the emitted events. +func (e *MockRecorderEmitter) RecordedEvents() []apievents.PreparedSessionEvent { + e.mu.RLock() + defer e.mu.RUnlock() + + result := make([]apievents.PreparedSessionEvent, len(e.recordedEvents)) + copy(result, e.recordedEvents) + return result +} + // Reset clears the emitted events. func (e *MockRecorderEmitter) Reset() { e.mu.Lock() diff --git a/lib/srv/ctx.go b/lib/srv/ctx.go index 12b6cadd15ea1..f9d5c159e5811 100644 --- a/lib/srv/ctx.go +++ b/lib/srv/ctx.go @@ -407,12 +407,6 @@ type ServerContext struct { killShellr *os.File killShellw *os.File - // multiWriter is used to record non-interactive session output. - // Currently, used by Assist. - multiWriter io.Writer - // recordNonInteractiveSession enables non-interactive session recording. Used by Assist. - recordNonInteractiveSession bool - // ExecType holds the type of the channel or request. For example "session" or // "direct-tcpip". Used to create correct subcommand during re-exec. ExecType string diff --git a/lib/srv/exec.go b/lib/srv/exec.go index abc95f8643b8f..61ef8baade8ae 100644 --- a/lib/srv/exec.go +++ b/lib/srv/exec.go @@ -161,13 +161,7 @@ func (e *localExec) Start(ctx context.Context, channel ssh.Channel) (*ExecResult // Connect stdout and stderr to the channel so the user can interact with the command. e.Cmd.Stderr = channel.Stderr() - - if e.Ctx.recordNonInteractiveSession { - e.Ctx.Tracef("Starting local exec and recording non-interactive session") - e.Cmd.Stdout = io.MultiWriter(e.Ctx.multiWriter, channel) - } else { - e.Cmd.Stdout = channel - } + e.Cmd.Stdout = channel // Copy from the channel (client input) into stdin of the process. inputWriter, err := e.Cmd.StdinPipe() @@ -380,13 +374,7 @@ func (e *remoteExec) Start(ctx context.Context, ch ssh.Channel) (*ExecResult, er } // hook up stdout/err the channel so the user can interact with the command - if e.ctx.recordNonInteractiveSession { - e.ctx.Tracef("Starting remote exec and recording non-interactive session") - e.session.Stdout = io.MultiWriter(e.ctx.multiWriter, ch) - } else { - e.session.Stdout = ch - } - + e.session.Stdout = ch e.session.Stderr = ch.Stderr() inputWriter, err := e.session.StdinPipe() if err != nil { diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 4889c13635303..bea84f7caa598 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -1587,8 +1587,7 @@ func (s *Server) handlePuTTYWinadj(ch ssh.Channel, req *ssh.Request) error { // teleportVarPrefixes contains the list of prefixes used by Teleport environment // variables. Matching variables are saved in the session context when forwarding // the calls to a remote SSH server as they can contain Teleport-specific -// information used to process the session properly (e.g. TELEPORT_SESSION or -// SSH_TELEPORT_RECORD_NON_INTERACTIVE) +// information used to process the session properly (e.g. TELEPORT_SESSION) var teleportVarPrefixes = []string{"TELEPORT_", "SSH_TELEPORT_"} func isTeleportEnv(varName string) bool { diff --git a/lib/srv/mock.go b/lib/srv/mock.go index fa0859cdded69..ae58be8bac31e 100644 --- a/lib/srv/mock.go +++ b/lib/srv/mock.go @@ -157,6 +157,7 @@ type mockServer struct { auth *auth.Server component string clock clockwork.FakeClock + bpf bpf.BPF } // ID is the unique ID of the server. @@ -251,6 +252,10 @@ func (m *mockServer) UseTunnel() bool { // GetBPF returns the BPF service used for enhanced session recording. func (m *mockServer) GetBPF() bpf.BPF { + if m.bpf != nil { + return m.bpf + } + return &bpf.NOP{} } @@ -387,3 +392,23 @@ func (c *mockSSHChannel) SendRequest(name string, wantReply bool, payload []byte func (c *mockSSHChannel) Stderr() io.ReadWriter { return c.stdErr } + +type fakeBPF struct { + bpf bpf.NOP +} + +func (f fakeBPF) OpenSession(ctx *bpf.SessionContext) (uint64, error) { + return f.bpf.OpenSession(ctx) +} + +func (f fakeBPF) CloseSession(ctx *bpf.SessionContext) error { + return f.bpf.CloseSession(ctx) +} + +func (f fakeBPF) Close(restarting bool) error { + return f.bpf.Close(restarting) +} + +func (f fakeBPF) Enabled() bool { + return true +} diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 78741bfbbf822..60424788167e3 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -368,11 +368,6 @@ func (s *SessionRegistry) OpenExecSession(ctx context.Context, channel ssh.Chann scx.Tracef("Session found, reusing it %s", sessionID) } - _, found = scx.GetEnv(teleport.EnableNonInteractiveSessionRecording) - if found { - scx.recordNonInteractiveSession = true - } - // This logic allows concurrent request to create a new session // to fail, what is ok because we should never have this condition. sess, _, err := newSession(ctx, sessionID, s, scx, channel) @@ -1015,8 +1010,18 @@ func (s *session) emitSessionJoinEvent(ctx *ServerContext) { sessionJoinEvent.ConnectionMetadata.LocalAddr = ctx.ServerConn.LocalAddr().String() } + var notifyPartyPayload []byte preparedEvent, err := s.Recorder().PrepareSessionEvent(sessionJoinEvent) if err == nil { + // Try marshaling the event prior to emitting it to prevent races since + // the audit/recording machinery might try to set some fields while the + // marshal is underway. + if eventPayload, err := json.Marshal(preparedEvent); err != nil { + s.log.Warnf("Unable to marshal %v: %v.", events.SessionJoinEvent, err) + } else { + notifyPartyPayload = eventPayload + } + if err := s.recordEvent(ctx.srv.Context(), preparedEvent); err != nil { s.log.WithError(err).Warn("Failed to record session join event.") } @@ -1030,12 +1035,15 @@ func (s *session) emitSessionJoinEvent(ctx *ServerContext) { // Notify all members of the party that a new member has joined over the // "x-teleport-event" channel. for _, p := range s.parties { - eventPayload, err := json.Marshal(sessionJoinEvent) - if err != nil { - s.log.Warnf("Unable to marshal %v for %v: %v.", events.SessionJoinEvent, p.sconn.RemoteAddr(), err) + if len(notifyPartyPayload) == 0 { + s.log.Warnf("No join event to send to %v", p.sconn.RemoteAddr()) continue } - _, _, err = p.sconn.SendRequest(teleport.SessionEvent, false, eventPayload) + + payload := make([]byte, len(notifyPartyPayload)) + copy(payload, notifyPartyPayload) + + _, _, err = p.sconn.SendRequest(teleport.SessionEvent, false, payload) if err != nil { s.log.Warnf("Unable to send %v to %v: %v.", events.SessionJoinEvent, p.sconn.RemoteAddr(), err) continue @@ -1384,6 +1392,11 @@ func newRecorder(s *session, ctx *ServerContext) (events.SessionPreparerRecorder return events.WithNoOpPreparer(events.NewDiscardRecorder()), nil } + // Don't record non-interactive sessions when enhanced recording is disabled. + if ctx.GetTerm() == nil && !ctx.srv.GetBPF().Enabled() { + return events.WithNoOpPreparer(events.NewDiscardRecorder()), nil + } + rec, err := recorder.New(recorder.Config{ SessionID: s.id, ServerID: s.serverMeta.ServerID, @@ -1414,12 +1427,6 @@ func newRecorder(s *session, ctx *ServerContext) (events.SessionPreparerRecorder } func (s *session) startExec(ctx context.Context, channel ssh.Channel, scx *ServerContext) error { - if scx.recordNonInteractiveSession { - // enable recording. - s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder())) - s.scx.multiWriter = s.io - } - // Emit a session.start event for the exec session. s.emitSessionStartEvent(scx) @@ -1474,10 +1481,6 @@ func (s *session) startExec(ctx context.Context, channel ssh.Channel, scx *Serve // Process has been placed in a cgroup, continue execution. execRequest.Continue() - if scx.recordNonInteractiveSession { - s.io.On() - } - // Process is running, wait for it to stop. go func() { result = execRequest.Wait() diff --git a/lib/srv/sess_test.go b/lib/srv/sess_test.go index 57797c2821b28..3f9cc160b5e15 100644 --- a/lib/srv/sess_test.go +++ b/lib/srv/sess_test.go @@ -22,6 +22,7 @@ import ( "context" "io" "os/user" + "slices" "sync/atomic" "testing" "time" @@ -262,6 +263,7 @@ func TestSession_newRecorder(t *testing.T) { }, sctx: &ServerContext{ SessionRecordingConfig: proxyRecording, + term: &terminal{}, }, errAssertion: require.NoError, recAssertion: isNotSessionWriter, @@ -281,6 +283,7 @@ func TestSession_newRecorder(t *testing.T) { }, sctx: &ServerContext{ SessionRecordingConfig: proxyRecordingSync, + term: &terminal{}, }, errAssertion: require.NoError, recAssertion: isNotSessionWriter, @@ -303,6 +306,7 @@ func TestSession_newRecorder(t *testing.T) { srv: &mockServer{ component: teleport.ComponentNode, }, + term: &terminal{}, Identity: IdentityContext{ AccessChecker: services.NewAccessCheckerWithRoleSet(&services.AccessInfo{ Roles: []string{"dev"}, @@ -359,6 +363,7 @@ func TestSession_newRecorder(t *testing.T) { }, }), }, + term: &terminal{}, }, errAssertion: require.NoError, recAssertion: func(t require.TestingT, i interface{}, _ ...interface{}) { @@ -388,6 +393,7 @@ func TestSession_newRecorder(t *testing.T) { MockRecorderEmitter: &eventstest.MockRecorderEmitter{}, datadir: t.TempDir(), }, + term: &terminal{}, }, errAssertion: require.NoError, recAssertion: func(t require.TestingT, i interface{}, i2 ...interface{}) { @@ -456,12 +462,14 @@ func TestSession_emitAuditEvent(t *testing.T) { }) } -// TestInteractiveSession tests interaction session lifecycles. -// Multiple sessions are opened in parallel tests to test for -// deadlocks between session registry, sessions, and parties. +// TestInteractiveSession tests interactive session lifecycles +// and validates audit events and session recordings are emitted. func TestInteractiveSession(t *testing.T) { + t.Parallel() + srv := newMockServer(t) srv.component = teleport.ComponentNode + t.Cleanup(func() { require.NoError(t, srv.auth.Close()) }) reg, err := NewSessionRegistry(SessionRegistryConfig{ Srv: srv, @@ -470,19 +478,217 @@ func TestInteractiveSession(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { reg.Close() }) - t.Run("Stop", func(t *testing.T) { + // Create a server context with an overridden recording mode + // so that sessions are recorded with the test emitter. + scx := newTestServerContext(t, reg.Srv, nil) + rcfg := types.DefaultSessionRecordingConfig() + rcfg.SetMode(types.RecordAtNodeSync) + scx.SessionRecordingConfig = rcfg + + // Allocate a terminal for the session so that + // events are properly recorded. + terminal, err := newLocalTerminal(scx) + require.NoError(t, err) + scx.term = terminal + + // Open a new session + sshChanOpen := newMockSSHChannel() + go func() { + // Consume stdout sent to the channel + io.ReadAll(sshChanOpen) + }() + require.NoError(t, reg.OpenSession(context.Background(), sshChanOpen, scx)) + require.NotNil(t, scx.session) + + // Simulate changing window size to capture an additional event. + require.NoError(t, reg.NotifyWinChange(context.Background(), rsession.TerminalParams{W: 100, H: 100}, scx)) + + // Stopping the session should trigger the session + // to end and cleanup in the background + scx.session.Stop() + + // Wait for the session to be removed from the registry. + require.Eventually(t, func() bool { + _, found := reg.findSession(scx.session.id) + return !found + }, time.Second*15, time.Millisecond*500) + + // Validate that the expected audit events were emitted. + expectedEvents := []string{events.SessionStartEvent, events.ResizeEvent, events.SessionEndEvent, events.SessionLeaveEvent} + require.Eventually(t, func() bool { + actual := srv.MockRecorderEmitter.Events() + + for _, evt := range expectedEvents { + contains := slices.ContainsFunc(actual, func(event apievents.AuditEvent) bool { + return event.GetType() == evt + }) + if !contains { + return false + } + } + return true + }, 15*time.Second, 500*time.Millisecond) + + // Validate that the expected recording events were emitted. + require.Eventually(t, func() bool { + actual := srv.MockRecorderEmitter.RecordedEvents() + + for _, evt := range expectedEvents { + contains := slices.ContainsFunc(actual, func(event apievents.PreparedSessionEvent) bool { + return event.GetAuditEvent().GetType() == evt + }) + if !contains { + return false + } + } + + return true + }, 15*time.Second, 500*time.Millisecond) +} + +// TestNonInteractiveSession tests non-interactive session lifecycles +// and validates audit events and session recordings are emitted when +// appropriate. +func TestNonInteractiveSession(t *testing.T) { + t.Parallel() + + t.Run("without BPF", func(t *testing.T) { + t.Parallel() + + srv := newMockServer(t) + srv.component = teleport.ComponentNode + t.Cleanup(func() { require.NoError(t, srv.auth.Close()) }) + + reg, err := NewSessionRegistry(SessionRegistryConfig{ + Srv: srv, + SessionTrackerService: srv.auth, + }) + require.NoError(t, err) + t.Cleanup(func() { reg.Close() }) + + // Create a server context with an overridden recording mode + // so that sessions are recorded with the test emitter. + scx := newTestServerContext(t, reg.Srv, nil) + rcfg := types.DefaultSessionRecordingConfig() + rcfg.SetMode(types.RecordAtNodeSync) + scx.SessionRecordingConfig = rcfg + + // Modify the execRequest to actually execute a command. + scx.execRequest = &localExec{Ctx: scx, Command: "true"} + + // Open a new session + sshChanOpen := newMockSSHChannel() + go func() { + // Consume stdout sent to the channel + io.ReadAll(sshChanOpen) + }() + require.NoError(t, reg.OpenExecSession(context.Background(), sshChanOpen, scx)) + require.NotNil(t, scx.session) + + // Wait for the command execution to complete and the session to be terminated. + require.Eventually(t, func() bool { + _, found := reg.findSession(scx.session.id) + return !found + }, time.Second*15, time.Millisecond*500) + + // Verify that all the expected audit events are eventually emitted. + expected := []string{events.SessionStartEvent, events.ExecEvent, events.SessionEndEvent, events.SessionLeaveEvent} + require.Eventually(t, func() bool { + actual := srv.MockRecorderEmitter.Events() + + for _, evt := range expected { + contains := slices.ContainsFunc(actual, func(event apievents.AuditEvent) bool { + return event.GetType() == evt + }) + if !contains { + return false + } + } + + return true + }, 15*time.Second, 500*time.Millisecond) + + // Verify that NO recordings were emitted + require.Empty(t, srv.MockRecorderEmitter.RecordedEvents()) + }) + + t.Run("with BPF", func(t *testing.T) { t.Parallel() - sess, _ := testOpenSession(t, reg, nil) - // Stopping the session should trigger the session - // to end and cleanup in the background - sess.Stop() + srv := newMockServer(t) + srv.component = teleport.ComponentNode + // Modify bpf to "enable" enhanced recording. This should + // trigger recordings to be captured. + srv.bpf = fakeBPF{} + t.Cleanup(func() { require.NoError(t, srv.auth.Close()) }) - sessionClosed := func() bool { - _, found := reg.findSession(sess.id) + reg, err := NewSessionRegistry(SessionRegistryConfig{ + Srv: srv, + SessionTrackerService: srv.auth, + }) + require.NoError(t, err) + t.Cleanup(func() { reg.Close() }) + + // Create a server context with an overridden recording mode + // so that sessions are recorded with the test emitter. + scx := newTestServerContext(t, reg.Srv, nil) + rcfg := types.DefaultSessionRecordingConfig() + rcfg.SetMode(types.RecordAtNodeSync) + scx.SessionRecordingConfig = rcfg + + // Modify the execRequest to actually execute a command. + scx.execRequest = &localExec{Ctx: scx, Command: "true"} + + // Open a new session + sshChanOpen := newMockSSHChannel() + go func() { + // Consume stdout sent to the channel + io.ReadAll(sshChanOpen) + }() + require.NoError(t, reg.OpenExecSession(context.Background(), sshChanOpen, scx)) + require.NotNil(t, scx.session) + + // Wait for the command execution to complete and the session to be terminated. + require.Eventually(t, func() bool { + _, found := reg.findSession(scx.session.id) return !found - } - require.Eventually(t, sessionClosed, time.Second*15, time.Millisecond*500) + }, time.Second*15, time.Millisecond*500) + + // Verify that all the expected audit events are eventually emitted. + expectedEvents := []string{events.SessionStartEvent, events.ExecEvent, events.SessionEndEvent, events.SessionLeaveEvent} + require.Eventually(t, func() bool { + actual := srv.MockRecorderEmitter.Events() + + for _, evt := range expectedEvents { + contains := slices.ContainsFunc(actual, func(event apievents.AuditEvent) bool { + return event.GetType() == evt + }) + if !contains { + return false + } + } + + return true + }, 15*time.Second, 500*time.Millisecond) + + // Validate that the expected recording events were emitted. + require.Eventually(t, func() bool { + actual := srv.MockRecorderEmitter.RecordedEvents() + + for _, evt := range expectedEvents { + if evt == events.ExecEvent { + continue + } + contains := slices.ContainsFunc(actual, func(event apievents.PreparedSessionEvent) bool { + return event.GetAuditEvent().GetType() == evt + }) + if !contains { + return false + } + } + + return true + }, 15*time.Second, 500*time.Millisecond) }) } diff --git a/lib/web/command.go b/lib/web/command.go index 014e5229a34f7..b647f1949dc31 100644 --- a/lib/web/command.go +++ b/lib/web/command.go @@ -675,9 +675,6 @@ func (t *commandHandler) streamOutput(ctx context.Context, tc *client.TeleportCl defer nc.Close() - // Enable session recording - nc.AddEnv(teleport.EnableNonInteractiveSessionRecording, "true") - // Establish SSH connection to the server. This function will block until // either an error occurs or it completes successfully. if err = nc.RunCommand(ctx, t.interactiveCommand); err != nil {