-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
92 lines (73 loc) · 1.69 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main
import (
"bufio"
"encoding/json"
"errors"
"io"
"log"
"net"
"strings"
"github.com/iAziz786/frenzy/client"
"github.com/iAziz786/frenzy/constant"
"github.com/iAziz786/frenzy/server"
)
func main() {
l, err := net.Listen("tcp", ":9022")
if err != nil {
log.Fatalf("unable to listen the server %s", err)
}
defer l.Close()
msgStream := make(chan client.Message)
defer close(msgStream)
for {
conn, err := l.Accept()
if err != nil {
log.Printf("unable to accept connection %s\n", err)
continue
}
go handleConn(conn)
}
}
// handleConn serves a single accepted connection and closes it on return.
//
// The peer first sends a handshake string terminated by
// constant.MsgEndIdent that names its role. A producer connection is handed
// to server.Produce. A consumer connection must follow the handshake with a
// JSON bootstrap message, whose Topic is passed to server.Consume. Any other
// role is logged and the connection is dropped.
func handleConn(conn net.Conn) {
	defer conn.Close()

	// Every handshake read goes through this one buffered reader: it may pull
	// more bytes off the socket than the delimiter-terminated role string, and
	// those extra bytes are only reachable through the same reader.
	reader := bufio.NewReader(conn)
	role, err := reader.ReadString(constant.MsgEndIdent)
	if err != nil {
		// One failing client must not take the whole server down, so log and
		// drop this connection rather than exiting the process.
		log.Printf("unable read all the data %s\n", err)
		return
	}
	// Strip the message delimiter from the role string.
	role = strings.Trim(role, string(constant.MsgEndIdent))

	switch role {
	case string(client.ProducerConn):
		// NOTE(review): server.Produce receives the raw conn; bytes a producer
		// sent immediately after the handshake may already be held in reader's
		// buffer. Confirm how server.Produce reads before relying on this.
		server.Produce(conn)
	case string(client.ConsumerConn):
		// Read the bootstrap message through reader, not conn, so data that
		// arrived together with the handshake is not lost.
		msg, err := readClientMessage(reader)
		if err != nil {
			log.Printf("unable to read initial boostrap data %s", err)
			return
		}
		server.Consume(conn, msg.Topic)
	default:
		log.Println("unknown connection type")
	}
}
// readClientMessage decodes one JSON-encoded client.Message from reader.
//
// It returns the decoded message on success. On failure it returns a nil
// message and an error: malformed or mistyped JSON is reported as a
// conversion error that includes the decoder's explanation, while any other
// error (for example one coming from reader itself) is returned unchanged.
func readClientMessage(reader io.Reader) (*client.Message, error) {
	// A streaming decoder keeps reading until a complete JSON value has
	// arrived, so the message is not limited to a fixed-size buffer or to
	// whatever a single Read call happens to return.
	// NOTE(review): the decoder may read past the end of the JSON value;
	// callers that keep reading from the same reader should be aware of it.
	var msg client.Message
	err := json.NewDecoder(reader).Decode(&msg)
	if err == nil {
		return &msg, nil
	}

	switch err.(type) {
	case *json.SyntaxError, *json.UnmarshalTypeError:
		return nil, errors.New("unable to convert client message: " + err.Error())
	default:
		return nil, err
	}
}