Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminal for workbench #80

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4b83b3e
basic implementation
Fluder-Paradyne Jul 10, 2024
46a8c41
stream stdio using emit
Fluder-Paradyne Jul 10, 2024
17bea3d
revert yarn.lock, delete accidental additions and fix session
Fluder-Paradyne Jul 11, 2024
d782685
go mod tidy
Fluder-Paradyne Jul 11, 2024
73db86d
add authentication for terminal endpoint
Fluder-Paradyne Jul 11, 2024
d4a0c6b
add run command controller
Fluder-Paradyne Jul 11, 2024
7eabd21
fix defer func, stop tty from being closed by handler
rounak610 Jul 9, 2024
2311ead
fix loggers
Fluder-Paradyne Jul 16, 2024
4073bfa
split into functions and fix wait group add
Fluder-Paradyne Jul 16, 2024
b309083
fix concurrent write issues
Fluder-Paradyne Jul 16, 2024
e0ff2f3
add better logs
Fluder-Paradyne Jul 16, 2024
a97af6e
frontend for xterm integration
jedan2506 Jul 17, 2024
c9a6886
remove buffer out of loop and add more logs and change log levels
Fluder-Paradyne Jul 17, 2024
40eb205
changing terminal designs
jedan2506 Jul 17, 2024
112691a
Merge remote-tracking branch 'rakesh/terminal-poc' into terminal-poc
jedan2506 Jul 17, 2024
49dfd06
fix tty closing automatically issue and add cancel function in places…
Fluder-Paradyne Jul 17, 2024
b2ccae8
revert commented out node-executor
Fluder-Paradyne Jul 17, 2024
c5dc83d
sort imports
Fluder-Paradyne Jul 17, 2024
be18681
add restore from history
Fluder-Paradyne Jul 17, 2024
00729c9
changing terminal designs
jedan2506 Jul 17, 2024
9c4aeb6
Merge remote-tracking branch 'rakesh/terminal-poc' into terminal-poc
jedan2506 Jul 17, 2024
3c860c3
new changes for integration
jedan2506 Jul 19, 2024
76e0d70
finally got it working
jedan2506 Jul 21, 2024
953fc99
changes to the terminal code
jedan2506 Jul 21, 2024
4c4d1cf
build fix
jedan2506 Jul 21, 2024
4f02966
css changes
jedan2506 Jul 21, 2024
be8f19c
css changes
jedan2506 Jul 21, 2024
6d8bce0
add CreateWorkspaceTerminal
Fluder-Paradyne Jul 29, 2024
cd0ca36
Merge branch 'main' into terminal-poc
Fluder-Paradyne Jul 30, 2024
9d72b1d
fix allowed host requirements
Fluder-Paradyne Jul 29, 2024
ec2ba68
change terminal style
Fluder-Paradyne Jul 29, 2024
32ceea6
terminal url changes
jedan2506 Jul 30, 2024
616c773
remove DefaultConnectionErrorLimit as it is unused
Fluder-Paradyne Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ WORKDIR $GOPATH/src/packages/ai-developer/

RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /go/executor executor.go

FROM build-base AS terminal-base

WORKDIR $GOPATH/src/packages/ai-developer/

RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /go/terminal terminal.go


FROM build-base AS worker-development

Expand Down Expand Up @@ -105,6 +111,23 @@ COPY ./app/prompts /go/prompts

ENTRYPOINT ["bash", "-c", "/go/executor"]

FROM superagidev/supercoder-python-ide:latest AS terminal

RUN git config --global user.email "supercoder@superagi.com"
RUN git config --global user.name "SuperCoder"

ENV TERM xterm
ENV HOME /home/coder

# to make the terminal look nice
RUN echo "PS1='\[\033[01;32m\]\u:\[\033[01;34m\]\w\[\033[00m\]\$ '" >> /home/coder/.bashrc


COPY --from=terminal-base /go/terminal /go/terminal
COPY ./app/prompts /go/prompts

ENTRYPOINT ["bash", "-c", "/go/terminal"]

FROM public.ecr.aws/docker/library/debian:bookworm-slim as production

# install git
Expand Down
1 change: 1 addition & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func LoadConfig() (*koanf.Koanf, error) {
"aws": map[string]interface{}{
"region": "us-west-2",
},
"terminal.allowed.host": "localhost",
}, "."), nil)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions app/config/terminal_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package config

func GetAllowedHost() string {
return config.String("terminal.allowed.host")
}
296 changes: 296 additions & 0 deletions app/controllers/terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package controllers

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"

"ai-developer/app/types/request"
"ai-developer/app/utils"

"github.com/creack/pty"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)

type TTYSize struct {
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
X uint16 `json:"x"`
Y uint16 `json:"y"`
}

var WebsocketMessageType = map[int]string{
websocket.BinaryMessage: "binary",
websocket.TextMessage: "text",
websocket.CloseMessage: "close",
websocket.PingMessage: "ping",
websocket.PongMessage: "pong",
}

type TerminalController struct {
MaxBufferSizeBytes int
KeepalivePingTimeout time.Duration
ConnectionErrorLimit int
cmd *exec.Cmd
Command string
Arguments []string
AllowedHostnames []string
logger *zap.Logger
tty *os.File
cancelFunc context.CancelFunc
writeMutex sync.Mutex
historyBuffer bytes.Buffer
}

func NewTerminalController(logger *zap.Logger, command string, arguments []string, allowedHostnames []string) (*TerminalController, error) {
cmd := exec.Command(command, arguments...)
tty, err := pty.Start(cmd)
if err != nil {
logger.Warn("failed to start command", zap.Error(err))
return nil, err
}
ttyBuffer := bytes.Buffer{}
return &TerminalController{
MaxBufferSizeBytes: 1024,
KeepalivePingTimeout: 20 * time.Second,
ConnectionErrorLimit: 10,
tty: tty,
cmd: cmd,
Arguments: arguments,
AllowedHostnames: allowedHostnames,
logger: logger,
historyBuffer: ttyBuffer,
}, nil
}

func (controller *TerminalController) RunCommand(ctx *gin.Context) {
var commandRequest request.RunCommandRequest
if err := ctx.ShouldBindJSON(&commandRequest); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
command := commandRequest.Command
if command == "" {
ctx.JSON(400, gin.H{"error": "command is required"})
return
}
if !strings.HasSuffix(command, "\n") {
command += "\n"
}

_, err := controller.tty.Write([]byte(command))
if err != nil {
return
}
}

func (controller *TerminalController) NewTerminal(ctx *gin.Context) {
subCtx, cancelFunc := context.WithCancel(ctx)
controller.cancelFunc = cancelFunc

controller.logger.Info("setting up new terminal connection...")

connection, err := controller.setupConnection(ctx, ctx.Writer, ctx.Request)
defer func(connection *websocket.Conn) {
controller.logger.Info("closing websocket connection...")
err := connection.Close()
if err != nil {
controller.logger.Warn("failed to close connection", zap.Error(err))
}
}(connection)
if err != nil {
controller.logger.Warn("failed to setup connection", zap.Error(err))
return
}

// restore history from buffer
controller.writeMutex.Lock()
if err := connection.WriteMessage(websocket.BinaryMessage, controller.historyBuffer.Bytes()); err != nil {
controller.logger.Info("failed to write tty buffer to xterm.js", zap.Error(err))
}
controller.writeMutex.Unlock()

var waiter sync.WaitGroup

waiter.Add(3)

go controller.keepAlive(subCtx, connection, &waiter)

go controller.readFromTTY(subCtx, connection, &waiter)

go controller.writeToTTY(subCtx, connection, &waiter)

waiter.Wait()

controller.logger.Info("closing connection...")
}

func (controller *TerminalController) setupConnection(ctx context.Context, w gin.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
upgrader := utils.GetConnectionUpgrader(controller.AllowedHostnames, controller.MaxBufferSizeBytes)
connection, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return nil, err
}
return connection, nil
}

func (controller *TerminalController) keepAlive(ctx context.Context, connection *websocket.Conn, waiter *sync.WaitGroup) {
defer func() {
waiter.Done()
controller.logger.Info("keepAlive goroutine exiting...")
}()
lastPongTime := time.Now()
keepalivePingTimeout := controller.KeepalivePingTimeout

connection.SetPongHandler(func(msg string) error {
lastPongTime = time.Now()
return nil
})

for {
select {
case <-ctx.Done():
controller.logger.Info("done keeping alive...")
return
default:
controller.logger.Info("sending keepalive ping message...")
controller.writeMutex.Lock()
if err := connection.WriteMessage(websocket.PingMessage, []byte("keepalive")); err != nil {
controller.writeMutex.Unlock()
controller.logger.Error("failed to write ping message", zap.Error(err))
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
controller.cancelFunc()
}
return
}
controller.writeMutex.Unlock()

time.Sleep(keepalivePingTimeout / 2)

if time.Now().Sub(lastPongTime) > keepalivePingTimeout {
controller.logger.Warn("failed to get response from ping, triggering disconnect now...")
return
}
controller.logger.Info("received response from ping successfully")
}
}
}

func (controller *TerminalController) readFromTTY(ctx context.Context, connection *websocket.Conn, waiter *sync.WaitGroup) {
defer func() {
waiter.Done()
controller.logger.Info("readFromTTY goroutine exiting...")
}()
errorCounter := 0
buffer := make([]byte, controller.MaxBufferSizeBytes)
for {
select {
case <-ctx.Done():
controller.logger.Info("done reading from tty...")
return
default:

readLength, err := controller.tty.Read(buffer)
if err != nil {
controller.logger.Warn("failed to read from tty", zap.Error(err))
controller.writeMutex.Lock()
if err := connection.WriteMessage(websocket.TextMessage, []byte("bye!")); err != nil {
controller.logger.Warn("failed to send termination message from tty to xterm.js", zap.Error(err))
}
controller.writeMutex.Unlock()
return
}

controller.writeMutex.Lock()
// save to history buffer
controller.historyBuffer.Write(buffer[:readLength])
if err := connection.WriteMessage(websocket.BinaryMessage, buffer[:readLength]); err != nil {
controller.writeMutex.Unlock()
controller.logger.Warn(fmt.Sprintf("failed to send %v bytes from tty to xterm.js", readLength), zap.Int("read_length", readLength), zap.Error(err))
errorCounter++
if errorCounter > controller.ConnectionErrorLimit {
return
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
controller.logger.Info("WebSocket closed by client")
controller.cancelFunc()
return
}
continue
}
controller.writeMutex.Unlock()

controller.logger.Info(fmt.Sprintf("sent message of size %v bytes from tty to xterm.js", readLength), zap.Int("read_length", readLength))
errorCounter = 0
}
}
}

func (controller *TerminalController) writeToTTY(ctx context.Context, connection *websocket.Conn, waiter *sync.WaitGroup) {
defer func() {
waiter.Done()
controller.logger.Info("writeToTTY goroutine exiting...")
}()
for {
select {
case <-ctx.Done():
controller.logger.Info("done writing from tty...")
return
default:

messageType, data, err := connection.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
controller.logger.Info("WebSocket closed by client")
controller.cancelFunc()
return
}
controller.logger.Warn("failed to get next reader", zap.Error(err))
return
}

dataLength := len(data)
dataBuffer := bytes.Trim(data, "\x00")
dataType := WebsocketMessageType[messageType]

controller.logger.Info(fmt.Sprintf("received %s (type: %v) message of size %v byte(s) from xterm.js with key sequence: %v", dataType, messageType, dataLength, dataBuffer))

if messageType == websocket.BinaryMessage && dataBuffer[0] == 1 {
controller.resizeTTY(dataBuffer)
continue
}

bytesWritten, err := controller.tty.Write(dataBuffer)
if err != nil {
controller.logger.Error(fmt.Sprintf("failed to write %v bytes to tty: %s", len(dataBuffer), err), zap.Int("bytes_written", bytesWritten), zap.Error(err))
continue
}
controller.logger.Info("bytes written to tty...", zap.Int("bytes_written", bytesWritten))
}
}
}

func (controller *TerminalController) resizeTTY(dataBuffer []byte) {
ttySize := &TTYSize{}
resizeMessage := bytes.Trim(dataBuffer[1:], " \n\r\t\x00\x01")
if err := json.Unmarshal(resizeMessage, ttySize); err != nil {
controller.logger.Warn(fmt.Sprintf("failed to unmarshal received resize message '%s'", resizeMessage), zap.ByteString("resizeMessage", resizeMessage), zap.Error(err))
return
}
controller.logger.Info("resizing tty ", zap.Uint16("rows", ttySize.Rows), zap.Uint16("cols", ttySize.Cols))
if err := pty.Setsize(controller.tty, &pty.Winsize{
Rows: ttySize.Rows,
Cols: ttySize.Cols,
}); err != nil {
controller.logger.Warn("failed to resize tty", zap.Error(err))
}
}
5 changes: 5 additions & 0 deletions app/types/request/run_command_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package request

type RunCommandRequest struct {
Command string `json:"command"`
}
33 changes: 33 additions & 0 deletions app/utils/websocket_connection_upgrader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package utils

import (
"fmt"
"net/http"
"strings"

"github.com/gorilla/websocket"
)

func GetConnectionUpgrader(
allowedHostnames []string,
maxBufferSizeBytes int,
) websocket.Upgrader {
return websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
requesterHostname := r.Host
if strings.Index(requesterHostname, ":") != -1 {
requesterHostname = strings.Split(requesterHostname, ":")[0]
}
for _, allowedHostname := range allowedHostnames {
if strings.HasSuffix(requesterHostname, allowedHostname) {
return true
}
}
fmt.Printf("failed to find '%s' in the list of allowed hostnames ('%s')\n", requesterHostname)
return false
},
HandshakeTimeout: 0,
ReadBufferSize: maxBufferSizeBytes,
WriteBufferSize: maxBufferSizeBytes,
}
}
Loading