-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.txt
403 lines (384 loc) · 11.7 KB
/
test.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
diff --git a/chatterbox-cli b/chatterbox-cli
index b7caa5f..752469b 100755
Binary files a/chatterbox-cli and b/chatterbox-cli differ
diff --git a/client/client.go b/client/client.go
index 8e895a0..8b907f1 100644
--- a/client/client.go
+++ b/client/client.go
@@ -25,20 +25,6 @@ func Client() {
panic(err)
}
- message := &pb.Message{
- User: user,
- Message: "Hello World",
- }
-
- chatEvent := &pb.ChatEvent{
- EventID: 1,
- Event: &pb.ChatEvent_UserMessage{
- UserMessage: message,
- },
- }
-
- fmt.Println("chatEventCreation: ", chatEvent)
-
// Configure the TLS connection
tlsConfig := &tls.Config{
InsecureSkipVerify: true, // Disable verification for self-signed certs; use `false` for production
@@ -51,23 +37,54 @@ func Client() {
}
defer conn.Close()
- // begin go routine for receiving chat events
- chatEventChan := make(chan *pb.ChatEvent)
- userInputChan := make(chan string)
- // begin go routine for receiving chat events
- go receiver.ReceiveChatEvents(conn, chatEventChan)
- // begin go routine for printing chat events
- go sender.SendUserMessages(conn, userInputChan, user)
- // Goroutine to listen for user input
+ sendChatEventChan := make(chan *pb.ChatEvent)
+ receiveChatEventChan := make(chan *pb.ChatEvent)
+
+ go receiver.ReceiveChatEvents(conn, receiveChatEventChan)
+ go sender.SendChatEvent(conn, sendChatEventChan)
+
+ // Start message receiver goroutine
+ go func() {
+ fmt.Println("Message receiver started...")
+ for chatEvent := range receiveChatEventChan {
+ switch event := chatEvent.Event.(type) {
+ case *pb.ChatEvent_UserMessage:
+ // Clear current line
+ fmt.Printf("\r\033[K") // Clear the current line
+ // Print received message
+ fmt.Printf("\n%s: %s\n", event.UserMessage.User.DisplayName, event.UserMessage.Message)
+ // Reprint prompt
+ fmt.Print("Enter message: ")
+ }
+ }
+ fmt.Println("Message receiver stopped")
+ }()
+
go func() {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("Enter message: ")
- userInput, _ := reader.ReadString('\n')
+ userInput, err := reader.ReadString('\n')
+ if err != nil {
+ fmt.Println("Error reading input:", "err", err)
+ continue
+ }
+
userInput = strings.TrimSpace(userInput)
- // check if text isnt empty
if userInput != "" {
- userInputChan <- userInput
+ message := &pb.Message{
+ User: user,
+ Message: userInput,
+ }
+
+ chatEvent := &pb.ChatEvent{
+ EventID: 1,
+ Event: &pb.ChatEvent_UserMessage{
+ UserMessage: message,
+ },
+ }
+ sendChatEventChan <- chatEvent
+ fmt.Println("Message queued for sending")
}
}
}()
diff --git a/receiver/receiver.go b/receiver/receiver.go
index 0b7e145..befe253 100644
--- a/receiver/receiver.go
+++ b/receiver/receiver.go
@@ -1,7 +1,6 @@
package receiver
import (
- "bufio"
"chatterbox-cli/serialization"
"crypto/tls"
"encoding/binary"
@@ -13,44 +12,47 @@ import (
)
func ReceiveChatEvents(conn *tls.Conn, chatEvents chan *pb.ChatEvent) {
- defer close(chatEvents) // Close the channel when we're done
+ defer close(chatEvents)
+ log.Info("Starting chat event receiver")
- reader := bufio.NewReader(conn)
+ lengthBuf := make([]byte, 4)
for {
- // Read the length prefix (assuming it's a uint32 for this example)
- var length uint32
- log.Info("Attempting to read message length")
- err := binary.Read(reader, binary.BigEndian, &length)
+ log.Info("Waiting for next message length...")
+ // Read exactly 4 bytes for the length prefix
+ _, err := io.ReadFull(conn, lengthBuf)
if err != nil {
+ if err == io.EOF {
+ log.Info("Connection closed")
+ return
+ }
log.Error("Error reading message length:", "err", err)
- // Possibly log the reader buffer here to see if data is being received
- break
+ return
}
- log.Info("Message length received:", "length", length)
- // Read the actual message based on the length
+ length := binary.BigEndian.Uint32(lengthBuf)
+ log.Info("Received message length indicator:", "length", length)
+
+ // Read the message payload
rawData := make([]byte, length)
- _, err = io.ReadFull(reader, rawData)
+ _, err = io.ReadFull(conn, rawData)
if err != nil {
- // Handle the case where the connection was closed unexpectedly
if err == io.EOF {
- log.Info("Client disconnected")
- break
+ log.Info("Connection closed during message read")
+ return
}
- log.Error("Error reading message:", "err", err)
- break
+ log.Error("Error reading message payload:", "err", err)
+ return
}
- // Process the message (deserialize it)
- log.Info("Received raw data:", "rawData", rawData)
+ log.Info("Received raw data of length:", "bytes", len(rawData))
+
deserializedChatEvent, err := serialization.DeserializeChatEvent(rawData)
if err != nil {
log.Error("Error deserializing chat event:", "err", err)
continue
}
- // Send the deserialized chat event to the channel
- log.Info("Received chat event:", "chatEvent", deserializedChatEvent.String())
+ log.Info("Successfully deserialized chat event, sending to channel")
chatEvents <- deserializedChatEvent
}
}
diff --git a/sender/sender.go b/sender/sender.go
index aa3d192..9a521f9 100644
--- a/sender/sender.go
+++ b/sender/sender.go
@@ -6,50 +6,44 @@ import (
"chatterbox-cli/serialization"
"crypto/tls"
"encoding/binary"
+
"log"
)
-func SendUserMessages(conn *tls.Conn, userInputChan chan string, user *pb.User) {
- for {
- userInput := <-userInputChan
-
- message := &pb.Message{
- User: user,
- Message: userInput,
- }
-
- chatEvent := &pb.ChatEvent{
- EventID: 1,
- Event: &pb.ChatEvent_UserMessage{
- UserMessage: message,
- },
- }
-
- // Serialize the chatEvent
- serializedChatEvent, err := serialization.SerializeChatEvent(chatEvent)
- // Send the chat message to the server
-
- // Create a buffer to hold length + serialized data
- var buf bytes.Buffer
-
- // Write the length of the serialized message as a uint32
- err = binary.Write(&buf, binary.BigEndian, uint32(len(serializedChatEvent)))
- if err != nil {
- log.Fatalf("Error writing message length: %s", err)
- }
-
- // Write the actual serialized data to the buffer
- _, err = buf.Write(serializedChatEvent)
- if err != nil {
- log.Fatalf("Error writing serialized message: %s", err)
- }
-
- // Send the complete buffer to the server
- _, err = conn.Write(buf.Bytes())
- if err != nil {
- log.Fatalf("Error sending message to server: %s", err)
- }
-
- }
-
+func SendChatEvent(conn *tls.Conn, chatEvent chan *pb.ChatEvent) {
+ for {
+ // Wait for new messages from the channel
+ event := <-chatEvent
+
+ // Serialize the chatEvent
+ serializedChatEvent, err := serialization.SerializeChatEvent(event)
+ if err != nil {
+ log.Printf("Error serializing message: %s", err)
+ continue
+ }
+
+ // Create a buffer to hold length + serialized data
+ var buf bytes.Buffer
+
+ // Write the length of the serialized message as a uint32
+ err = binary.Write(&buf, binary.BigEndian, uint32(len(serializedChatEvent)))
+ if err != nil {
+ log.Printf("Error writing message length: %s", err)
+ continue
+ }
+
+ // Write the actual serialized data to the buffer
+ _, err = buf.Write(serializedChatEvent)
+ if err != nil {
+ log.Printf("Error writing serialized message: %s", err)
+ continue
+ }
+
+ // Send the complete buffer to the server
+ _, err = conn.Write(buf.Bytes())
+ if err != nil {
+ log.Printf("Error sending message to server: %s", err)
+ continue
+ }
+ }
}
diff --git a/server/server.go b/server/server.go
index c4524e5..3c19dfe 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1,8 +1,6 @@
package server
import (
- "bufio"
- "bytes"
"chatterbox-cli/serialization"
"crypto/tls"
"encoding/binary"
@@ -55,30 +53,52 @@ func Server() {
func handleConnection(conn *tls.Conn) {
defer conn.Close()
- // Client management logic (unchanged)
+ // Client management logic
clientsMu.Lock()
clients = append(clients, conn)
+ clientIndex := len(clients) - 1
clientsMu.Unlock()
+
log.Info("Client joined:", "address", conn.RemoteAddr())
- defer func() { /* client removal logic */ }()
- reader := bufio.NewReader(conn)
+ // Proper client cleanup on disconnect
+ defer func() {
+ clientsMu.Lock()
+ if clientIndex < len(clients) {
+ clients = append(clients[:clientIndex], clients[clientIndex+1:]...)
+ }
+ clientsMu.Unlock()
+ log.Info("Client left:", "address", conn.RemoteAddr())
+ }()
+
+ lengthBuf := make([]byte, 4)
for {
- // Read the length prefix (assuming it's a uint32 for this example)
- var length uint32
- if err := binary.Read(reader, binary.BigEndian, &length); err != nil {
+ // Read exactly 4 bytes for the length prefix
+ _, err := io.ReadFull(conn, lengthBuf)
+ if err != nil {
+ if err == io.EOF {
+ log.Info("Connection closed")
+ return
+ }
log.Error("Error reading message length:", "err", err)
- break
+ return
}
- // Read the actual message based on the length
+ length := binary.BigEndian.Uint32(lengthBuf)
+
+ // Read the message payload
rawData := make([]byte, length)
- if _, err := io.ReadFull(reader, rawData); err != nil {
+ _, err = io.ReadFull(conn, rawData)
+ if err != nil {
+ if err == io.EOF {
+ log.Info("Connection closed during message read")
+ return
+ }
log.Error("Error reading message:", "err", err)
- break
+ return
}
- // Process the message (same as before)
+ // Process the message
deserializedChatEvent, err := serialization.DeserializeChatEvent(rawData)
if err != nil {
log.Error("Error deserializing chat event:", "err", err)
@@ -90,34 +110,45 @@ func handleConnection(conn *tls.Conn) {
}
}
-func broadcastChatEvent(conn *tls.Conn, chatEvent *pb.ChatEvent) {
- log.Info("Broadcasting message:", "chatEvent", chatEvent)
+func broadcastChatEvent(sender *tls.Conn, chatEvent *pb.ChatEvent) {
+ log.Info("Starting broadcast to all clients")
clientsMu.Lock()
defer clientsMu.Unlock()
+ log.Info("Number of connected clients:", "count", len(clients))
+
serializedChatEvent, err := serialization.SerializeChatEvent(chatEvent)
if err != nil {
log.Error("Error serializing chat event:", "err", err)
return
}
- var buf bytes.Buffer
- err = binary.Write(&buf, binary.BigEndian, uint32(len(serializedChatEvent)))
- if err != nil {
- log.Error("Error writing message length:", "err", err)
- }
+ message := make([]byte, 4+len(serializedChatEvent))
+ binary.BigEndian.PutUint32(message[:4], uint32(len(serializedChatEvent)))
+ copy(message[4:], serializedChatEvent)
- _, err = buf.Write(serializedChatEvent)
- if err != nil {
- log.Error("Error writing message:", "err", err)
- }
+ log.Info("Prepared message for broadcast:", "messageLength", len(message))
- for _, client := range clients {
- if client != conn {
- _, err := client.Write(buf.Bytes())
+ var failedClients []int
+ for i, client := range clients {
+ if client != sender { // Skip the sender
+ log.Info("Attempting to send to client:", "address", client.RemoteAddr())
+ n, err := client.Write(message)
if err != nil {
log.Error("Error sending message to client:", "err", err, "client", client.RemoteAddr())
+ failedClients = append(failedClients, i)
+ } else {
+ log.Info("Successfully sent message to client:", "address", client.RemoteAddr(), "bytes", n)
}
+ } else {
+ log.Info("Skipping sender:", "address", client.RemoteAddr())
}
}
+
+ // Remove failed clients in reverse order
+ for i := len(failedClients) - 1; i >= 0; i-- {
+ failedIndex := failedClients[i]
+ clients = append(clients[:failedIndex], clients[failedIndex+1:]...)
+ log.Info("Removed failed client at index:", "index", failedIndex)
+ }
}