From e4a9b3d9d80446cf059052ecf0303a91a5b2fa7a Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Fri, 16 Jan 2026 22:44:06 +0100 Subject: [PATCH 01/11] chore: started cli --- go.mod | 2 - go.sum | 6 --- lib/cli/cli.go | 130 +++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 6 +++ 4 files changed, 136 insertions(+), 8 deletions(-) create mode 100644 lib/cli/cli.go diff --git a/go.mod b/go.mod index 047ac7e..6f1edb9 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/gofiber/swagger v1.1.1 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 - github.com/gorilla/sessions v1.4.0 github.com/gorilla/websocket v1.5.3 github.com/ledongthuc/pdf v0.0.0-20250511090121-5959a4027728 github.com/lib/pq v1.10.9 @@ -75,7 +74,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/gorilla/securecookie v1.1.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect diff --git a/go.sum b/go.sum index bcd298f..246a9d6 100644 --- a/go.sum +++ b/go.sum @@ -151,18 +151,12 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= -github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c= -github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA= -github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= -github.com/gorilla/sessions v1.4.0 h1:kpIYOp/oi6MG/p5PgxApU8srsSw9tuFbt46Lt7auzqQ= -github.com/gorilla/sessions v1.4.0/go.mod h1:FLWm50oby91+hl7p/wRxDth9bWSuk0qVL2emc7lT5ik= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= diff --git a/lib/cli/cli.go b/lib/cli/cli.go new file mode 100644 index 0000000..4a7f91c --- /dev/null +++ b/lib/cli/cli.go @@ -0,0 +1,130 @@ +package cli + +import ( + "fmt" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/ether/etherpad-go/lib/utils" + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +// Dummy Pad struct und Methoden für Demo-Zwecke +// In echter Implementierung: WebSocket/HTTP-Client für Etherpad + +type Pad struct { + host string + padId string + atext string +} + +type PadState struct { + Host string + Path string + PadId string +} + +func connect(host string) *Pad { + + padState := PadState{} + + if host == "" { + padState.Host = "http://127.0.0.1:9001" + padState.Path = "/p/test" + padState.PadId = utils.RandomString(10) + } else { + parsedUrl, err := url.Parse(host) + if err != nil { + fmt.Println("Invalid host URL:", err) + os.Exit(1) + } + padState.Host = fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Host) + const padIdParam = "/p/" + indexOfPadId := strings.Index(parsedUrl.Path, padIdParam) + if indexOfPadId == -1 { + padState.Path = "" + padState.PadId = utils.RandomString(10) + } else { + padState.Path = parsedUrl.Path[0:indexOfPadId] + padState.PadId = parsedUrl.Path[indexOfPadId+len(padIdParam):] + } + } + + httpClient := &http.Client{} + resp, err := httpClient.Get(fmt.Sprintf("%s%s/p/%s", padState.Host, padState.Path, padState.PadId)) + if err != nil || resp.StatusCode != http.StatusOK { + fmt.Printf("Failed to connect to pad at %s%s/p/%s\n", padState.Host, padState.Path, padState.PadId) + os.Exit(1) + } + + websocket.DefaultDialer.Dial() +} + +func (p *Pad) OnConnected(callback func(padState *Pad)) { + // Simuliere initialen Verbindungsaufbau + callback(p) +} + +func (p *Pad) OnNewContents(callback func(atext string)) { + // Simuliere neue Inhalte (hier: alle 3 Sekunden Dummy-Update) + for i := 0; i < 3; i++ { + time.Sleep(3 * time.Second) + p.atext = fmt.Sprintf("Demo Pad Inhalt Update %d", i+1) + callback(p.atext) + } +} + +func (p *Pad) Append(s string) { + p.atext += s +} + +func StartCLI(logger *zap.SugaredLogger) { + args := os.Args + if len(args) < 3 { + fmt.Println("No host specified..") + fmt.Println("Stream Pad to CLI: etherpad http://127.0.0.1:9001/p/test") + fmt.Println("Append contents to pad: etherpad http://127.0.0.1:9001/p/test -a 'hello world'") + os.Exit(1) + } + + host := args[2] + action := "" + if len(args) > 3 { + action = args[3] + } + str := "" + if len(args) > 4 { + str = args[4] + } + + if host != "" { + if action == "" { + pad := connect(host) + pad.OnConnected(func(padState *Pad) { + fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) + fmt.Print("\u001b[2J\u001b[0;0H") + fmt.Println("Pad Contents", "\n"+padState.atext) + }) + pad.OnNewContents(func(atext string) { + fmt.Print("\u001b[2J\u001b[0;0H") + fmt.Println("Pad Contents", "\n"+atext) + }) + } + if action == "-a" { + if str == "" { + fmt.Println("No string specified with pad") + os.Exit(1) + } + pad := connect(host) + pad.OnConnected(func(_ *Pad) { + pad.Append(str) + fmt.Printf("Appended %q to %s\n", str, host) + os.Exit(0) + }) + } + } +} diff --git a/main.go b/main.go index fb64432..a7e7dbb 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "github.com/ether/etherpad-go/lib" api2 "github.com/ether/etherpad-go/lib/api" "github.com/ether/etherpad-go/lib/author" + "github.com/ether/etherpad-go/lib/cli" "github.com/ether/etherpad-go/lib/hooks" "github.com/ether/etherpad-go/lib/pad" "github.com/ether/etherpad-go/lib/plugins" @@ -41,6 +42,11 @@ var uiAssets embed.FS func main() { setupLogger := utils.SetupLogger() defer setupLogger.Sync() + if len(os.Args) > 1 && os.Args[1] == "cli" { + cli.StartCLI(setupLogger) + return + } + settings2.InitSettings(setupLogger) var settings = settings2.Displayed From 6eb577a6e0f8c2e68d640b924d8f02b11d942e80 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 00:02:33 +0100 Subject: [PATCH 02/11] chore: added streaming via cli --- lib/cli/cli.go | 395 ++++++++++++++++++++++++++++++++--- lib/models/ws/clientReady.go | 30 +-- lib/models/ws/userChange.go | 30 +-- lib/utils/logger.go | 10 +- 4 files changed, 411 insertions(+), 54 deletions(-) diff --git a/lib/cli/cli.go b/lib/cli/cli.go index 4a7f91c..e121476 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -1,34 +1,113 @@ package cli import ( + "bytes" + "encoding/json" "fmt" "net/http" "net/url" "os" + "strconv" "strings" - "time" + "github.com/ether/etherpad-go/lib/apool" + "github.com/ether/etherpad-go/lib/changeset" + "github.com/ether/etherpad-go/lib/models/ws" "github.com/ether/etherpad-go/lib/utils" "github.com/gorilla/websocket" "go.uber.org/zap" ) -// Dummy Pad struct und Methoden für Demo-Zwecke -// In echter Implementierung: WebSocket/HTTP-Client für Etherpad - type Pad struct { - host string - padId string - atext string + host string + padId string + apool *apool.APool + baseRev int + atext *apool.AText + conn *websocket.Conn + events map[string][]func(interface{}) + closeChan chan struct{} + inFlight *PadChangeset + outgoing *PadChangeset +} + +// NewPad erstellt ein neues Pad-Objekt +func NewPad(host, padId string, conn *websocket.Conn) *Pad { + return &Pad{ + host: host, + padId: padId, + conn: conn, + events: make(map[string][]func(interface{})), + closeChan: make(chan struct{}), + } +} + +// On registriert einen Event-Handler +func (p *Pad) On(event string, handler func(interface{})) { + p.events[event] = append(p.events[event], handler) +} + +// emit löst ein Event aus +func (p *Pad) emit(event string, data interface{}) { + for _, handler := range p.events[event] { + go handler(data) + } +} + +// Close schließt die Verbindung +func (p *Pad) Close() { + close(p.closeChan) + if p.conn != nil { + _ = p.conn.Close() // Fehler ignorieren + } + p.emit("disconnect", nil) } +// Append sendet einen Text-Append (Dummy, Changeset-Logik fehlt) +func (p *Pad) Append(text string) { + newChangeset, err := changeset.MakeSplice(p.atext.Text, len(p.atext.Text), 0, text, nil, nil) + if err != nil { + fmt.Printf("Error creating changeset: %v\n", err) + return + } + newRev := p.baseRev + + p.atext, err = changeset.ApplyToAText(newChangeset, *p.atext, *p.apool) + if err != nil { + fmt.Printf("Error applying changeset: %v\n", err) + return + } + tempPool := apool.NewAPool() + wireApool := tempPool.ToJsonable() + + err = p.conn.WriteJSON(ws.UserChange{ + Event: "message", + Data: ws.UserChangeData{ + Component: "pad", + Type: "USER_CHANGES", + Data: ws.UserChangeDataData{ + Apool: struct { + NumToAttrib map[int][]string `json:"numToAttrib"` + NextNum int `json:"nextNum"` + }{NumToAttrib: wireApool.NumToAttribRaw, NextNum: wireApool.NextNum}, + BaseRev: newRev, + Changeset: newChangeset, + }, + }, + }) + +} + +// Dummy Pad struct und Methoden für Demo-Zwecke +// In echter Implementierung: WebSocket/HTTP-Client für Etherpad + type PadState struct { Host string Path string PadId string } -func connect(host string) *Pad { +func connect(host string, logger *zap.SugaredLogger) *Pad { padState := PadState{} @@ -60,26 +139,282 @@ func connect(host string) *Pad { fmt.Printf("Failed to connect to pad at %s%s/p/%s\n", padState.Host, padState.Path, padState.PadId) os.Exit(1) } + defer func() { + _ = resp.Body.Close() // Fehler ignorieren + }() + + wsUrl := fmt.Sprintf("%s/%ssocket.io", strings.Replace(padState.Host, "http", "ws", 1), padState.Path) + fmt.Printf("Connecting to WebSocket at %s\n", wsUrl) + connection, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + if err != nil { + fmt.Printf("WebSocket connection failed: %v\n", err) + os.Exit(1) + } + + var authorToken = "t." + utils.RandomString(20) + + pad := NewPad(padState.Host, padState.PadId, connection) + + go func() { + defer pad.Close() + var ( + newline = []byte{'\n'} + space = []byte{' '} + ) + for { + select { + case <-pad.closeChan: + return + default: + _, message, err := connection.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logger.Errorf("error: %v", err) + } + pad.emit("disconnect", err) + return + } + message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) + logger.Debugf("Received: %s", message) + + var arr []interface{} + isArrayFormat := false + if err := json.Unmarshal(message, &arr); err == nil && len(arr) == 2 { + isArrayFormat = true + } + + if isArrayFormat { + msgType, _ := arr[0].(string) + if msgType != "message" { + continue + } + msgObj := arr[1] + msgMap, ok := msgObj.(map[string]interface{}) + if !ok { + logger.Errorf("Fehler beim Casten der Nachricht zu map[string]interface{} (Array-Format)") + continue + } + typeStr, _ := msgMap["type"].(string) + switch typeStr { + case "CLIENT_VARS": + data, ok := msgMap["data"].(map[string]interface{}) + if !ok { + logger.Errorf("CLIENT_VARS: Data fehlt oder hat falschen Typ (Array-Format)") + continue + } + collabVars, ok := data["collab_client_vars"].(map[string]interface{}) + if !ok { + logger.Errorf("CLIENT_VARS: collab_client_vars fehlt oder hat falschen Typ (Array-Format)") + continue + } + initText, _ := collabVars["initialAttributedText"].(map[string]interface{}) + atext := apool.AText{ + Text: initText["text"].(string), + Attribs: initText["attribs"].(string), + } + apoolMap, _ := collabVars["apool"].(map[string]interface{}) + pool := apool.NewAPool() + if numToAttrib, ok := apoolMap["numToAttrib"].(map[string]interface{}); ok { + for k, v := range numToAttrib { + idx, err := strconv.Atoi(k) + if err != nil { + continue + } + if arr, ok := v.([]interface{}); ok && len(arr) == 2 { + attr := apool.Attribute{ + Key: arr[0].(string), + Value: arr[1].(string), + } + pool.NumToAttrib[idx] = attr + } + } + } + if nextNum, ok := apoolMap["nextNum"].(float64); ok { + pool.NextNum = int(nextNum) + } + pad.apool = &pool + if rev, ok := collabVars["rev"].(float64); ok { + pad.baseRev = int(rev) + } + pad.atext = &atext + pad.emit("connected", nil) + case "COLLABROOM": + data, ok := msgMap["data"].(map[string]interface{}) + if !ok { + continue + } + if data["type"] == "NEW_CHANGES" { + if newRev, ok := data["newRev"].(float64); ok && int(newRev) <= pad.baseRev { + continue + } + if newRev, ok := data["newRev"].(float64); ok { + if int(newRev)-1 != pad.baseRev { + logger.Errorf("wrong incoming revision :%v/%v", int(newRev), pad.baseRev) + continue + } + } + wireApool := apool.NewAPool() + if apoolMap, ok := data["apool"].(map[string]interface{}); ok { + if numToAttrib, ok := apoolMap["numToAttrib"].(map[string]interface{}); ok { + for k, v := range numToAttrib { + idx, err := strconv.Atoi(k) + if err != nil { + continue + } + if arr, ok := v.([]interface{}); ok && len(arr) == 2 { + attr := apool.Attribute{ + Key: arr[0].(string), + Value: arr[1].(string), + } + wireApool.NumToAttrib[idx] = attr + } + } + } + if nextNum, ok := apoolMap["nextNum"].(float64); ok { + wireApool.NextNum = int(nextNum) + } + } + changesetStr, _ := data["changeset"].(string) + serverChangeset := changeset.MoveOpsToNewPool(changesetStr, &wireApool, pad.apool) + server := &PadChangeset{changeset: serverChangeset} + if pad.inFlight != nil { + transformX(pad.inFlight, server, pad.apool) + } + if pad.outgoing != nil { + transformX(pad.outgoing, server, pad.apool) + if newRev, ok := data["newRev"].(float64); ok { + pad.outgoing.baseRev = int(newRev) + } + } + atext, err := changeset.ApplyToAText(server.changeset, *pad.atext, *pad.apool) + if err != nil { + logger.Errorf("Fehler beim Anwenden des Changesets: %v", err) + continue + } + pad.atext = atext + if newRev, ok := data["newRev"].(float64); ok { + pad.baseRev = int(newRev) + } + pad.emit("newContents", atext) + } + if data["type"] == "ACCEPT_COMMIT" { + if newRev, ok := data["newRev"].(float64); ok && int(newRev) <= pad.baseRev { + continue + } + if newRev, ok := data["newRev"].(float64); ok { + if int(newRev)-1 != pad.baseRev { + logger.Errorf("wrong incoming revision :%v/%v", int(newRev), pad.baseRev) + continue + } + pad.baseRev = int(newRev) + pad.inFlight = nil + if pad.outgoing != nil { + pad.outgoing.baseRev = int(newRev) + } + pad.sendMessage(nil) + } + } + } + } + var obj map[string]interface{} + if err := json.Unmarshal(message, &obj); err == nil { + event, _ := obj["event"].(string) + if event == "message" { + data, ok := obj["data"].(map[string]interface{}) + if ok { + typeStr, _ := data["type"].(string) + if typeStr == "CLIENT_READY" { + // CLIENT_READY-Event: einfach als connected behandeln + pad.emit("connected", nil) + } + } + } + } + } + } + }() + + if err := connection.WriteJSON(ws.ClientReady{ + Event: "message", + Data: ws.ClientReadyData{ + Component: "pad", + Type: "CLIENT_READY", + PadID: padState.PadId, + Token: authorToken, + UserInfo: ws.ClientReadyUserInfo{ + ColorId: nil, + Name: nil, + }, + }, + }); err != nil { + logger.Errorf("Fehler beim Senden von CLIENT_READY: %v", err) + } - websocket.DefaultDialer.Dial() + return pad } -func (p *Pad) OnConnected(callback func(padState *Pad)) { - // Simuliere initialen Verbindungsaufbau - callback(p) +func transformX(client, server *PadChangeset, pool *apool.APool) { + if cs, err := changeset.Follow(server.changeset, client.changeset, false, pool); err == nil && cs != nil { + client.changeset = *cs + } + if cs, err := changeset.Follow(client.changeset, server.changeset, true, pool); err == nil && cs != nil { + server.changeset = *cs + } } -func (p *Pad) OnNewContents(callback func(atext string)) { - // Simuliere neue Inhalte (hier: alle 3 Sekunden Dummy-Update) - for i := 0; i < 3; i++ { - time.Sleep(3 * time.Second) - p.atext = fmt.Sprintf("Demo Pad Inhalt Update %d", i+1) - callback(p.atext) +type PadChangeset struct { + changeset string + baseRev int +} + +// sendMessage-Logik wie im JS +func (p *Pad) sendMessage(optMsg *PadChangeset) { + if optMsg != nil { + if p.outgoing != nil { + if optMsg.baseRev != p.outgoing.baseRev { + return // oder Fehlerbehandlung + } + if cs, err := changeset.Compose(p.outgoing.changeset, optMsg.changeset, p.apool); err == nil && cs != nil { + p.outgoing.changeset = *cs + } + } else { + p.outgoing = optMsg + } } + if p.inFlight == nil && p.outgoing != nil { + p.inFlight = p.outgoing + p.outgoing = nil + // Sende COLLABROOM-Nachricht + msg := map[string]interface{}{ + "type": "COLLABROOM", + "component": "pad", + "data": map[string]interface{}{ + "type": "USER_CHANGES", + "baseRev": p.inFlight.baseRev, + "changeset": p.inFlight.changeset, + "apool": p.apool.ToJsonable(), + }, + } + _ = p.conn.WriteJSON(msg) + } +} + +func (p *Pad) OnConnected(callback func(padState *Pad)) { + p.On("connected", func(data interface{}) { + callback(p) + }) } -func (p *Pad) Append(s string) { - p.atext += s +func (p *Pad) OnNewContents(callback func(atext apool.AText)) { + p.On("newContents", func(data interface{}) { + if atext, ok := data.(*apool.AText); ok { + if atext != nil { + callback(*atext) + } + } else { + println("OnNewContents: invalid data type received") + } + }) } func StartCLI(logger *zap.SugaredLogger) { @@ -103,23 +438,31 @@ func StartCLI(logger *zap.SugaredLogger) { if host != "" { if action == "" { - pad := connect(host) + pad := connect(host, logger) pad.OnConnected(func(padState *Pad) { fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) fmt.Print("\u001b[2J\u001b[0;0H") - fmt.Println("Pad Contents", "\n"+padState.atext) + if padState.atext != nil { + fmt.Println("Pad Contents", "\n"+padState.atext.Text) + } }) - pad.OnNewContents(func(atext string) { + pad.OnNewContents(func(atext apool.AText) { fmt.Print("\u001b[2J\u001b[0;0H") - fmt.Println("Pad Contents", "\n"+atext) + fmt.Println("Pad Contents", "\n"+atext.Text) }) + + done := make(chan struct{}) + pad.On("disconnect", func(_ interface{}) { + close(done) + }) + <-done } if action == "-a" { if str == "" { fmt.Println("No string specified with pad") os.Exit(1) } - pad := connect(host) + pad := connect(host, logger) pad.OnConnected(func(_ *Pad) { pad.Append(str) fmt.Printf("Appended %q to %s\n", str, host) @@ -127,4 +470,6 @@ func StartCLI(logger *zap.SugaredLogger) { }) } } + + logger.Infof("Stopping CLI") } diff --git a/lib/models/ws/clientReady.go b/lib/models/ws/clientReady.go index 4ca57d7..47337bd 100644 --- a/lib/models/ws/clientReady.go +++ b/lib/models/ws/clientReady.go @@ -1,17 +1,21 @@ package ws type ClientReady struct { - Event string `json:"event"` - Data struct { - Component string `json:"component"` - Type string `json:"type"` - PadID string `json:"padId"` - Token string `json:"token"` - UserInfo struct { - ColorId *string `json:"colorId"` - Name *string `json:"name"` - } `json:"userInfo"` - Reconnect *bool `json:"reconnect"` - ClientRev *int `json:"client_rev"` - } `json:"data"` + Event string `json:"event"` + Data ClientReadyData `json:"data"` +} + +type ClientReadyData struct { + Component string `json:"component"` + Type string `json:"type"` + PadID string `json:"padId"` + Token string `json:"token"` + UserInfo ClientReadyUserInfo `json:"userInfo"` + Reconnect *bool `json:"reconnect"` + ClientRev *int `json:"client_rev"` +} + +type ClientReadyUserInfo struct { + ColorId *string `json:"colorId"` + Name *string `json:"name"` } diff --git a/lib/models/ws/userChange.go b/lib/models/ws/userChange.go index 95c1bb9..d9ddd85 100644 --- a/lib/models/ws/userChange.go +++ b/lib/models/ws/userChange.go @@ -1,17 +1,21 @@ package ws type UserChange struct { - Event string `json:"event"` - Data struct { - Component string `json:"component"` - Data struct { - Apool struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - } `json:"apool"` - BaseRev int `json:"baseRev"` - Changeset string `json:"changeset"` - } `json:"data"` - Type string `json:"type"` - } `json:"data"` + Event string `json:"event"` + Data UserChangeData `json:"data"` +} + +type UserChangeData struct { + Component string `json:"component"` + Data UserChangeDataData `json:"data"` + Type string `json:"type"` +} + +type UserChangeDataData struct { + Apool struct { + NumToAttrib map[int][]string `json:"numToAttrib"` + NextNum int `json:"nextNum"` + } `json:"apool"` + BaseRev int `json:"baseRev"` + Changeset string `json:"changeset"` } diff --git a/lib/utils/logger.go b/lib/utils/logger.go index d8f1376..99b0762 100644 --- a/lib/utils/logger.go +++ b/lib/utils/logger.go @@ -1,10 +1,14 @@ package utils -import "go.uber.org/zap" +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) func SetupLogger() *zap.SugaredLogger { - logger, _ := zap.NewProduction() - logger = zap.Must(zap.NewDevelopment()) + cfg := zap.NewDevelopmentConfig() + cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel) + logger := zap.Must(cfg.Build()) sugar := logger.Sugar() return sugar From 6582c172bd35ccdce0647f2b7056671790d343c1 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 12:11:04 +0100 Subject: [PATCH 03/11] chore: started with loadtest --- lib/cli/cli.go | 35 +++- lib/loadtest/app.go | 251 ++++++++++++++++++++++++ lib/test/ws/pad_message_handler_test.go | 13 +- main.go | 51 +++++ 4 files changed, 331 insertions(+), 19 deletions(-) create mode 100644 lib/loadtest/app.go diff --git a/lib/cli/cli.go b/lib/cli/cli.go index e121476..a5d049d 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -31,7 +31,6 @@ type Pad struct { outgoing *PadChangeset } -// NewPad erstellt ein neues Pad-Objekt func NewPad(host, padId string, conn *websocket.Conn) *Pad { return &Pad{ host: host, @@ -42,19 +41,16 @@ func NewPad(host, padId string, conn *websocket.Conn) *Pad { } } -// On registriert einen Event-Handler func (p *Pad) On(event string, handler func(interface{})) { p.events[event] = append(p.events[event], handler) } -// emit löst ein Event aus func (p *Pad) emit(event string, data interface{}) { for _, handler := range p.events[event] { go handler(data) } } -// Close schließt die Verbindung func (p *Pad) Close() { close(p.closeChan) if p.conn != nil { @@ -98,15 +94,16 @@ func (p *Pad) Append(text string) { } -// Dummy Pad struct und Methoden für Demo-Zwecke -// In echter Implementierung: WebSocket/HTTP-Client für Etherpad - type PadState struct { Host string Path string PadId string } +func Connect(host string, logger *zap.SugaredLogger) *Pad { + return connect(host, logger) +} + func connect(host string, logger *zap.SugaredLogger) *Pad { padState := PadState{} @@ -212,6 +209,7 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { Text: initText["text"].(string), Attribs: initText["attribs"].(string), } + pad.emit("numConnectedUsers", collabVars["numConnectedUsers"]) apoolMap, _ := collabVars["apool"].(map[string]interface{}) pool := apool.NewAPool() if numToAttrib, ok := apoolMap["numToAttrib"].(map[string]interface{}); ok { @@ -327,6 +325,7 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { // CLIENT_READY-Event: einfach als connected behandeln pad.emit("connected", nil) } + pad.emit("message", data) } } } @@ -405,6 +404,28 @@ func (p *Pad) OnConnected(callback func(padState *Pad)) { }) } +func (p *Pad) OnNumConnectedUsers(callback func(count int)) { + p.On("numConnectedUsers", func(data interface{}) { + if count, ok := data.(float64); ok { + callback(int(count)) + } + }) +} + +func (p *Pad) OnDisconnect(callback func(err interface{})) { + p.On("disconnect", func(data interface{}) { + callback(data) + }) +} + +func (p *Pad) OnMessage(callback func(msg map[string]interface{})) { + p.On("message", func(data interface{}) { + if msg, ok := data.(map[string]interface{}); ok { + callback(msg) + } + }) +} + func (p *Pad) OnNewContents(callback func(atext apool.AText)) { p.On("newContents", func(data interface{}) { if atext, ok := data.(*apool.AText); ok { diff --git a/lib/loadtest/app.go b/lib/loadtest/app.go new file mode 100644 index 0000000..4e1f086 --- /dev/null +++ b/lib/loadtest/app.go @@ -0,0 +1,251 @@ +package loadtest + +import ( + "fmt" + "math/rand" + "net/url" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ether/etherpad-go/lib/apool" + "github.com/ether/etherpad-go/lib/cli" + "go.uber.org/zap" +) + +type Metrics struct { + ClientsConnected int64 + AuthorsConnected int64 + LurkersConnected int64 + AppendSent int64 + ErrorCount int64 + AcceptedCommit int64 + ChangeFromServer int64 + NumConnectedUsers int64 // From server + StartTime time.Time +} + +var stats Metrics +var maxPS float64 +var statsLock sync.Mutex + +func randomString() string { + const stringLength = 4 + var b strings.Builder + for i := 0; i < stringLength; i++ { + // JS: Math.random() * (300 - 1) + 1 + charNumber := rand.Intn(299) + 1 + b.WriteRune(rune(charNumber)) + } + return b.String() +} + +func randomPadName() string { + const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + const strLen = 10 + var b strings.Builder + for i := 0; i < strLen; i++ { + b.WriteByte(chars[rand.Intn(len(chars))]) + } + return b.String() +} + +func updateMetricsUI(host string) { + if os.Getenv("SILENT_METRICS") == "true" { + return + } + statsLock.Lock() + defer statsLock.Unlock() + + testDuration := time.Since(stats.StartTime) + + // Clear screen and move cursor to top-left + fmt.Print("\033[2J\033[0;0H") + fmt.Printf("Load Test Metrics -- Target Pad %s\n\n", host) + + if atomic.LoadInt64(&stats.NumConnectedUsers) > 0 { + fmt.Printf("Total Clients Connected: %d\n", atomic.LoadInt64(&stats.NumConnectedUsers)) + } + fmt.Printf("Local Clients Connected: %d\n", atomic.LoadInt64(&stats.ClientsConnected)) + fmt.Printf("Authors Connected: %d\n", atomic.LoadInt64(&stats.AuthorsConnected)) + fmt.Printf("Lurkers Connected: %d\n", atomic.LoadInt64(&stats.LurkersConnected)) + fmt.Printf("Sent Append messages: %d\n", atomic.LoadInt64(&stats.AppendSent)) + fmt.Printf("Errors: %d\n", atomic.LoadInt64(&stats.ErrorCount)) + fmt.Printf("Commits accepted by server: %d\n", atomic.LoadInt64(&stats.AcceptedCommit)) + + changesFromServer := atomic.LoadInt64(&stats.ChangeFromServer) + fmt.Printf("Commits sent from Server to Client: %d\n", changesFromServer) + + durationSec := testDuration.Seconds() + if durationSec > 0 { + currentRate := float64(changesFromServer) / durationSec // This is mean rate actually in this simple impl + fmt.Printf("Current rate per second of Commits sent from Server to Client: %.0f\n", currentRate) + fmt.Printf("Mean(per second) of # of Commits sent from Server to Client: %.0f\n", currentRate) + + if currentRate > maxPS { + maxPS = currentRate + } + fmt.Printf("Max(per second) of # of Messages (SocketIO has cap of 10k): %.0f\n", maxPS) + } + + diff := atomic.LoadInt64(&stats.AppendSent) - atomic.LoadInt64(&stats.AcceptedCommit) + if diff > 5 { + fmt.Printf("Number of commits not yet replied as ACCEPT_COMMIT from server: %d\n", diff) + } + + fmt.Printf("Seconds test has been running for: %d\n", int(durationSec)) +} + +func newAuthor(host string, logger *zap.SugaredLogger) { + pad := cli.Connect(host, logger) + + pad.OnDisconnect(func(err interface{}) { + fmt.Printf("connection error connecting to pad: %v\n", err) + os.Exit(1) + }) + + pad.OnConnected(func(p *cli.Pad) { + atomic.AddInt64(&stats.ClientsConnected, 1) + atomic.AddInt64(&stats.AuthorsConnected, 1) + updateMetricsUI(host) + + ticker := time.NewTicker(400 * time.Millisecond) + go func() { + for range ticker.C { + atomic.AddInt64(&stats.AppendSent, 1) + updateMetricsUI(host) + p.Append(randomString()) + } + }() + }) + + pad.OnNumConnectedUsers(func(count int) { + atomic.StoreInt64(&stats.NumConnectedUsers, int64(count)) + updateMetricsUI(host) + }) + + pad.OnMessage(func(msg map[string]interface{}) { + if msg["type"] == "COLLABROOM" { + if data, ok := msg["data"].(map[string]interface{}); ok { + if data["type"] == "ACCEPT_COMMIT" { + atomic.AddInt64(&stats.AcceptedCommit, 1) + } + } + } + }) + + pad.OnNewContents(func(atext apool.AText) { + atomic.AddInt64(&stats.ChangeFromServer, 1) + }) +} + +func newLurker(host string, logger *zap.SugaredLogger) { + pad := cli.Connect(host, logger) + + pad.OnDisconnect(func(err interface{}) { + fmt.Printf("connection error connecting to pad: %v\n", err) + os.Exit(1) + }) + + pad.OnConnected(func(p *cli.Pad) { + atomic.AddInt64(&stats.ClientsConnected, 1) + atomic.AddInt64(&stats.LurkersConnected, 1) + updateMetricsUI(host) + }) + + pad.OnNumConnectedUsers(func(count int) { + atomic.StoreInt64(&stats.NumConnectedUsers, int64(count)) + updateMetricsUI(host) + }) + + pad.OnNewContents(func(atext apool.AText) { + atomic.AddInt64(&stats.ChangeFromServer, 1) + }) +} + +func StartLoadTest(logger *zap.SugaredLogger, host string, numAuthors, numLurkers int, duration int, loadUntilFail bool) { + stats.StartTime = time.Now() + + if host == "" { + host = "http://127.0.0.1:9001" + } + + if !strings.Contains(host, "/p/") { + host = fmt.Sprintf("%s/p/%s", strings.TrimSuffix(host, "/"), randomPadName()) + } else { + // Ensure it's a valid URL + _, err := url.Parse(host) + if err != nil { + fmt.Printf("Invalid host: %v\n", err) + os.Exit(1) + } + } + + var endTime time.Time + if duration > 0 { + endTime = stats.StartTime.Add(time.Duration(duration) * time.Second) + } + + if numAuthors > 0 || numLurkers > 0 { + var users []string + for i := 0; i < numLurkers; i++ { + users = append(users, "l") + } + for i := 0; i < numAuthors; i++ { + users = append(users, "a") + } + + go func() { + for _, t := range users { + if t == "l" { + newLurker(host, logger) + } else { + newAuthor(host, logger) + } + time.Sleep(200 * time.Millisecond / time.Duration(len(users))) + } + }() + } else { + if duration > 0 { + fmt.Printf("Creating load for %d seconds\n", duration) + } else { + fmt.Println("Creating load until the pad server stops responding in a timely fashion") + } + + go func() { + // Loads at ratio of 3(lurkers):1(author), every 1 second it adds more. + users := []string{"a", "l", "l", "l"} + ticker := time.NewTicker(1 * time.Second) + for range ticker.C { + for _, t := range users { + if t == "l" { + newLurker(host, logger) + } else { + newAuthor(host, logger) + } + time.Sleep(200 * time.Millisecond / time.Duration(len(users))) + } + } + }() + } + + ticker := time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if !endTime.IsZero() && time.Now().After(endTime) { + fmt.Println("Test duration complete and Load Tests PASS") + // Print final stats + fmt.Printf("%+v\n", stats) + os.Exit(0) + } + + if loadUntilFail { + diff := atomic.LoadInt64(&stats.AppendSent) - atomic.LoadInt64(&stats.AcceptedCommit) + if diff > 100 { + fmt.Printf("Load test failed: too many pending commits (%d)\n", diff) + os.Exit(1) + } + } + } +} diff --git a/lib/test/ws/pad_message_handler_test.go b/lib/test/ws/pad_message_handler_test.go index 509e0b3..905ad99 100644 --- a/lib/test/ws/pad_message_handler_test.go +++ b/lib/test/ws/pad_message_handler_test.go @@ -1620,18 +1620,7 @@ func testHandleMessageUserChangeReadonly(t *testing.T, ds testutils.TestDataStor // Create USER_CHANGES message userChange := ws.UserChange{ Event: "message", - Data: struct { - Component string `json:"component"` - Data struct { - Apool struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - } `json:"apool"` - BaseRev int `json:"baseRev"` - Changeset string `json:"changeset"` - } `json:"data"` - Type string `json:"type"` - }{ + Data: ws.UserChangeData{ Component: "pad", Type: "USER_CHANGES", Data: struct { diff --git a/main.go b/main.go index a7e7dbb..8f18d64 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/ether/etherpad-go/lib/author" "github.com/ether/etherpad-go/lib/cli" "github.com/ether/etherpad-go/lib/hooks" + "github.com/ether/etherpad-go/lib/loadtest" "github.com/ether/etherpad-go/lib/pad" "github.com/ether/etherpad-go/lib/plugins" session2 "github.com/ether/etherpad-go/lib/session" @@ -46,6 +47,56 @@ func main() { cli.StartCLI(setupLogger) return } + if len(os.Args) > 1 && os.Args[1] == "loadtest" { + host := "" + if len(os.Args) > 2 { + host = os.Args[2] + } + // Basic parsing of arguments for loadtest + // In a real app we would use a flag package, but for now let's keep it simple + authors := 0 + lurkers := 0 + duration := 0 + untilFail := false + + for i := 2; i < len(os.Args); i++ { + arg := os.Args[i] + if arg == "-a" || arg == "--authors" { + if i+1 < len(os.Args) { + fmt.Sscanf(os.Args[i+1], "%d", &authors) + i++ + } + } else if arg == "-l" || arg == "--lurkers" { + if i+1 < len(os.Args) { + fmt.Sscanf(os.Args[i+1], "%d", &lurkers) + i++ + } + } else if arg == "-d" || arg == "--duration" { + if i+1 < len(os.Args) { + fmt.Sscanf(os.Args[i+1], "%d", &duration) + i++ + } + } else if arg == "--loadUntilFail" { + untilFail = true + } + } + + loadtest.StartLoadTest(setupLogger, host, authors, lurkers, duration, untilFail) + return + } + if len(os.Args) > 1 && os.Args[1] == "multiload" { + host := "" + if len(os.Args) > 2 { + host = os.Args[2] + } + maxPads := 10 + if len(os.Args) > 3 { + fmt.Sscanf(os.Args[3], "%d", &maxPads) + } + + loadtest.StartMultiLoadTest(setupLogger, host, maxPads) + return + } settings2.InitSettings(setupLogger) From a9f994ed4cbb6c0421d184e341b37e94ea479ea9 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 14:18:36 +0100 Subject: [PATCH 04/11] chore:added multi --- lib/loadtest/multi.go | 59 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 lib/loadtest/multi.go diff --git a/lib/loadtest/multi.go b/lib/loadtest/multi.go new file mode 100644 index 0000000..76786fe --- /dev/null +++ b/lib/loadtest/multi.go @@ -0,0 +1,59 @@ +package loadtest + +import ( + "fmt" + "os" + "os/exec" + "sync" + "time" + + "go.uber.org/zap" +) + +func StartMultiLoadTest(logger *zap.SugaredLogger, host string, maxPads int) { + if maxPads <= 0 { + maxPads = 10 + } + + fmt.Printf("Starting multi-pad load test: %d pads for 30 seconds each\n", maxPads) + + executable, err := os.Executable() + if err != nil { + logger.Errorf("Failed to get executable path: %v", err) + os.Exit(1) + } + + var wg sync.WaitGroup + messageCount := 0 // Simplified as in JS + + for i := 0; i < maxPads; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + // Equivalent to: node app.js -a 3 -d 30 + // We use the same binary with 'loadtest' subcommand + cmd := exec.Command(executable, "loadtest", host, "-a", "3", "-d", "30") + cmd.Env = append(os.Environ(), "SILENT_METRICS=true") + + // In JS it uses fork, here we use exec. + // We don't necessarily want all of them clearing the screen, + // but the JS version would have them all writing to the same terminal too. + // To keep it simple and similar to JS, we just run them. + + output, err := cmd.CombinedOutput() + if err != nil { + fmt.Printf("Child process %d exited with error: %v\n", id, err) + fmt.Printf("Output: %s\n", string(output)) + fmt.Println("total pads made:", id) // Approximation + fmt.Println("total messages", messageCount) + os.Exit(1) + } + }(i) + + // Small delay between starts to not overwhelm everything at once + time.Sleep(100 * time.Millisecond) + } + + wg.Wait() + fmt.Println("Multi-pad load test completed successfully") +} From fce60086d506e74285247b7cd9cac3bcef810f7c Mon Sep 17 00:00:00 2001 From: SamTV1998 Date: Sat, 17 Jan 2026 15:00:15 +0100 Subject: [PATCH 05/11] fix: added help texts for loadtest and cli --- lib/cli/cli.go | 96 ++++++++++++++++++++++--------------------- lib/loadtest/app.go | 35 ++++++++++++++++ lib/loadtest/multi.go | 12 +----- main.go | 70 +++++++++---------------------- 4 files changed, 105 insertions(+), 108 deletions(-) diff --git a/lib/cli/cli.go b/lib/cli/cli.go index a5d049d..d330daa 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -10,6 +10,8 @@ import ( "strconv" "strings" + "flag" + "github.com/ether/etherpad-go/lib/apool" "github.com/ether/etherpad-go/lib/changeset" "github.com/ether/etherpad-go/lib/models/ws" @@ -438,59 +440,59 @@ func (p *Pad) OnNewContents(callback func(atext apool.AText)) { }) } -func StartCLI(logger *zap.SugaredLogger) { - args := os.Args - if len(args) < 3 { - fmt.Println("No host specified..") - fmt.Println("Stream Pad to CLI: etherpad http://127.0.0.1:9001/p/test") - fmt.Println("Append contents to pad: etherpad http://127.0.0.1:9001/p/test -a 'hello world'") - os.Exit(1) - } +func RunFromCLI(logger *zap.SugaredLogger, args []string) { + fs := flag.NewFlagSet("cli", flag.ExitOnError) + host := fs.String("host", "", "The host of the pad (e.g. http://127.0.0.1:9001/p/test)") + appendStr := fs.String("append", "", "Append contents to pad") + fs.StringVar(appendStr, "a", "", "Append contents to pad (shorthand)") - host := args[2] - action := "" - if len(args) > 3 { - action = args[3] + // Compatibility for old positional host argument + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + *host = args[0] + args = args[1:] } - str := "" - if len(args) > 4 { - str = args[4] + + fs.Parse(args) + + if *host == "" { + fmt.Println("No host specified..") + fs.Usage() + os.Exit(1) } - if host != "" { - if action == "" { - pad := connect(host, logger) - pad.OnConnected(func(padState *Pad) { - fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) - fmt.Print("\u001b[2J\u001b[0;0H") - if padState.atext != nil { - fmt.Println("Pad Contents", "\n"+padState.atext.Text) - } - }) - pad.OnNewContents(func(atext apool.AText) { - fmt.Print("\u001b[2J\u001b[0;0H") - fmt.Println("Pad Contents", "\n"+atext.Text) - }) - - done := make(chan struct{}) - pad.On("disconnect", func(_ interface{}) { - close(done) - }) - <-done - } - if action == "-a" { - if str == "" { - fmt.Println("No string specified with pad") - os.Exit(1) + if *appendStr != "" { + pad := connect(*host, logger) + pad.OnConnected(func(_ *Pad) { + pad.Append(*appendStr) + fmt.Printf("Appended %q to %s\n", *appendStr, *host) + os.Exit(0) + }) + // Block to wait for connection/append + select {} + } else { + pad := connect(*host, logger) + pad.OnConnected(func(padState *Pad) { + fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) + fmt.Print("\u001b[2J\u001b[0;0H") + if padState.atext != nil { + fmt.Println("Pad Contents", "\n"+padState.atext.Text) } - pad := connect(host, logger) - pad.OnConnected(func(_ *Pad) { - pad.Append(str) - fmt.Printf("Appended %q to %s\n", str, host) - os.Exit(0) - }) - } + }) + pad.OnNewContents(func(atext apool.AText) { + fmt.Print("\u001b[2J\u001b[0;0H") + fmt.Println("Pad Contents", "\n"+atext.Text) + }) + + done := make(chan struct{}) + pad.On("disconnect", func(_ interface{}) { + close(done) + }) + <-done } logger.Infof("Stopping CLI") } + +func StartCLI(logger *zap.SugaredLogger) { + RunFromCLI(logger, os.Args[2:]) +} diff --git a/lib/loadtest/app.go b/lib/loadtest/app.go index 4e1f086..a932c97 100644 --- a/lib/loadtest/app.go +++ b/lib/loadtest/app.go @@ -10,11 +10,46 @@ import ( "sync/atomic" "time" + "flag" + "github.com/ether/etherpad-go/lib/apool" "github.com/ether/etherpad-go/lib/cli" "go.uber.org/zap" ) +func RunFromCLI(logger *zap.SugaredLogger, args []string) { + fs := flag.NewFlagSet("loadtest", flag.ExitOnError) + host := fs.String("host", "http://127.0.0.1:9001", "The host to test") + authors := fs.Int("authors", 0, "Number of authors") + lurkers := fs.Int("lurkers", 0, "Number of lurkers") + duration := fs.Int("duration", 0, "Duration of the test in seconds") + untilFail := fs.Bool("loadUntilFail", false, "Load until the server fails") + + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + *host = args[0] + args = args[1:] + } + + fs.Parse(args) + + StartLoadTest(logger, *host, *authors, *lurkers, *duration, *untilFail) +} + +func RunMultiFromCLI(logger *zap.SugaredLogger, args []string) { + fs := flag.NewFlagSet("multiload", flag.ExitOnError) + host := fs.String("host", "http://127.0.0.1:9001", "The host to test") + maxPads := fs.Int("maxPads", 10, "Maximum number of pads") + + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + *host = args[0] + args = args[1:] + } + + fs.Parse(args) + + StartMultiLoadTest(logger, *host, *maxPads) +} + type Metrics struct { ClientsConnected int64 AuthorsConnected int64 diff --git a/lib/loadtest/multi.go b/lib/loadtest/multi.go index 76786fe..daea556 100644 --- a/lib/loadtest/multi.go +++ b/lib/loadtest/multi.go @@ -24,22 +24,15 @@ func StartMultiLoadTest(logger *zap.SugaredLogger, host string, maxPads int) { } var wg sync.WaitGroup - messageCount := 0 // Simplified as in JS + messageCount := 0 for i := 0; i < maxPads; i++ { wg.Add(1) go func(id int) { defer wg.Done() - // Equivalent to: node app.js -a 3 -d 30 - // We use the same binary with 'loadtest' subcommand - cmd := exec.Command(executable, "loadtest", host, "-a", "3", "-d", "30") + cmd := exec.Command(executable, "loadtest", "-host", host, "-authors", "3", "-duration", "30") cmd.Env = append(os.Environ(), "SILENT_METRICS=true") - // In JS it uses fork, here we use exec. - // We don't necessarily want all of them clearing the screen, - // but the JS version would have them all writing to the same terminal too. - // To keep it simple and similar to JS, we just run them. - output, err := cmd.CombinedOutput() if err != nil { fmt.Printf("Child process %d exited with error: %v\n", id, err) @@ -50,7 +43,6 @@ func StartMultiLoadTest(logger *zap.SugaredLogger, host string, maxPads int) { } }(i) - // Small delay between starts to not overwhelm everything at once time.Sleep(100 * time.Millisecond) } diff --git a/main.go b/main.go index 8f18d64..f27785a 100644 --- a/main.go +++ b/main.go @@ -43,59 +43,27 @@ var uiAssets embed.FS func main() { setupLogger := utils.SetupLogger() defer setupLogger.Sync() - if len(os.Args) > 1 && os.Args[1] == "cli" { - cli.StartCLI(setupLogger) - return - } - if len(os.Args) > 1 && os.Args[1] == "loadtest" { - host := "" - if len(os.Args) > 2 { - host = os.Args[2] - } - // Basic parsing of arguments for loadtest - // In a real app we would use a flag package, but for now let's keep it simple - authors := 0 - lurkers := 0 - duration := 0 - untilFail := false - - for i := 2; i < len(os.Args); i++ { - arg := os.Args[i] - if arg == "-a" || arg == "--authors" { - if i+1 < len(os.Args) { - fmt.Sscanf(os.Args[i+1], "%d", &authors) - i++ - } - } else if arg == "-l" || arg == "--lurkers" { - if i+1 < len(os.Args) { - fmt.Sscanf(os.Args[i+1], "%d", &lurkers) - i++ - } - } else if arg == "-d" || arg == "--duration" { - if i+1 < len(os.Args) { - fmt.Sscanf(os.Args[i+1], "%d", &duration) - i++ - } - } else if arg == "--loadUntilFail" { - untilFail = true - } - } - loadtest.StartLoadTest(setupLogger, host, authors, lurkers, duration, untilFail) - return - } - if len(os.Args) > 1 && os.Args[1] == "multiload" { - host := "" - if len(os.Args) > 2 { - host = os.Args[2] - } - maxPads := 10 - if len(os.Args) > 3 { - fmt.Sscanf(os.Args[3], "%d", &maxPads) + if len(os.Args) > 1 { + switch os.Args[1] { + case "cli": + cli.RunFromCLI(setupLogger, os.Args[2:]) + return + case "loadtest": + loadtest.RunFromCLI(setupLogger, os.Args[2:]) + return + case "multiload": + loadtest.RunMultiFromCLI(setupLogger, os.Args[2:]) + return + case "-h", "--help", "help": + fmt.Println("Usage: etherpad [command] [options]") + fmt.Println("Commands:") + fmt.Println(" cli Interactive CLI for pads") + fmt.Println(" loadtest Run a load test on a single pad") + fmt.Println(" multiload Run a multi-pad load test") + fmt.Println(" (none) Start the Etherpad server") + return } - - loadtest.StartMultiLoadTest(setupLogger, host, maxPads) - return } settings2.InitSettings(setupLogger) From 80d9c78317e5c8e97d5664b6322f614cd2ae1102 Mon Sep 17 00:00:00 2001 From: SamTV1998 Date: Sat, 17 Jan 2026 15:27:39 +0100 Subject: [PATCH 06/11] fix: experimental stuff --- go.mod | 3 + go.sum | 6 ++ lib/cli/cli.go | 105 +++++++++++++++++------- lib/cli/cli_test.go | 55 +++++++++++++ lib/loadtest/app.go | 36 +++++++-- lib/loadtest/app_test.go | 120 ++++++++++++++++++++++++++++ lib/locales/locales.go | 11 ++- lib/test/integration_test.go | 150 +++++++++++++++++++++++++++++++++++ lib/ws/client.go | 39 +++++++++ 9 files changed, 485 insertions(+), 40 deletions(-) create mode 100644 lib/cli/cli_test.go create mode 100644 lib/loadtest/app_test.go create mode 100644 lib/test/integration_test.go diff --git a/go.mod b/go.mod index 6f1edb9..8f3d416 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect + github.com/fasthttp/websocket v1.5.3 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.12 // indirect @@ -71,6 +72,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/gobuffalo/pop/v6 v6.1.1 // indirect + github.com/gofiber/websocket/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -116,6 +118,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/seatgeek/logrus-gelf-formatter v0.0.0-20210414080842-5b05eb8ff761 // indirect github.com/shirou/gopsutil/v4 v4.25.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/go.sum b/go.sum index 246a9d6..1472026 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0o github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/evanw/esbuild v0.27.2 h1:3xBEws9y/JosfewXMM2qIyHAi+xRo8hVx475hVkJfNg= github.com/evanw/esbuild v0.27.2/go.mod h1:D2vIQZqV/vIf/VRHtViaUtViZmG7o+kKmlBfVQuRi48= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= @@ -138,6 +140,8 @@ github.com/gofiber/fiber/v2 v2.52.10 h1:jRHROi2BuNti6NYXmZ6gbNSfT3zj/8c0xy94GOU5 github.com/gofiber/fiber/v2 v2.52.10/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/gofiber/swagger v1.1.1 h1:FZVhVQQ9s1ZKLHL/O0loLh49bYB5l1HEAgxDlcTtkRA= github.com/gofiber/swagger v1.1.1/go.mod h1:vtvY/sQAMc/lGTUCg0lqmBL7Ht9O7uzChpbvJeJQINw= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.3.1+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -364,6 +368,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/seatgeek/logrus-gelf-formatter v0.0.0-20210414080842-5b05eb8ff761 h1:0b8DF5kR0PhRoRXDiEEdzrgBc8UqVY4JWLkQJCRsLME= github.com/seatgeek/logrus-gelf-formatter v0.0.0-20210414080842-5b05eb8ff761/go.mod h1:/THDZYi7F/BsVEcYzYPqdcWFQ+1C2InkawTKfLOAnzg= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= diff --git a/lib/cli/cli.go b/lib/cli/cli.go index d330daa..f96e10c 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -9,8 +9,11 @@ import ( "os" "strconv" "strings" + "sync" "flag" + "io" + "time" "github.com/ether/etherpad-go/lib/apool" "github.com/ether/etherpad-go/lib/changeset" @@ -29,6 +32,7 @@ type Pad struct { conn *websocket.Conn events map[string][]func(interface{}) closeChan chan struct{} + closeOnce sync.Once inFlight *PadChangeset outgoing *PadChangeset } @@ -54,11 +58,13 @@ func (p *Pad) emit(event string, data interface{}) { } func (p *Pad) Close() { - close(p.closeChan) - if p.conn != nil { - _ = p.conn.Close() // Fehler ignorieren - } - p.emit("disconnect", nil) + p.closeOnce.Do(func() { + close(p.closeChan) + if p.conn != nil { + _ = p.conn.Close() // Fehler ignorieren + } + p.emit("disconnect", nil) + }) } // Append sendet einen Text-Append (Dummy, Changeset-Logik fehlt) @@ -133,9 +139,16 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { } httpClient := &http.Client{} - resp, err := httpClient.Get(fmt.Sprintf("%s%s/p/%s", padState.Host, padState.Path, padState.PadId)) - if err != nil || resp.StatusCode != http.StatusOK { - fmt.Printf("Failed to connect to pad at %s%s/p/%s\n", padState.Host, padState.Path, padState.PadId) + fullUrl := fmt.Sprintf("%s%s/p/%s", padState.Host, padState.Path, padState.PadId) + fmt.Printf("Getting Pad at %s\n", fullUrl) + resp, err := httpClient.Get(fullUrl) + if err != nil { + fmt.Printf("Failed to connect to pad at %s: %v\n", fullUrl, err) + os.Exit(1) + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + fmt.Printf("Failed to connect to pad at %s, status: %s, body: %s\n", fullUrl, resp.Status, string(body)) os.Exit(1) } defer func() { @@ -144,9 +157,12 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { wsUrl := fmt.Sprintf("%s/%ssocket.io", strings.Replace(padState.Host, "http", "ws", 1), padState.Path) fmt.Printf("Connecting to WebSocket at %s\n", wsUrl) - connection, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + connection, resp, err := websocket.DefaultDialer.Dial(wsUrl, nil) if err != nil { fmt.Printf("WebSocket connection failed: %v\n", err) + if resp != nil { + fmt.Printf("Response Status: %s\n", resp.Status) + } os.Exit(1) } @@ -441,36 +457,49 @@ func (p *Pad) OnNewContents(callback func(atext apool.AText)) { } func RunFromCLI(logger *zap.SugaredLogger, args []string) { - fs := flag.NewFlagSet("cli", flag.ExitOnError) - host := fs.String("host", "", "The host of the pad (e.g. http://127.0.0.1:9001/p/test)") - appendStr := fs.String("append", "", "Append contents to pad") - fs.StringVar(appendStr, "a", "", "Append contents to pad (shorthand)") - - // Compatibility for old positional host argument - if len(args) > 0 && !strings.HasPrefix(args[0], "-") { - *host = args[0] - args = args[1:] + host, appendStr, err := parseCLIArgs(args) + if err != nil { + return } - fs.Parse(args) - - if *host == "" { + if host == "" { fmt.Println("No host specified..") - fs.Usage() - os.Exit(1) + return } - if *appendStr != "" { - pad := connect(*host, logger) + if appendStr != "" { + pad := connect(host, logger) pad.OnConnected(func(_ *Pad) { - pad.Append(*appendStr) - fmt.Printf("Appended %q to %s\n", *appendStr, *host) - os.Exit(0) + fmt.Println("CLI Connected, appending...") + pad.Append(appendStr) + fmt.Printf("Appended %q to %s\n", appendStr, host) + // Don't os.Exit(0) in tests, use a channel to signal completion + if os.Getenv("GO_TEST_MODE") == "true" { + pad.emit("append_done", nil) + } else { + os.Exit(0) + } }) // Block to wait for connection/append - select {} + if os.Getenv("GO_TEST_MODE") == "true" { + done := make(chan struct{}) + pad.On("append_done", func(_ interface{}) { + close(done) + }) + select { + case <-done: + pad.Close() + return + case <-time.After(10 * time.Second): + fmt.Println("Append timeout") + pad.Close() + return + } + } else { + select {} + } } else { - pad := connect(*host, logger) + pad := connect(host, logger) pad.OnConnected(func(padState *Pad) { fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) fmt.Print("\u001b[2J\u001b[0;0H") @@ -493,6 +522,22 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { logger.Infof("Stopping CLI") } +func parseCLIArgs(args []string) (string, string, error) { + fs := flag.NewFlagSet("cli", flag.ContinueOnError) + host := fs.String("host", "", "The host of the pad (e.g. http://127.0.0.1:9001/p/test)") + appendStr := fs.String("append", "", "Append contents to pad") + fs.StringVar(appendStr, "a", "", "Append contents to pad (shorthand)") + + // Compatibility for old positional host argument + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + *host = args[0] + args = args[1:] + } + + err := fs.Parse(args) + return *host, *appendStr, err +} + func StartCLI(logger *zap.SugaredLogger) { RunFromCLI(logger, os.Args[2:]) } diff --git a/lib/cli/cli_test.go b/lib/cli/cli_test.go new file mode 100644 index 0000000..05adaf3 --- /dev/null +++ b/lib/cli/cli_test.go @@ -0,0 +1,55 @@ +package cli + +import ( + "testing" +) + +func TestParseCLIArgs(t *testing.T) { + tests := []struct { + name string + args []string + wantHost string + wantAppend string + }{ + { + name: "no arguments", + args: []string{}, + wantHost: "", + wantAppend: "", + }, + { + name: "positional host", + args: []string{"http://test.com"}, + wantHost: "http://test.com", + wantAppend: "", + }, + { + name: "explicit flags", + args: []string{"-host", "http://test.com", "-append", "hello"}, + wantHost: "http://test.com", + wantAppend: "hello", + }, + { + name: "shorthand append", + args: []string{"http://test.com", "-a", "world"}, + wantHost: "http://test.com", + wantAppend: "world", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, appendStr, err := parseCLIArgs(tt.args) + if err != nil { + t.Errorf("parseCLIArgs() error = %v", err) + return + } + if host != tt.wantHost { + t.Errorf("host = %v, want %v", host, tt.wantHost) + } + if appendStr != tt.wantAppend { + t.Errorf("appendStr = %v, want %v", appendStr, tt.wantAppend) + } + }) + } +} diff --git a/lib/loadtest/app.go b/lib/loadtest/app.go index a932c97..ac05cb8 100644 --- a/lib/loadtest/app.go +++ b/lib/loadtest/app.go @@ -18,7 +18,15 @@ import ( ) func RunFromCLI(logger *zap.SugaredLogger, args []string) { - fs := flag.NewFlagSet("loadtest", flag.ExitOnError) + host, authors, lurkers, duration, untilFail, err := parseRunArgs(args) + if err != nil { + return + } + StartLoadTest(logger, host, authors, lurkers, duration, untilFail) +} + +func parseRunArgs(args []string) (string, int, int, int, bool, error) { + fs := flag.NewFlagSet("loadtest", flag.ContinueOnError) host := fs.String("host", "http://127.0.0.1:9001", "The host to test") authors := fs.Int("authors", 0, "Number of authors") lurkers := fs.Int("lurkers", 0, "Number of lurkers") @@ -30,13 +38,20 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { args = args[1:] } - fs.Parse(args) - - StartLoadTest(logger, *host, *authors, *lurkers, *duration, *untilFail) + err := fs.Parse(args) + return *host, *authors, *lurkers, *duration, *untilFail, err } func RunMultiFromCLI(logger *zap.SugaredLogger, args []string) { - fs := flag.NewFlagSet("multiload", flag.ExitOnError) + host, maxPads, err := parseMultiRunArgs(args) + if err != nil { + return + } + StartMultiLoadTest(logger, host, maxPads) +} + +func parseMultiRunArgs(args []string) (string, int, error) { + fs := flag.NewFlagSet("multiload", flag.ContinueOnError) host := fs.String("host", "http://127.0.0.1:9001", "The host to test") maxPads := fs.Int("maxPads", 10, "Maximum number of pads") @@ -45,9 +60,8 @@ func RunMultiFromCLI(logger *zap.SugaredLogger, args []string) { args = args[1:] } - fs.Parse(args) - - StartMultiLoadTest(logger, *host, *maxPads) + err := fs.Parse(args) + return *host, *maxPads, err } type Metrics struct { @@ -272,6 +286,9 @@ func StartLoadTest(logger *zap.SugaredLogger, host string, numAuthors, numLurker fmt.Println("Test duration complete and Load Tests PASS") // Print final stats fmt.Printf("%+v\n", stats) + if os.Getenv("GO_TEST_MODE") == "true" { + return + } os.Exit(0) } @@ -279,6 +296,9 @@ func StartLoadTest(logger *zap.SugaredLogger, host string, numAuthors, numLurker diff := atomic.LoadInt64(&stats.AppendSent) - atomic.LoadInt64(&stats.AcceptedCommit) if diff > 100 { fmt.Printf("Load test failed: too many pending commits (%d)\n", diff) + if os.Getenv("GO_TEST_MODE") == "true" { + return + } os.Exit(1) } } diff --git a/lib/loadtest/app_test.go b/lib/loadtest/app_test.go new file mode 100644 index 0000000..fe7cbf1 --- /dev/null +++ b/lib/loadtest/app_test.go @@ -0,0 +1,120 @@ +package loadtest + +import ( + "testing" +) + +func TestParseRunArgs(t *testing.T) { + tests := []struct { + name string + args []string + wantHost string + wantAuthors int + wantLurkers int + wantDuration int + wantUntilFail bool + }{ + { + name: "default values", + args: []string{}, + wantHost: "http://127.0.0.1:9001", + wantAuthors: 0, + wantLurkers: 0, + wantDuration: 0, + }, + { + name: "positional host", + args: []string{"http://test.com"}, + wantHost: "http://test.com", + wantAuthors: 0, + wantLurkers: 0, + wantDuration: 0, + }, + { + name: "explicit flags", + args: []string{"-host", "http://test.com", "-authors", "5", "-lurkers", "10", "-duration", "60", "-loadUntilFail"}, + wantHost: "http://test.com", + wantAuthors: 5, + wantLurkers: 10, + wantDuration: 60, + wantUntilFail: true, + }, + { + name: "positional host and flags", + args: []string{"http://pos.com", "-authors", "3"}, + wantHost: "http://pos.com", + wantAuthors: 3, + wantLurkers: 0, + wantDuration: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, authors, lurkers, duration, untilFail, err := parseRunArgs(tt.args) + if err != nil { + t.Errorf("parseRunArgs() error = %v", err) + return + } + if host != tt.wantHost { + t.Errorf("host = %v, want %v", host, tt.wantHost) + } + if authors != tt.wantAuthors { + t.Errorf("authors = %v, want %v", authors, tt.wantAuthors) + } + if lurkers != tt.wantLurkers { + t.Errorf("lurkers = %v, want %v", lurkers, tt.wantLurkers) + } + if duration != tt.wantDuration { + t.Errorf("duration = %v, want %v", duration, tt.wantDuration) + } + if untilFail != tt.wantUntilFail { + t.Errorf("untilFail = %v, want %v", untilFail, tt.wantUntilFail) + } + }) + } +} + +func TestParseMultiRunArgs(t *testing.T) { + tests := []struct { + name string + args []string + wantHost string + wantMaxPads int + }{ + { + name: "default values", + args: []string{}, + wantHost: "http://127.0.0.1:9001", + wantMaxPads: 10, + }, + { + name: "explicit flags", + args: []string{"-host", "http://test.com", "-maxPads", "20"}, + wantHost: "http://test.com", + wantMaxPads: 20, + }, + { + name: "positional host", + args: []string{"http://pos.com", "-maxPads", "5"}, + wantHost: "http://pos.com", + wantMaxPads: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, maxPads, err := parseMultiRunArgs(tt.args) + if err != nil { + t.Errorf("parseMultiRunArgs() error = %v", err) + return + } + if host != tt.wantHost { + t.Errorf("host = %v, want %v", host, tt.wantHost) + } + if maxPads != tt.wantMaxPads { + t.Errorf("maxPads = %v, want %v", maxPads, tt.wantMaxPads) + } + }) + } +} diff --git a/lib/locales/locales.go b/lib/locales/locales.go index cf922e1..4209638 100644 --- a/lib/locales/locales.go +++ b/lib/locales/locales.go @@ -22,10 +22,17 @@ func Init(initStore *lib.InitStore) { } fileName := file.Name() Locales[strings.Replace(fileName, ".json", "", -1)] = `locales/` + fileName - content, _ := fs.ReadFile(initStore.UiAssets, "./assets/locales/en.json") + content, err := fs.ReadFile(initStore.UiAssets, "./assets/locales/en.json") + if err != nil { + initStore.Logger.Warnf("Could not read en.json: %v", err) + continue + } var enMap = make(map[string]string) - json.Unmarshal(content, &enMap) + if err := json.Unmarshal(content, &enMap); err != nil { + initStore.Logger.Warnf("Could not unmarshal en.json: %v", err) + continue + } Locales["en"] = enMap } } diff --git a/lib/test/integration_test.go b/lib/test/integration_test.go new file mode 100644 index 0000000..a8bc515 --- /dev/null +++ b/lib/test/integration_test.go @@ -0,0 +1,150 @@ +package test + +import ( + "fmt" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/ether/etherpad-go/lib" + // "github.com/ether/etherpad-go/lib/api" + "github.com/ether/etherpad-go/lib/author" + "github.com/ether/etherpad-go/lib/cli" + "github.com/ether/etherpad-go/lib/db" + "github.com/ether/etherpad-go/lib/hooks" + "github.com/ether/etherpad-go/lib/loadtest" + "github.com/ether/etherpad-go/lib/pad" + "github.com/ether/etherpad-go/lib/settings" + "github.com/ether/etherpad-go/lib/test/testutils" + "github.com/ether/etherpad-go/lib/ws" + "github.com/go-playground/validator/v10" + "github.com/gofiber/adaptor/v2" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/session" + "go.uber.org/zap" +) + +func TestIntegration(t *testing.T) { + // Setup environment for testing + os.Setenv("GO_TEST_MODE", "true") + defer os.Unsetenv("GO_TEST_MODE") + + logger := zap.NewNop().Sugar() + + // Use Memory DataStore for integration test for speed + dataStore := db.NewMemoryDataStore() + defer dataStore.Close() + + hook := hooks.NewHook() + hub := ws.NewHub() + go hub.Run() + + sessionStore := ws.NewSessionStore() + padManager := pad.NewManager(dataStore, &hook) + authorManager := author.NewManager(dataStore) + padMessageHandler := ws.NewPadMessageHandler(dataStore, &hook, padManager, &sessionStore, hub, logger) + securityManager := pad.NewSecurityManager(dataStore, &hook, padManager) + readOnlyManager := pad.NewReadOnlyManager(dataStore) + + settings.Displayed = settings.Settings{ + IP: "127.0.0.1", + Port: "3000", + SSO: &settings.SSO{ + Issuer: "http://localhost:3000", + }, + } + + app := fiber.New(fiber.Config{ + DisableStartupMessage: true, + }) + + app.Use(func(c *fiber.Ctx) error { + fmt.Printf("DEBUG Request: %s %s, Content-Type: %s\n", c.Method(), c.Path(), c.Get("Content-Type")) + err := c.Next() + if err != nil { + fmt.Printf("DEBUG Error in route: %v\n", err) + } + return err + }) + + // Setup session middleware + cookieStore := session.New(session.Config{ + KeyLookup: "cookie:express_sid", + }) + + // Setup API and WebSocket routes similar to main.go + _ = &lib.InitStore{ + C: app, + Validator: validator.New(validator.WithRequiredStructEnabled()), + PadManager: padManager, + AuthorManager: authorManager, + Hooks: &hook, + Logger: logger, + SecurityManager: securityManager, + UiAssets: testutils.GetTestAssets(), + CookieStore: cookieStore, + Handler: padMessageHandler, + Store: dataStore, + ReadOnlyManager: readOnlyManager, + RetrievedSettings: &settings.Displayed, + } + // api.InitAPI(libStore) + + app.Get("/p/:padId", func(c *fiber.Ctx) error { + padID := c.Params("padId") + fmt.Printf("Accessing pad: %s\n", padID) + // Ensure pad exists in manager + p, _ := padManager.GetPad(padID, nil, nil) + p.SetText("Initial Text\n", nil) + c.Set("Content-Type", "text/html") + return c.SendString("Pad") + }) + + app.Get("/socket.io/*", func(c *fiber.Ctx) error { + c.Locals("ctx", c) + return ws.ServeWsFiber(cookieStore, &settings.Displayed, logger, padMessageHandler)(c) + }) + + // Start test server + ts := httptest.NewServer(adaptor.FiberApp(app)) + defer ts.Close() + + settings.Displayed.SSO.Issuer = ts.URL + + t.Run("CLI_Append_Verification", func(t *testing.T) { + padID := "test-pad-" + time.Now().Format("150405") + host := fmt.Sprintf("%s/p/%s", ts.URL, padID) + appendStr := "Hello from Integration Test" + + // Run CLI Append + cli.RunFromCLI(logger, []string{"-host", host, "-append", appendStr}) + + // Verify via PadManager + p, err := padManager.GetPad(padID, nil, nil) + if err != nil { + t.Fatalf("Failed to get pad: %v", err) + } + + text := p.Text() + if !strings.Contains(text, appendStr) { + t.Errorf("Expected pad to contain %q, but got %q", appendStr, text) + } + }) + + t.Run("Loadtest_Short_Run", func(t *testing.T) { + padID := "load-test-pad" + host := fmt.Sprintf("%s/p/%s", ts.URL, padID) + + // Run a very short loadtest (1 second) + loadtest.RunFromCLI(logger, []string{"-host", host, "-authors", "1", "-duration", "1"}) + + // If it doesn't panic or hang, we consider it a success for this integration test + if _, err := padManager.GetPad(padID, nil, nil); err == nil { + // In Memory store might not have revisions if not saved properly but let's check + // We just want to see if the loadtest ran without errors + t.Logf("Loadtest finished for pad %s", padID) + } + }) +} diff --git a/lib/ws/client.go b/lib/ws/client.go index fe754ea..de0f244 100644 --- a/lib/ws/client.go +++ b/lib/ws/client.go @@ -7,6 +7,7 @@ package ws import ( "bytes" "encoding/json" + "fmt" "log" "net/http" "strings" @@ -18,6 +19,7 @@ import ( "github.com/ether/etherpad-go/lib/ws/ratelimiter" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/session" + fiberwebsocket "github.com/gofiber/websocket/v2" "go.uber.org/zap" "github.com/gorilla/websocket" @@ -26,6 +28,9 @@ import ( var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, } var ( @@ -114,7 +119,10 @@ func (c *Client) writePump() { // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *Client) readPump(retrievedSettings *settings.Settings, logger *zap.SugaredLogger) { + fmt.Printf("readPump started for SessionId: %s\n", c.SessionId) + c.Hub.Register <- c defer func() { + fmt.Printf("readPump ended for SessionId: %s\n", c.SessionId) c.Hub.Unregister <- c c.Conn.Close() }() @@ -231,6 +239,37 @@ func (c *Client) SendPadDelete() { c.SafeSend([]byte(`{"disconnect":"deleted"}`)) } +func ServeWsFiber(sessionStore *session.Store, configSettings *settings.Settings, + logger *zap.SugaredLogger, handler *PadMessageHandler) fiber.Handler { + return fiberwebsocket.New(func(c *fiberwebsocket.Conn) { + ctxObj := c.Locals("ctx") + if ctxObj == nil { + logger.Error("Ctx local not found in websocket connection") + return + } + fctx := ctxObj.(*fiber.Ctx) + store, err := sessionStore.Get(fctx) + if err != nil { + logger.Error("Error establishing socket conn session: ", err) + return + } + + client := &Client{ + Hub: handler.hub, + Conn: c, + Send: make(chan []byte, 256), + SessionId: store.ID(), + Ctx: fctx, + Handler: handler, + } + fmt.Printf("WS client created for SessionId: %s\n", store.ID()) + handler.SessionStore.initSession(store.ID()) + client.Hub.Register <- client + go client.writePump() + client.readPump(configSettings, logger) + }) +} + // ServeWs handles websocket requests from the peer. func ServeWs(w http.ResponseWriter, r *http.Request, sessionStore *session.Store, fiber *fiber.Ctx, configSettings *settings.Settings, From dd3c3f1691cfe2763590e2b12d15d2112cea1ba8 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 17:00:56 +0100 Subject: [PATCH 07/11] chore: fixed tests --- lib/locales/locales.go | 2 +- lib/session/SessionDatabase.go | 5 +- lib/test/integration_test.go | 150 --------------------------------- lib/ws/client.go | 35 -------- 4 files changed, 3 insertions(+), 189 deletions(-) delete mode 100644 lib/test/integration_test.go diff --git a/lib/locales/locales.go b/lib/locales/locales.go index 4209638..17574e5 100644 --- a/lib/locales/locales.go +++ b/lib/locales/locales.go @@ -22,7 +22,7 @@ func Init(initStore *lib.InitStore) { } fileName := file.Name() Locales[strings.Replace(fileName, ".json", "", -1)] = `locales/` + fileName - content, err := fs.ReadFile(initStore.UiAssets, "./assets/locales/en.json") + content, err := fs.ReadFile(initStore.UiAssets, "assets/locales/en.json") if err != nil { initStore.Logger.Warnf("Could not read en.json: %v", err) continue diff --git a/lib/session/SessionDatabase.go b/lib/session/SessionDatabase.go index 7113802..22eb730 100644 --- a/lib/session/SessionDatabase.go +++ b/lib/session/SessionDatabase.go @@ -1,20 +1,19 @@ package session import ( - "github.com/ether/etherpad-go/lib/db" "time" + + "github.com/ether/etherpad-go/lib/db" ) type Database struct { } func (s Database) Get(key string) ([]byte, error) { - println(key) return nil, nil } func (s Database) Set(key string, val []byte, exp time.Duration) error { - println(key, val, exp) //TODO implement me return nil } diff --git a/lib/test/integration_test.go b/lib/test/integration_test.go deleted file mode 100644 index a8bc515..0000000 --- a/lib/test/integration_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package test - -import ( - "fmt" - "net/http/httptest" - "os" - "strings" - "testing" - "time" - - "github.com/ether/etherpad-go/lib" - // "github.com/ether/etherpad-go/lib/api" - "github.com/ether/etherpad-go/lib/author" - "github.com/ether/etherpad-go/lib/cli" - "github.com/ether/etherpad-go/lib/db" - "github.com/ether/etherpad-go/lib/hooks" - "github.com/ether/etherpad-go/lib/loadtest" - "github.com/ether/etherpad-go/lib/pad" - "github.com/ether/etherpad-go/lib/settings" - "github.com/ether/etherpad-go/lib/test/testutils" - "github.com/ether/etherpad-go/lib/ws" - "github.com/go-playground/validator/v10" - "github.com/gofiber/adaptor/v2" - "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/middleware/session" - "go.uber.org/zap" -) - -func TestIntegration(t *testing.T) { - // Setup environment for testing - os.Setenv("GO_TEST_MODE", "true") - defer os.Unsetenv("GO_TEST_MODE") - - logger := zap.NewNop().Sugar() - - // Use Memory DataStore for integration test for speed - dataStore := db.NewMemoryDataStore() - defer dataStore.Close() - - hook := hooks.NewHook() - hub := ws.NewHub() - go hub.Run() - - sessionStore := ws.NewSessionStore() - padManager := pad.NewManager(dataStore, &hook) - authorManager := author.NewManager(dataStore) - padMessageHandler := ws.NewPadMessageHandler(dataStore, &hook, padManager, &sessionStore, hub, logger) - securityManager := pad.NewSecurityManager(dataStore, &hook, padManager) - readOnlyManager := pad.NewReadOnlyManager(dataStore) - - settings.Displayed = settings.Settings{ - IP: "127.0.0.1", - Port: "3000", - SSO: &settings.SSO{ - Issuer: "http://localhost:3000", - }, - } - - app := fiber.New(fiber.Config{ - DisableStartupMessage: true, - }) - - app.Use(func(c *fiber.Ctx) error { - fmt.Printf("DEBUG Request: %s %s, Content-Type: %s\n", c.Method(), c.Path(), c.Get("Content-Type")) - err := c.Next() - if err != nil { - fmt.Printf("DEBUG Error in route: %v\n", err) - } - return err - }) - - // Setup session middleware - cookieStore := session.New(session.Config{ - KeyLookup: "cookie:express_sid", - }) - - // Setup API and WebSocket routes similar to main.go - _ = &lib.InitStore{ - C: app, - Validator: validator.New(validator.WithRequiredStructEnabled()), - PadManager: padManager, - AuthorManager: authorManager, - Hooks: &hook, - Logger: logger, - SecurityManager: securityManager, - UiAssets: testutils.GetTestAssets(), - CookieStore: cookieStore, - Handler: padMessageHandler, - Store: dataStore, - ReadOnlyManager: readOnlyManager, - RetrievedSettings: &settings.Displayed, - } - // api.InitAPI(libStore) - - app.Get("/p/:padId", func(c *fiber.Ctx) error { - padID := c.Params("padId") - fmt.Printf("Accessing pad: %s\n", padID) - // Ensure pad exists in manager - p, _ := padManager.GetPad(padID, nil, nil) - p.SetText("Initial Text\n", nil) - c.Set("Content-Type", "text/html") - return c.SendString("Pad") - }) - - app.Get("/socket.io/*", func(c *fiber.Ctx) error { - c.Locals("ctx", c) - return ws.ServeWsFiber(cookieStore, &settings.Displayed, logger, padMessageHandler)(c) - }) - - // Start test server - ts := httptest.NewServer(adaptor.FiberApp(app)) - defer ts.Close() - - settings.Displayed.SSO.Issuer = ts.URL - - t.Run("CLI_Append_Verification", func(t *testing.T) { - padID := "test-pad-" + time.Now().Format("150405") - host := fmt.Sprintf("%s/p/%s", ts.URL, padID) - appendStr := "Hello from Integration Test" - - // Run CLI Append - cli.RunFromCLI(logger, []string{"-host", host, "-append", appendStr}) - - // Verify via PadManager - p, err := padManager.GetPad(padID, nil, nil) - if err != nil { - t.Fatalf("Failed to get pad: %v", err) - } - - text := p.Text() - if !strings.Contains(text, appendStr) { - t.Errorf("Expected pad to contain %q, but got %q", appendStr, text) - } - }) - - t.Run("Loadtest_Short_Run", func(t *testing.T) { - padID := "load-test-pad" - host := fmt.Sprintf("%s/p/%s", ts.URL, padID) - - // Run a very short loadtest (1 second) - loadtest.RunFromCLI(logger, []string{"-host", host, "-authors", "1", "-duration", "1"}) - - // If it doesn't panic or hang, we consider it a success for this integration test - if _, err := padManager.GetPad(padID, nil, nil); err == nil { - // In Memory store might not have revisions if not saved properly but let's check - // We just want to see if the loadtest ran without errors - t.Logf("Loadtest finished for pad %s", padID) - } - }) -} diff --git a/lib/ws/client.go b/lib/ws/client.go index de0f244..5fa5b3e 100644 --- a/lib/ws/client.go +++ b/lib/ws/client.go @@ -7,7 +7,6 @@ package ws import ( "bytes" "encoding/json" - "fmt" "log" "net/http" "strings" @@ -19,7 +18,6 @@ import ( "github.com/ether/etherpad-go/lib/ws/ratelimiter" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/session" - fiberwebsocket "github.com/gofiber/websocket/v2" "go.uber.org/zap" "github.com/gorilla/websocket" @@ -119,10 +117,8 @@ func (c *Client) writePump() { // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *Client) readPump(retrievedSettings *settings.Settings, logger *zap.SugaredLogger) { - fmt.Printf("readPump started for SessionId: %s\n", c.SessionId) c.Hub.Register <- c defer func() { - fmt.Printf("readPump ended for SessionId: %s\n", c.SessionId) c.Hub.Unregister <- c c.Conn.Close() }() @@ -239,37 +235,6 @@ func (c *Client) SendPadDelete() { c.SafeSend([]byte(`{"disconnect":"deleted"}`)) } -func ServeWsFiber(sessionStore *session.Store, configSettings *settings.Settings, - logger *zap.SugaredLogger, handler *PadMessageHandler) fiber.Handler { - return fiberwebsocket.New(func(c *fiberwebsocket.Conn) { - ctxObj := c.Locals("ctx") - if ctxObj == nil { - logger.Error("Ctx local not found in websocket connection") - return - } - fctx := ctxObj.(*fiber.Ctx) - store, err := sessionStore.Get(fctx) - if err != nil { - logger.Error("Error establishing socket conn session: ", err) - return - } - - client := &Client{ - Hub: handler.hub, - Conn: c, - Send: make(chan []byte, 256), - SessionId: store.ID(), - Ctx: fctx, - Handler: handler, - } - fmt.Printf("WS client created for SessionId: %s\n", store.ID()) - handler.SessionStore.initSession(store.ID()) - client.Hub.Register <- client - go client.writePump() - client.readPump(configSettings, logger) - }) -} - // ServeWs handles websocket requests from the peer. func ServeWs(w http.ResponseWriter, r *http.Request, sessionStore *session.Store, fiber *fiber.Ctx, configSettings *settings.Settings, From 9ca4a10736b612a4114669a30186b0ea712fe224 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 22:52:11 +0100 Subject: [PATCH 08/11] chore: working append --- lib/cli/cli.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/lib/cli/cli.go b/lib/cli/cli.go index f96e10c..47c77ba 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -61,14 +61,17 @@ func (p *Pad) Close() { p.closeOnce.Do(func() { close(p.closeChan) if p.conn != nil { - _ = p.conn.Close() // Fehler ignorieren + _ = p.conn.Close() } p.emit("disconnect", nil) }) } -// Append sendet einen Text-Append (Dummy, Changeset-Logik fehlt) func (p *Pad) Append(text string) { + if text[len(text)-1] != '\n' { + text += "\n" + } + newChangeset, err := changeset.MakeSplice(p.atext.Text, len(p.atext.Text), 0, text, nil, nil) if err != nil { fmt.Printf("Error creating changeset: %v\n", err) @@ -152,7 +155,7 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { os.Exit(1) } defer func() { - _ = resp.Body.Close() // Fehler ignorieren + _ = resp.Body.Close() }() wsUrl := fmt.Sprintf("%s/%ssocket.io", strings.Replace(padState.Host, "http", "ws", 1), padState.Path) @@ -340,7 +343,6 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { if ok { typeStr, _ := data["type"].(string) if typeStr == "CLIENT_READY" { - // CLIENT_READY-Event: einfach als connected behandeln pad.emit("connected", nil) } pad.emit("message", data) @@ -384,12 +386,11 @@ type PadChangeset struct { baseRev int } -// sendMessage-Logik wie im JS func (p *Pad) sendMessage(optMsg *PadChangeset) { if optMsg != nil { if p.outgoing != nil { if optMsg.baseRev != p.outgoing.baseRev { - return // oder Fehlerbehandlung + return } if cs, err := changeset.Compose(p.outgoing.changeset, optMsg.changeset, p.apool); err == nil && cs != nil { p.outgoing.changeset = *cs @@ -401,7 +402,6 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { if p.inFlight == nil && p.outgoing != nil { p.inFlight = p.outgoing p.outgoing = nil - // Sende COLLABROOM-Nachricht msg := map[string]interface{}{ "type": "COLLABROOM", "component": "pad", @@ -473,14 +473,12 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { fmt.Println("CLI Connected, appending...") pad.Append(appendStr) fmt.Printf("Appended %q to %s\n", appendStr, host) - // Don't os.Exit(0) in tests, use a channel to signal completion if os.Getenv("GO_TEST_MODE") == "true" { pad.emit("append_done", nil) } else { os.Exit(0) } }) - // Block to wait for connection/append if os.Getenv("GO_TEST_MODE") == "true" { done := make(chan struct{}) pad.On("append_done", func(_ interface{}) { @@ -528,7 +526,6 @@ func parseCLIArgs(args []string) (string, string, error) { appendStr := fs.String("append", "", "Append contents to pad") fs.StringVar(appendStr, "a", "", "Append contents to pad (shorthand)") - // Compatibility for old positional host argument if len(args) > 0 && !strings.HasPrefix(args[0], "-") { *host = args[0] args = args[1:] @@ -537,7 +534,3 @@ func parseCLIArgs(args []string) (string, string, error) { err := fs.Parse(args) return *host, *appendStr, err } - -func StartCLI(logger *zap.SugaredLogger) { - RunFromCLI(logger, os.Args[2:]) -} From 45b123cbf8a2538e5ab2ef2a814d5fa88736ce81 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 17 Jan 2026 23:21:44 +0100 Subject: [PATCH 09/11] chore: fixed crashing of etherpad due to concurrent map write --- lib/cli/cli.go | 25 ++++++++++++++++++++++++- lib/ws/AdminMessageHandler.go | 2 ++ lib/ws/PadMessageHandler.go | 26 +++++++++++++++++--------- lib/ws/hub.go | 11 ++++++++++- 4 files changed, 53 insertions(+), 11 deletions(-) diff --git a/lib/cli/cli.go b/lib/cli/cli.go index 47c77ba..e57f478 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -30,6 +30,7 @@ type Pad struct { baseRev int atext *apool.AText conn *websocket.Conn + connWrite sync.Mutex events map[string][]func(interface{}) closeChan chan struct{} closeOnce sync.Once @@ -42,6 +43,7 @@ func NewPad(host, padId string, conn *websocket.Conn) *Pad { host: host, padId: padId, conn: conn, + connWrite: sync.Mutex{}, events: make(map[string][]func(interface{})), closeChan: make(chan struct{}), } @@ -68,7 +70,14 @@ func (p *Pad) Close() { } func (p *Pad) Append(text string) { - if text[len(text)-1] != '\n' { + if p.atext == nil || p.apool == nil { + fmt.Println("Pad ist nicht initialisiert (atext oder apool ist nil)") + return + } + + if len(text) == 0 { + text = "\n" + } else if text[len(text)-1] != '\n' { text += "\n" } @@ -87,6 +96,14 @@ func (p *Pad) Append(text string) { tempPool := apool.NewAPool() wireApool := tempPool.ToJsonable() + // Ensure websocket connection exists before attempting to write + if p.conn == nil { + fmt.Println("WebSocket connection is nil; cannot send USER_CHANGES") + return + } + + p.connWrite.Lock() + defer p.connWrite.Unlock() err = p.conn.WriteJSON(ws.UserChange{ Event: "message", Data: ws.UserChangeData{ @@ -103,6 +120,10 @@ func (p *Pad) Append(text string) { }, }) + if err != nil { + fmt.Printf("Error writing USER_CHANGES to websocket: %v\n", err) + } + } type PadState struct { @@ -412,6 +433,8 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { "apool": p.apool.ToJsonable(), }, } + p.connWrite.Lock() + defer p.connWrite.Unlock() _ = p.conn.WriteJSON(msg) } } diff --git a/lib/ws/AdminMessageHandler.go b/lib/ws/AdminMessageHandler.go index 3dd3293..33fc6e1 100644 --- a/lib/ws/AdminMessageHandler.go +++ b/lib/ws/AdminMessageHandler.go @@ -188,9 +188,11 @@ func (h AdminMessageHandler) HandleMessage(message admin.EventMessage, retrieved return } + h.hub.ClientsRWMutex.RLock() for key := range h.hub.Clients { key.SafeSend(responseBytes) } + h.hub.ClientsRWMutex.RUnlock() } case "deletePad": diff --git a/lib/ws/PadMessageHandler.go b/lib/ws/PadMessageHandler.go index 7cccd25..23111fb 100644 --- a/lib/ws/PadMessageHandler.go +++ b/lib/ws/PadMessageHandler.go @@ -8,6 +8,7 @@ import ( "regexp" "slices" "strconv" + "sync" "time" "unicode/utf8" @@ -56,6 +57,7 @@ type Task struct { type ChannelOperator struct { channels map[string]chan Task handler *PadMessageHandler + mu sync.Mutex } func NewChannelOperator(p *PadMessageHandler) ChannelOperator { @@ -66,19 +68,21 @@ func NewChannelOperator(p *PadMessageHandler) ChannelOperator { } func (c *ChannelOperator) AddToQueue(ch string, t Task) { - var _, ok = c.channels[ch] - + c.mu.Lock() + chChan, ok := c.channels[ch] if !ok { - c.channels[ch] = make(chan Task) - go func() { - for { - var incomingTask = <-c.channels[ch] + // small buffer to decouple producer from goroutine scheduling + chChan = make(chan Task, 1) + c.channels[ch] = chChan + go func(localCh chan Task) { + for incomingTask := range localCh { c.handler.handleUserChanges(incomingTask) } - }() + }(chChan) } + c.mu.Unlock() - c.channels[ch] <- t + chChan <- t } type PadMessageHandler struct { @@ -199,7 +203,7 @@ func (p *PadMessageHandler) handleUserChanges(task Task) { // and can be applied after "c". optRebasedChangeset, err := changeset.Follow(revisionPad.Changeset, rebasedChangeset, false, &retrievedPad.Pool) if err != nil { - p.Logger.Warnf("Error rebasing changeset at rev %d: %v", r, err) + p.Logger.Warnf("Error rebasing changeset at rev %d: %v for %s", r, err, retrievedPad.Id) return } rebasedChangeset = *optRebasedChangeset @@ -1347,16 +1351,19 @@ func (p *PadMessageHandler) UpdatePadClients(pad *pad2.Pad) { func (p *PadMessageHandler) GetRoomSockets(padID string) []Client { var sockets = make([]Client, 0) + p.hub.ClientsRWMutex.RLock() for k := range p.hub.Clients { sessId := p.SessionStore.getSession(k.SessionId) if sessId != nil && sessId.PadId == padID { sockets = append(sockets, *k) } } + p.hub.ClientsRWMutex.RUnlock() return sockets } func (p *PadMessageHandler) KickSessionsFromPad(padID string) { + p.hub.ClientsRWMutex.RLock() for k := range p.hub.Clients { if k == nil || k.SessionId == "" { continue @@ -1370,4 +1377,5 @@ func (p *PadMessageHandler) KickSessionsFromPad(padID string) { k.SendPadDelete() } } + p.hub.ClientsRWMutex.RUnlock() } diff --git a/lib/ws/hub.go b/lib/ws/hub.go index df63ef4..0e84bdd 100644 --- a/lib/ws/hub.go +++ b/lib/ws/hub.go @@ -1,10 +1,13 @@ package ws +import "sync" + // Hub maintains the set of active Clients and broadcasts messages to the // Clients. type Hub struct { // Registered Clients. - Clients map[*Client]bool + Clients map[*Client]bool + ClientsRWMutex sync.RWMutex // Inbound messages from the Clients. Broadcast chan []byte @@ -29,16 +32,21 @@ func (h *Hub) Run() { for { select { case client := <-h.Register: + h.ClientsRWMutex.Lock() h.Clients[client] = true + h.ClientsRWMutex.Unlock() case client := <-h.Unregister: if client == nil { continue } + h.ClientsRWMutex.Lock() if _, ok := h.Clients[client]; ok { delete(h.Clients, client) close(client.Send) } + h.ClientsRWMutex.Unlock() case message := <-h.Broadcast: + h.ClientsRWMutex.RLock() for client := range h.Clients { if client == nil { continue @@ -50,6 +58,7 @@ func (h *Hub) Run() { delete(h.Clients, client) } } + h.ClientsRWMutex.RUnlock() } } } From 06538cccd3868628853f9d401ec7183fc0f24a41 Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Mon, 19 Jan 2026 20:29:11 +0100 Subject: [PATCH 10/11] chore: working append --- lib/changeset/changeset.go | 4 + lib/cli/cli.go | 161 +++++++++++++++++++++++++----------- lib/loadtest/app.go | 14 +--- lib/models/ws/userChange.go | 15 ++-- lib/utils/stringUtils.go | 8 +- lib/ws/PadMessageHandler.go | 1 - 6 files changed, 133 insertions(+), 70 deletions(-) diff --git a/lib/changeset/changeset.go b/lib/changeset/changeset.go index 20f2b0b..f387924 100644 --- a/lib/changeset/changeset.go +++ b/lib/changeset/changeset.go @@ -134,6 +134,10 @@ func MakeSplice(orig string, start int, ndel int, ins string, attribs *string, p stringAttribs: &emptyStringAttribs, } + if attribs == nil { + attribs = &emptyStringAttribs + } + var equalOps = OpsFromText("=", utils.RuneSlice(orig, 0, start), &keepArgsToUse, nil) var deletedOps = OpsFromText("-", deleted, &keepArgsToUse, nil) var insertedOps = OpsFromText("+", ins, &KeepArgs{ diff --git a/lib/cli/cli.go b/lib/cli/cli.go index e57f478..bd785c7 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "unicode/utf8" "flag" "io" @@ -31,6 +32,7 @@ type Pad struct { atext *apool.AText conn *websocket.Conn connWrite sync.Mutex + poolLock sync.RWMutex events map[string][]func(interface{}) closeChan chan struct{} closeOnce sync.Once @@ -70,60 +72,71 @@ func (p *Pad) Close() { } func (p *Pad) Append(text string) { + // Acquire lock while we read/modify shared pad state (atext, apool, baseRev) + p.poolLock.Lock() if p.atext == nil || p.apool == nil { + p.poolLock.Unlock() fmt.Println("Pad ist nicht initialisiert (atext oder apool ist nil)") return } if len(text) == 0 { - text = "\n" - } else if text[len(text)-1] != '\n' { + fmt.Println("Kein Text zum Anhängen – Changeset wird nicht erzeugt.") + p.poolLock.Unlock() + return + } + + if text == "\n" && strings.HasSuffix(p.atext.Text, "\n") { + fmt.Println("Pad endet bereits mit Zeilenumbruch – Changeset wird nicht erzeugt.") + p.poolLock.Unlock() + return + } + + if text[len(text)-1] != '\n' { text += "\n" } - newChangeset, err := changeset.MakeSplice(p.atext.Text, len(p.atext.Text), 0, text, nil, nil) + start := utf8.RuneCountInString(p.atext.Text) + emptyAttribs := "" + newChangeset, err := changeset.MakeSplice(p.atext.Text, start, 0, text, &emptyAttribs, p.apool) if err != nil { + p.poolLock.Unlock() fmt.Printf("Error creating changeset: %v\n", err) return } - newRev := p.baseRev - p.atext, err = changeset.ApplyToAText(newChangeset, *p.atext, *p.apool) + // Unpack and repack to ensure canonical form + unpacked, err := changeset.Unpack(newChangeset) if err != nil { - fmt.Printf("Error applying changeset: %v\n", err) + p.poolLock.Unlock() + fmt.Printf("Error unpacking changeset: %v\n", err) return } - tempPool := apool.NewAPool() - wireApool := tempPool.ToJsonable() - - // Ensure websocket connection exists before attempting to write - if p.conn == nil { - fmt.Println("WebSocket connection is nil; cannot send USER_CHANGES") + newChangeset = changeset.Pack(unpacked.OldLen, unpacked.NewLen, unpacked.Ops, unpacked.CharBank) + + // Validate generated changeset header: oldLen should equal current local text length + if unpacked.OldLen != start { + p.poolLock.Unlock() + fmt.Printf("Generated changeset oldLen mismatch: expected %d got %d; not sending\n", start, unpacked.OldLen) + // emit an error event so callers/tests can react + p.emit("append_error", map[string]interface{}{"error": "oldLen_mismatch", "expected": start, "got": unpacked.OldLen}) return } - p.connWrite.Lock() - defer p.connWrite.Unlock() - err = p.conn.WriteJSON(ws.UserChange{ - Event: "message", - Data: ws.UserChangeData{ - Component: "pad", - Type: "USER_CHANGES", - Data: ws.UserChangeDataData{ - Apool: struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - }{NumToAttrib: wireApool.NumToAttribRaw, NextNum: wireApool.NextNum}, - BaseRev: newRev, - Changeset: newChangeset, - }, - }, - }) - + newAText, err := changeset.ApplyToAText(newChangeset, *p.atext, *p.apool) if err != nil { - fmt.Printf("Error writing USER_CHANGES to websocket: %v\n", err) + p.poolLock.Unlock() + fmt.Printf("Error applying changeset: %v\n", err) + return } + p.atext = newAText + baseRev := p.baseRev + p.poolLock.Unlock() + + // Queue the changeset for sending + pc := &PadChangeset{changeset: newChangeset, baseRev: baseRev} + p.sendMessage(pc) } type PadState struct { @@ -195,7 +208,16 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { pad := NewPad(padState.Host, padState.PadId, connection) go func() { - defer pad.Close() + // Recover to avoid crashing the whole process on unexpected panics in the reader loop + defer func() { + if r := recover(); r != nil { + logger.Errorf("panic in recv goroutine: %v", r) + pad.emit("disconnect", r) + _ = connection.Close() + } + pad.Close() + }() + var ( newline = []byte{'\n'} space = []byte{' '} @@ -226,6 +248,7 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { msgType, _ := arr[0].(string) if msgType != "message" { continue + } msgObj := arr[1] msgMap, ok := msgObj.(map[string]interface{}) @@ -272,11 +295,14 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { if nextNum, ok := apoolMap["nextNum"].(float64); ok { pool.NextNum = int(nextNum) } + // protect setting shared fields + pad.poolLock.Lock() pad.apool = &pool if rev, ok := collabVars["rev"].(float64); ok { pad.baseRev = int(rev) } pad.atext = &atext + pad.poolLock.Unlock() pad.emit("connected", nil) case "COLLABROOM": data, ok := msgMap["data"].(map[string]interface{}) @@ -284,6 +310,15 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { continue } if data["type"] == "NEW_CHANGES" { + // Ensure we have initial state + pad.poolLock.RLock() + havePool := pad.apool != nil + haveAText := pad.atext != nil + pad.poolLock.RUnlock() + if !havePool || !haveAText { + logger.Errorf("received NEW_CHANGES but pad.apool or pad.atext is nil - skipping") + continue + } if newRev, ok := data["newRev"].(float64); ok && int(newRev) <= pad.baseRev { continue } @@ -315,23 +350,43 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { } } changesetStr, _ := data["changeset"].(string) - serverChangeset := changeset.MoveOpsToNewPool(changesetStr, &wireApool, pad.apool) + // Re-read pad.apool and pad.atext under read lock to ensure stability + pad.poolLock.RLock() + localPool := pad.apool + localAText := pad.atext + pad.poolLock.RUnlock() + if localPool == nil || localAText == nil { + logger.Errorf("pad.apool or pad.atext became nil while processing NEW_CHANGES - skipping") + continue + } + serverChangeset := changeset.MoveOpsToNewPool(changesetStr, &wireApool, localPool) server := &PadChangeset{changeset: serverChangeset} + // Validate server changeset header before attempting to apply it + if unpacked, err := changeset.Unpack(server.changeset); err != nil { + logger.Errorf("cannot unpack server changeset: %v - skipping", err) + continue + } else if utf8.RuneCountInString(localAText.Text) != unpacked.OldLen { + logger.Errorf("server changeset oldLen %d does not match local text length %d - skipping", unpacked.OldLen, utf8.RuneCountInString(localAText.Text)) + continue + } if pad.inFlight != nil { - transformX(pad.inFlight, server, pad.apool) + transformX(pad.inFlight, server, localPool) } if pad.outgoing != nil { - transformX(pad.outgoing, server, pad.apool) + transformX(pad.outgoing, server, localPool) if newRev, ok := data["newRev"].(float64); ok { pad.outgoing.baseRev = int(newRev) } } - atext, err := changeset.ApplyToAText(server.changeset, *pad.atext, *pad.apool) + atext, err := changeset.ApplyToAText(server.changeset, *localAText, *localPool) if err != nil { logger.Errorf("Fehler beim Anwenden des Changesets: %v", err) continue } + // write back updated atext under lock + pad.poolLock.Lock() pad.atext = atext + pad.poolLock.Unlock() if newRev, ok := data["newRev"].(float64); ok { pad.baseRev = int(newRev) } @@ -411,11 +466,15 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { if optMsg != nil { if p.outgoing != nil { if optMsg.baseRev != p.outgoing.baseRev { + fmt.Println("Dropping outgoing changeset due to baseRev mismatch") return } - if cs, err := changeset.Compose(p.outgoing.changeset, optMsg.changeset, p.apool); err == nil && cs != nil { - p.outgoing.changeset = *cs + tempStr, err := changeset.Compose(p.outgoing.changeset, optMsg.changeset, p.apool) + if err != nil { + fmt.Printf("Error composing outgoing changesets: %v\n", err) + return } + p.outgoing.changeset = *tempStr } else { p.outgoing = optMsg } @@ -423,14 +482,24 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { if p.inFlight == nil && p.outgoing != nil { p.inFlight = p.outgoing p.outgoing = nil - msg := map[string]interface{}{ - "type": "COLLABROOM", - "component": "pad", - "data": map[string]interface{}{ - "type": "USER_CHANGES", - "baseRev": p.inFlight.baseRev, - "changeset": p.inFlight.changeset, - "apool": p.apool.ToJsonable(), + apoolCreated := apool.NewAPool() + changeset.MoveOpsToNewPool(p.inFlight.changeset, p.apool, &apoolCreated) + wirePool := apoolCreated.ToJsonable() + fmt.Println("Sending changeset:", p.inFlight.changeset) + msg := ws.UserChange{ + Event: "message", + Data: ws.UserChangeData{ + Type: "COLLABROOM", + Component: "pad", + Data: ws.UserChangeDataData{ + Type: "USER_CHANGES", + BaseRev: p.inFlight.baseRev, + Changeset: p.inFlight.changeset, + Apool: ws.UserChangeDataDataApool{ + NumToAttrib: wirePool.NumToAttribRaw, + NextNum: wirePool.NextNum, + }, + }, }, } p.connWrite.Lock() diff --git a/lib/loadtest/app.go b/lib/loadtest/app.go index ac05cb8..aa6e65e 100644 --- a/lib/loadtest/app.go +++ b/lib/loadtest/app.go @@ -14,6 +14,7 @@ import ( "github.com/ether/etherpad-go/lib/apool" "github.com/ether/etherpad-go/lib/cli" + "github.com/ether/etherpad-go/lib/utils" "go.uber.org/zap" ) @@ -80,17 +81,6 @@ var stats Metrics var maxPS float64 var statsLock sync.Mutex -func randomString() string { - const stringLength = 4 - var b strings.Builder - for i := 0; i < stringLength; i++ { - // JS: Math.random() * (300 - 1) + 1 - charNumber := rand.Intn(299) + 1 - b.WriteRune(rune(charNumber)) - } - return b.String() -} - func randomPadName() string { const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" const strLen = 10 @@ -165,7 +155,7 @@ func newAuthor(host string, logger *zap.SugaredLogger) { for range ticker.C { atomic.AddInt64(&stats.AppendSent, 1) updateMetricsUI(host) - p.Append(randomString()) + p.Append(utils.RandomString(10)) } }() }) diff --git a/lib/models/ws/userChange.go b/lib/models/ws/userChange.go index d9ddd85..f4f84ca 100644 --- a/lib/models/ws/userChange.go +++ b/lib/models/ws/userChange.go @@ -11,11 +11,14 @@ type UserChangeData struct { Type string `json:"type"` } +type UserChangeDataDataApool struct { + NumToAttrib map[int][]string `json:"numToAttrib"` + NextNum int `json:"nextNum"` +} + type UserChangeDataData struct { - Apool struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - } `json:"apool"` - BaseRev int `json:"baseRev"` - Changeset string `json:"changeset"` + Type string `json:"type"` + Apool UserChangeDataDataApool `json:"apool"` + BaseRev int `json:"baseRev"` + Changeset string `json:"changeset"` } diff --git a/lib/utils/stringUtils.go b/lib/utils/stringUtils.go index ac3f533..cef4a90 100644 --- a/lib/utils/stringUtils.go +++ b/lib/utils/stringUtils.go @@ -1,17 +1,15 @@ package utils import ( - randc "crypto/rand" - "encoding/hex" "math/big" "strconv" "strings" + + "github.com/ether/etherpad-go/lib/test/testutils/general" ) func RandomString(length int) string { - bytes := make([]byte, length) - randc.Read(bytes) - return hex.EncodeToString(bytes) + return general.RandomInlineString(length) } func NumToString(num int) string { diff --git a/lib/ws/PadMessageHandler.go b/lib/ws/PadMessageHandler.go index 23111fb..a07950e 100644 --- a/lib/ws/PadMessageHandler.go +++ b/lib/ws/PadMessageHandler.go @@ -222,7 +222,6 @@ func (p *PadMessageHandler) handleUserChanges(task Task) { if *oldLen != utf8.RuneCountInString(prevText) { p.Logger.Warnf("Can't apply changeset to pad text: oldLen=%d, prevTextLen=%d, baseRev=%d, headRev=%d", *oldLen, utf8.RuneCountInString(prevText), r, retrievedPad.Head) - // Don't panic - just return and let the client retry or reconnect return } From f1830e57c559caefa596a782168884d09824cdba Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Mon, 19 Jan 2026 21:22:37 +0100 Subject: [PATCH 11/11] chore: fixed with padIDS for author --- lib/cli/cli.go | 58 +++++++++++------------ lib/db/MySQLDB.go | 43 +++++++++++++---- lib/db/PostgresDB.go | 43 +++++++++++++---- lib/db/SQLiteDB.go | 46 ++++++++++++++---- lib/test/api/author/author_test.go | 12 +++-- lib/test/testutils/general/stringUtils.go | 2 +- lib/test/ws/pad_message_handler_test.go | 14 +----- 7 files changed, 145 insertions(+), 73 deletions(-) diff --git a/lib/cli/cli.go b/lib/cli/cli.go index bd785c7..6c32a43 100644 --- a/lib/cli/cli.go +++ b/lib/cli/cli.go @@ -25,6 +25,7 @@ import ( ) type Pad struct { + logger *zap.SugaredLogger host string padId string apool *apool.APool @@ -40,8 +41,9 @@ type Pad struct { outgoing *PadChangeset } -func NewPad(host, padId string, conn *websocket.Conn) *Pad { +func NewPad(host, padId string, conn *websocket.Conn, logger *zap.SugaredLogger) *Pad { return &Pad{ + logger: logger, host: host, padId: padId, conn: conn, @@ -76,18 +78,18 @@ func (p *Pad) Append(text string) { p.poolLock.Lock() if p.atext == nil || p.apool == nil { p.poolLock.Unlock() - fmt.Println("Pad ist nicht initialisiert (atext oder apool ist nil)") + p.logger.Errorf("Pad is not initialized") return } if len(text) == 0 { - fmt.Println("Kein Text zum Anhängen – Changeset wird nicht erzeugt.") + p.logger.Warnf("No text to append - changeset will not be created.") p.poolLock.Unlock() return } if text == "\n" && strings.HasSuffix(p.atext.Text, "\n") { - fmt.Println("Pad endet bereits mit Zeilenumbruch – Changeset wird nicht erzeugt.") + p.logger.Infof("Pad already ends with newline - not appending another.") p.poolLock.Unlock() return } @@ -101,7 +103,7 @@ func (p *Pad) Append(text string) { newChangeset, err := changeset.MakeSplice(p.atext.Text, start, 0, text, &emptyAttribs, p.apool) if err != nil { p.poolLock.Unlock() - fmt.Printf("Error creating changeset: %v\n", err) + p.logger.Errorf("Error creating changeset: %v", err) return } @@ -109,7 +111,7 @@ func (p *Pad) Append(text string) { unpacked, err := changeset.Unpack(newChangeset) if err != nil { p.poolLock.Unlock() - fmt.Printf("Error unpacking changeset: %v\n", err) + p.logger.Errorf("Error unpacking changeset: %v", err) return } newChangeset = changeset.Pack(unpacked.OldLen, unpacked.NewLen, unpacked.Ops, unpacked.CharBank) @@ -117,7 +119,7 @@ func (p *Pad) Append(text string) { // Validate generated changeset header: oldLen should equal current local text length if unpacked.OldLen != start { p.poolLock.Unlock() - fmt.Printf("Generated changeset oldLen mismatch: expected %d got %d; not sending\n", start, unpacked.OldLen) + p.logger.Errorf("Generated changeset oldLen mismatch: expected %d got %d; not sending", start, unpacked.OldLen) // emit an error event so callers/tests can react p.emit("append_error", map[string]interface{}{"error": "oldLen_mismatch", "expected": start, "got": unpacked.OldLen}) return @@ -126,7 +128,7 @@ func (p *Pad) Append(text string) { newAText, err := changeset.ApplyToAText(newChangeset, *p.atext, *p.apool) if err != nil { p.poolLock.Unlock() - fmt.Printf("Error applying changeset: %v\n", err) + p.logger.Errorf("Error applying changeset: %v", err) return } @@ -160,7 +162,7 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { } else { parsedUrl, err := url.Parse(host) if err != nil { - fmt.Println("Invalid host URL:", err) + logger.Warnf("Invalid host URL: %v", err) os.Exit(1) } padState.Host = fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Host) @@ -177,15 +179,15 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { httpClient := &http.Client{} fullUrl := fmt.Sprintf("%s%s/p/%s", padState.Host, padState.Path, padState.PadId) - fmt.Printf("Getting Pad at %s\n", fullUrl) + logger.Infof("Getting Pad at %s", fullUrl) resp, err := httpClient.Get(fullUrl) if err != nil { - fmt.Printf("Failed to connect to pad at %s: %v\n", fullUrl, err) + logger.Errorf("Failed to connect to pad at %s: %v", fullUrl, err) os.Exit(1) } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - fmt.Printf("Failed to connect to pad at %s, status: %s, body: %s\n", fullUrl, resp.Status, string(body)) + logger.Errorf("Failed to connect to pad at %s, status: %s, body: %s", fullUrl, resp.Status, string(body)) os.Exit(1) } defer func() { @@ -193,19 +195,19 @@ func connect(host string, logger *zap.SugaredLogger) *Pad { }() wsUrl := fmt.Sprintf("%s/%ssocket.io", strings.Replace(padState.Host, "http", "ws", 1), padState.Path) - fmt.Printf("Connecting to WebSocket at %s\n", wsUrl) + logger.Infof("Connecting to WebSocket at %s", wsUrl) connection, resp, err := websocket.DefaultDialer.Dial(wsUrl, nil) if err != nil { - fmt.Printf("WebSocket connection failed: %v\n", err) + logger.Errorf("WebSocket connection failed: %v", err) if resp != nil { - fmt.Printf("Response Status: %s\n", resp.Status) + logger.Warnf("Response Status: %s", resp.Status) } os.Exit(1) } var authorToken = "t." + utils.RandomString(20) - pad := NewPad(padState.Host, padState.PadId, connection) + pad := NewPad(padState.Host, padState.PadId, connection, logger) go func() { // Recover to avoid crashing the whole process on unexpected panics in the reader loop @@ -466,12 +468,12 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { if optMsg != nil { if p.outgoing != nil { if optMsg.baseRev != p.outgoing.baseRev { - fmt.Println("Dropping outgoing changeset due to baseRev mismatch") + p.logger.Warnf("Dropping outgoing changeset due to baseRev mismatch") return } tempStr, err := changeset.Compose(p.outgoing.changeset, optMsg.changeset, p.apool) if err != nil { - fmt.Printf("Error composing outgoing changesets: %v\n", err) + p.logger.Errorf("Error composing outgoing changesets: %v", err) return } p.outgoing.changeset = *tempStr @@ -485,7 +487,7 @@ func (p *Pad) sendMessage(optMsg *PadChangeset) { apoolCreated := apool.NewAPool() changeset.MoveOpsToNewPool(p.inFlight.changeset, p.apool, &apoolCreated) wirePool := apoolCreated.ToJsonable() - fmt.Println("Sending changeset:", p.inFlight.changeset) + p.logger.Debugf("Sending changeset: %s", p.inFlight.changeset) msg := ws.UserChange{ Event: "message", Data: ws.UserChangeData{ @@ -555,16 +557,16 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { } if host == "" { - fmt.Println("No host specified..") + logger.Warnf("No host specified..") return } if appendStr != "" { pad := connect(host, logger) pad.OnConnected(func(_ *Pad) { - fmt.Println("CLI Connected, appending...") + logger.Infof("CLI Connected, appending...") pad.Append(appendStr) - fmt.Printf("Appended %q to %s\n", appendStr, host) + logger.Infof("Appended %q to %s", appendStr, host) if os.Getenv("GO_TEST_MODE") == "true" { pad.emit("append_done", nil) } else { @@ -581,7 +583,7 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { pad.Close() return case <-time.After(10 * time.Second): - fmt.Println("Append timeout") + logger.Warnf("Append timeout") pad.Close() return } @@ -591,15 +593,11 @@ func RunFromCLI(logger *zap.SugaredLogger, args []string) { } else { pad := connect(host, logger) pad.OnConnected(func(padState *Pad) { - fmt.Printf("Connected to %s with padId %s\n", padState.host, padState.padId) - fmt.Print("\u001b[2J\u001b[0;0H") - if padState.atext != nil { - fmt.Println("Pad Contents", "\n"+padState.atext.Text) - } + logger.Infof("Connected to %s with padId %s", padState.host, padState.padId) + logger.Debugf("Pad Contents: \n%s", padState.atext.Text) }) pad.OnNewContents(func(atext apool.AText) { - fmt.Print("\u001b[2J\u001b[0;0H") - fmt.Println("Pad Contents", "\n"+atext.Text) + logger.Debugf("Pad Contents: \n%s", atext.Text) }) done := make(chan struct{}) diff --git a/lib/db/MySQLDB.go b/lib/db/MySQLDB.go index 907c0df..08ad602 100644 --- a/lib/db/MySQLDB.go +++ b/lib/db/MySQLDB.go @@ -726,10 +726,14 @@ func (d MysqlDB) SetAuthorByToken(token, authorId string) error { * @param {String} author The id of the author */ func (d MysqlDB) GetAuthor(author string) (*db.AuthorDB, error) { - - var resultedSQL, args, err = mysql.Select("*"). + var resultedSQL, args, err = mysql.Select("globalAuthor.*, padRev.id"). From("globalAuthor"). - Where(sq.Eq{"id": author}).ToSql() + LeftJoin("padRev ON globalAuthor.id = padRev.authorId"). + Where(sq.Eq{"globalAuthor.id": author}).ToSql() + + if err != nil { + return nil, err + } query, err := d.sqlDB.Query(resultedSQL, args...) if err != nil { @@ -738,14 +742,37 @@ func (d MysqlDB) GetAuthor(author string) (*db.AuthorDB, error) { defer query.Close() var authorDB *db.AuthorDB + for query.Next() { - var authorCopy db.AuthorDB - query.Scan(&authorCopy.ID, &authorCopy.ColorId, &authorCopy.Name, &authorCopy.Timestamp) - authorDB = &authorCopy - return authorDB, nil + var padID sql.NullString + + if authorDB == nil { + authorDB = &db.AuthorDB{ + PadIDs: make(map[string]struct{}), + } + err = query.Scan(&authorDB.ID, &authorDB.ColorId, &authorDB.Name, + &authorDB.Timestamp, &padID) + if err != nil { + return nil, err + } + } else { + var dummy1, dummy2, dummy3, dummy4 interface{} + err = query.Scan(&dummy1, &dummy2, &dummy3, &dummy4, &padID) + if err != nil { + return nil, err + } + } + + if padID.Valid { + authorDB.PadIDs[padID.String] = struct{}{} + } } - return nil, errors.New(AuthorNotFoundError) + if authorDB == nil { + return nil, errors.New(AuthorNotFoundError) + } + + return authorDB, nil } func (d MysqlDB) GetAuthorByToken(token string) (*string, error) { diff --git a/lib/db/PostgresDB.go b/lib/db/PostgresDB.go index b90daab..ac7a495 100644 --- a/lib/db/PostgresDB.go +++ b/lib/db/PostgresDB.go @@ -733,10 +733,14 @@ func (d PostgresDB) SetAuthorByToken(token, authorId string) error { * @param {String} author The id of the author */ func (d PostgresDB) GetAuthor(author string) (*db.AuthorDB, error) { - - var resultedSQL, args, err = psql.Select("*"). + var resultedSQL, args, err = psql.Select("globalAuthor.*, padRev.id"). From("globalAuthor"). - Where(sq.Eq{"id": author}).ToSql() + LeftJoin("padRev ON globalAuthor.id = padRev.authorId"). + Where(sq.Eq{"globalAuthor.id": author}).ToSql() + + if err != nil { + return nil, err + } query, err := d.sqlDB.Query(resultedSQL, args...) if err != nil { @@ -745,14 +749,37 @@ func (d PostgresDB) GetAuthor(author string) (*db.AuthorDB, error) { defer query.Close() var authorDB *db.AuthorDB + for query.Next() { - var authorCopy db.AuthorDB - query.Scan(&authorCopy.ID, &authorCopy.ColorId, &authorCopy.Name, &authorCopy.Timestamp) - authorDB = &authorCopy - return authorDB, nil + var padID sql.NullString + + if authorDB == nil { + authorDB = &db.AuthorDB{ + PadIDs: make(map[string]struct{}), + } + err = query.Scan(&authorDB.ID, &authorDB.ColorId, &authorDB.Name, + &authorDB.Timestamp, &padID) + if err != nil { + return nil, err + } + } else { + var dummy1, dummy2, dummy3, dummy4 interface{} + err = query.Scan(&dummy1, &dummy2, &dummy3, &dummy4, &padID) + if err != nil { + return nil, err + } + } + + if padID.Valid { + authorDB.PadIDs[padID.String] = struct{}{} + } } - return nil, errors.New(AuthorNotFoundError) + if authorDB == nil { + return nil, errors.New(AuthorNotFoundError) + } + + return authorDB, nil } func (d PostgresDB) GetAuthorByToken(token string) (*string, error) { diff --git a/lib/db/SQLiteDB.go b/lib/db/SQLiteDB.go index 80a6c54..6eb33cf 100644 --- a/lib/db/SQLiteDB.go +++ b/lib/db/SQLiteDB.go @@ -730,25 +730,53 @@ func (d SQLiteDB) SetAuthorByToken(token, authorId string) error { * @param {String} author The id of the author */ func (d SQLiteDB) GetAuthor(author string) (*db.AuthorDB, error) { - - var resultedSQL, args, err = sq.Select("*"). + var resultedSQL, args, err = sq.Select("globalAuthor.*, padRev.id"). From("globalAuthor"). - Where(sq.Eq{"id": author}).ToSql() + LeftJoin("padRev ON globalAuthor.id = padRev.authorId"). + Where(sq.Eq{"globalAuthor.id": author}).ToSql() + + if err != nil { + return nil, err + } query, err := d.sqlDB.Query(resultedSQL, args...) if err != nil { return nil, err } defer query.Close() + + var authorDB *db.AuthorDB + for query.Next() { - var authorDB *db.AuthorDB - var authorCopy db.AuthorDB - query.Scan(&authorCopy.ID, &authorCopy.ColorId, &authorCopy.Name, &authorCopy.Timestamp) - authorDB = &authorCopy - return authorDB, nil + var padID sql.NullString + + if authorDB == nil { + authorDB = &db.AuthorDB{ + PadIDs: make(map[string]struct{}), + } + err = query.Scan(&authorDB.ID, &authorDB.ColorId, &authorDB.Name, + &authorDB.Timestamp, &padID) + if err != nil { + return nil, err + } + } else { + var dummy1, dummy2, dummy3, dummy4 interface{} + err = query.Scan(&dummy1, &dummy2, &dummy3, &dummy4, &padID) + if err != nil { + return nil, err + } + } + + if padID.Valid { + authorDB.PadIDs[padID.String] = struct{}{} + } } - return nil, errors.New(AuthorNotFoundError) + if authorDB == nil { + return nil, errors.New(AuthorNotFoundError) + } + + return authorDB, nil } func (d SQLiteDB) GetAuthorByToken(token string) (*string, error) { diff --git a/lib/test/api/author/author_test.go b/lib/test/api/author/author_test.go index 830edd6..7a1f978 100644 --- a/lib/test/api/author/author_test.go +++ b/lib/test/api/author/author_test.go @@ -9,7 +9,6 @@ import ( "github.com/ether/etherpad-go/lib/api/author" "github.com/ether/etherpad-go/lib/test/testutils" - "github.com/gofiber/fiber/v2" "github.com/stretchr/testify/assert" ) @@ -108,15 +107,18 @@ func testGetExistingAuthor(t *testing.T, tsStore testutils.TestDataStore) { } func testGetAuthorPadIDS(t *testing.T, tsStore testutils.TestDataStore) { - t.Skip() - // Skip because we cannot yet map pads to authors - app := fiber.New() author.Init(tsStore.ToInitStore()) dbAuthorToSave := testutils.GenerateDBAuthor() assert.NoError(t, tsStore.DS.SaveAuthor(dbAuthorToSave)) + padText := "Hallo123\n" + _, err := tsStore.PadManager.GetPad("pad123", &padText, &dbAuthorToSave.ID) + assert.NoError(t, err) req := httptest.NewRequest("GET", "/author/"+dbAuthorToSave.ID+"/pads", nil) - resp, _ := app.Test(req, 10) + resp, err := tsStore.App.Test(req, 100) + if err != nil { + t.Errorf("error getting author pads: %v", err) + } if resp.StatusCode != 200 { t.Errorf("should return 200 for existing author pads, got %d", resp.StatusCode) } diff --git a/lib/test/testutils/general/stringUtils.go b/lib/test/testutils/general/stringUtils.go index e1c5299..c45badf 100644 --- a/lib/test/testutils/general/stringUtils.go +++ b/lib/test/testutils/general/stringUtils.go @@ -29,7 +29,7 @@ func RandomMultiline(approxMaxLines, approxMaxCols int) string { } func RandomInlineString(length int) string { - const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 !@#$%^&*()_+-=[]{}|;:,.<>?" + const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" var result strings.Builder result.Grow(length) diff --git a/lib/test/ws/pad_message_handler_test.go b/lib/test/ws/pad_message_handler_test.go index 905ad99..32d60ba 100644 --- a/lib/test/ws/pad_message_handler_test.go +++ b/lib/test/ws/pad_message_handler_test.go @@ -1623,18 +1623,8 @@ func testHandleMessageUserChangeReadonly(t *testing.T, ds testutils.TestDataStor Data: ws.UserChangeData{ Component: "pad", Type: "USER_CHANGES", - Data: struct { - Apool struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - } `json:"apool"` - BaseRev int `json:"baseRev"` - Changeset string `json:"changeset"` - }{ - Apool: struct { - NumToAttrib map[int][]string `json:"numToAttrib"` - NextNum int `json:"nextNum"` - }{ + Data: ws.UserChangeDataData{ + Apool: ws.UserChangeDataDataApool{ NumToAttrib: map[int][]string{}, NextNum: 0, },