Skip to content

Commit 3d3fe29

Browse files
authored
Merge pull request #389 from HyperloopUPV-H8/backend/transport
Backend/transport
2 parents 213f71f + db40d44 commit 3d3fe29

File tree

5 files changed

+199
-68
lines changed

5 files changed

+199
-68
lines changed

backend/cmd/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ func main() {
179179
orderTopic := order_topic.NewSendTopic()
180180
loggerTopic := logger_topic.NewEnableTopic()
181181
loggerTopic.SetDataLogger(subloggers[data_logger.Name].(*data_logger.Logger))
182+
loggerHandler.SetOnStart(func() {
183+
if err := loggerTopic.NotifyStarted(); err != nil {
184+
trace.Error().Err(err).Msg("failed to notify logger started")
185+
}
186+
})
182187

183188
messageTopic := message_topic.NewUpdateTopic()
184189
stateOrderTopic := order_topic.NewState(idToBoard, trace.Logger)

backend/pkg/broker/topics/logger/enable.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (enable *Enable) handleVariables(_ websocket.ClientId, message *websocket.M
104104
return nil
105105
}
106106

107+
func (enable *Enable) NotifyStarted() error {
108+
enable.isRunning.Store(true)
109+
return enable.broadcastState()
110+
}
111+
107112
func (enable *Enable) broadcastState() error {
108113
payload, err := json.Marshal(enable.isRunning.Load())
109114
if err != nil {

backend/pkg/logger/logger.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type Logger struct {
2424
subloggers map[abstraction.LoggerName]abstraction.Logger
2525

2626
trace zerolog.Logger
27+
28+
onStart func()
2729
}
2830

2931
/**************
@@ -83,9 +85,17 @@ func (logger *Logger) Start() error {
8385
}
8486

8587
logger.trace.Info().Msg("started")
88+
89+
if logger.onStart != nil {
90+
logger.onStart()
91+
}
8692
return nil
8793
}
8894

95+
func (logger *Logger) SetOnStart(cb func()) {
96+
logger.onStart = cb
97+
}
98+
8999
// PushRecord works as a proxy for the PushRecord method of the subloggers
90100
func (logger *Logger) PushRecord(record abstraction.LoggerRecord) error {
91101

backend/pkg/transport/transport.go

Lines changed: 109 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -119,44 +119,17 @@ func (transport *Transport) HandleServer(config tcp.ServerConfig, local string)
119119
// handleTCPConn is used to handle the specific TCP connections to the boards. It detects errors caused
120120
// on concurrent reads and writes, so other routines should not worry about closing or handling errors
121121
func (transport *Transport) handleTCPConn(conn net.Conn) error {
122-
if tcpConn, ok := conn.(*net.TCPConn); ok {
123-
transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection linger")
124-
err := tcpConn.SetLinger(0)
125-
if err != nil {
126-
transport.errChan <- err
127-
transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set linger")
128-
}
129-
130-
transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection no delay")
131-
err = tcpConn.SetNoDelay(true)
132-
if err != nil {
133-
transport.errChan <- err
134-
transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set no delay")
135-
}
136-
}
122+
transport.configureTCPConn(conn)
137123

138-
target, ok := transport.ipToTarget[conn.RemoteAddr().(*net.TCPAddr).IP.String()]
139-
if !ok {
140-
conn.Close()
141-
transport.logger.Warn().Str("remoteAddress", conn.RemoteAddr().(*net.TCPAddr).IP.String()).Msg("ip target not found")
142-
err := ErrUnknownTarget{Remote: conn.RemoteAddr()}
143-
transport.errChan <- err
124+
target, err := transport.targetFromTCPConn(conn)
125+
if err != nil {
144126
return err
145127
}
146128

147129
connectionLogger := transport.logger.With().Str("remoteAddress", conn.RemoteAddr().String()).Str("target", string(target)).Logger()
148130
connectionLogger.Info().Msg("new connection")
149131

150-
if err := func() error {
151-
transport.connectionsMx.Lock()
152-
defer transport.connectionsMx.Unlock()
153-
if _, ok := transport.connections[target]; ok {
154-
conn.Close()
155-
connectionLogger.Debug().Msg("already connected")
156-
return ErrTargetAlreadyConnected{Target: target}
157-
}
158-
return nil
159-
}(); err != nil {
132+
if err := transport.rejectIfConnectedTCPConn(target, conn, connectionLogger); err != nil {
160133
transport.errChan <- err
161134
return err
162135
}
@@ -167,56 +140,125 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error {
167140
connectionLogger.Info().Msg("close")
168141
}()
169142

170-
func() {
171-
transport.connectionsMx.Lock()
172-
defer transport.connectionsMx.Unlock()
173-
connectionLogger.Debug().Msg("added connection")
174-
transport.connections[target] = conn
175-
}()
176-
defer func() {
177-
transport.connectionsMx.Lock()
178-
defer transport.connectionsMx.Unlock()
179-
connectionLogger.Debug().Msg("removed connection")
180-
delete(transport.connections, target)
181-
}()
143+
cleanupConn := transport.registerTCPConn(target, conn, connectionLogger)
144+
defer cleanupConn()
182145

183146
transport.api.ConnectionUpdate(target, true)
184147
defer transport.api.ConnectionUpdate(target, false)
185148

149+
transport.readLoopTCPConn(conn, connectionLogger)
150+
151+
err = <-errChan
152+
if err != nil {
153+
connectionLogger.Error().Stack().Err(err).Msg("")
154+
transport.errChan <- err
155+
}
156+
return err
157+
}
158+
159+
// configureTCPConn sets TCP-level options like linger and no-delay.
160+
func (transport *Transport) configureTCPConn(conn net.Conn) {
161+
tcpConn, ok := conn.(*net.TCPConn)
162+
if !ok {
163+
return
164+
}
165+
166+
remote := conn.RemoteAddr().String()
167+
168+
transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection linger")
169+
err := tcpConn.SetLinger(0)
170+
if err != nil {
171+
transport.errChan <- err
172+
transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set linger")
173+
}
174+
175+
transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection no delay")
176+
err = tcpConn.SetNoDelay(true)
177+
if err != nil {
178+
transport.errChan <- err
179+
transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set no delay")
180+
}
181+
}
182+
183+
// targetFromTCPConn maps the remote IP address of the connection to a TransportTarget
184+
// using the ipToTarget map.
185+
func (transport *Transport) targetFromTCPConn(conn net.Conn) (abstraction.TransportTarget, error) {
186+
remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
187+
ip := remoteAddr.IP.String()
188+
189+
target, ok := transport.ipToTarget[ip]
190+
if !ok {
191+
conn.Close()
192+
transport.logger.Warn().Str("remoteAddress", ip).Msg("ip target not found")
193+
err := ErrUnknownTarget{Remote: conn.RemoteAddr()}
194+
transport.errChan <- err
195+
var zero abstraction.TransportTarget
196+
return zero, err
197+
198+
}
199+
return target, nil
200+
}
201+
202+
// rejectIfConnectedTCPConn closes and rejects conn if target already has an active connection.
203+
func (transport *Transport) rejectIfConnectedTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger,) error {
204+
transport.connectionsMx.Lock()
205+
defer transport.connectionsMx.Unlock()
206+
207+
if _, ok := transport.connections[target]; ok {
208+
conn.Close()
209+
logger.Debug().Msg("already connected")
210+
err := ErrTargetAlreadyConnected{Target: target}
211+
transport.errChan <- err
212+
return err
213+
}
214+
return nil
215+
}
216+
217+
// registerTCPConn stores conn for target and returns a cleanup that removes it.
218+
func (transport *Transport) registerTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger) func() {
219+
transport.connectionsMx.Lock()
220+
logger.Debug().Msg("added connection")
221+
transport.connections[target] = conn
222+
transport.connectionsMx.Unlock()
223+
224+
return func() {
225+
transport.connectionsMx.Lock()
226+
logger.Debug().Msg("removed connection")
227+
delete(transport.connections, target)
228+
transport.connectionsMx.Unlock()
229+
}
230+
}
231+
232+
// readLoopTCPConn reads packets from conn and forwards notifications until an error occurs.
233+
func (transport *Transport) readLoopTCPConn(conn net.Conn, logger zerolog.Logger) {
234+
from := conn.RemoteAddr().String()
235+
to := conn.LocalAddr().String()
236+
186237
go func() {
187238
for {
188239
packet, err := transport.decoder.DecodeNext(conn)
189240
if err != nil {
190-
connectionLogger.Error().Stack().Err(err).Msg("decode")
241+
logger.Error().Stack().Err(err).Msg("decode")
191242
transport.errChan <- err
192243
transport.SendFault()
193244
return
194245
}
195246

196247
if transport.propagateFault && packet.Id() == 0 {
197-
connectionLogger.Info().Msg("replicating packet with id 0 to all boards")
248+
logger.Info().Msg("replicating packet with id 0 to all boards")
198249
err := transport.handlePacketEvent(NewPacketMessage(packet))
199250
if err != nil {
200-
connectionLogger.Error().Err(err).Msg("failed to replicate packet")
251+
logger.Error().Err(err).Msg("failed to replicate packet")
201252
}
202253
}
203254

204-
from := conn.RemoteAddr().String()
205-
to := conn.LocalAddr().String()
206-
207-
connectionLogger.Trace().Type("type", packet).Msg("packet")
255+
logger.Trace().Type("type", packet).Msg("packet")
208256
transport.api.Notification(NewPacketNotification(packet, from, to, time.Now()))
209257
}
210258
}()
211-
212-
err := <-errChan
213-
if err != nil {
214-
connectionLogger.Error().Stack().Err(err).Msg("")
215-
transport.errChan <- err
216-
}
217-
return err
218259
}
219260

261+
220262
// SendMessage triggers an event to send something to the vehicle. Some messages
221263
// might additional means to pass information around (e.g. file read and write)
222264
func (transport *Transport) SendMessage(message abstraction.TransportMessage) error {
@@ -233,8 +275,10 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er
233275
err = ErrUnrecognizedEvent{message.Event()}
234276
}
235277
// handlePacketEvent already sends the error through the channel, so this avoids duplicates
236-
if _, ok := err.(ErrConnClosed); !ok {
237-
transport.errChan <- err
278+
if err != nil {
279+
if _, ok := err.(ErrConnClosed); !ok {
280+
transport.errChan <- err
281+
}
238282
}
239283
return err
240284
}
@@ -326,14 +370,18 @@ func (transport *Transport) handlePacketEvent(message PacketMessage) error {
326370
// handleFileWrite writes a file through tftp to the blcu
327371
func (transport *Transport) handleFileWrite(message FileWriteMessage) error {
328372
_, err := transport.tftp.WriteFile(message.Filename(), tftp.BinaryMode, message)
329-
transport.errChan <- err
373+
if err != nil {
374+
transport.errChan <- err
375+
}
330376
return err
331377
}
332378

333379
// handleFileRead reads a file through tftp from the blcu
334380
func (transport *Transport) handleFileRead(message FileReadMessage) error {
335381
_, err := transport.tftp.ReadFile(message.Filename(), tftp.BinaryMode, message)
336-
transport.errChan <- err
382+
if err != nil {
383+
transport.errChan <- err
384+
}
337385
return err
338386
}
339387

0 commit comments

Comments
 (0)