diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 029622bf938..2f4f8605870 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -366,37 +366,37 @@ func (mm *messageManager) Open() { // Close stops the messageManager service. func (mm *messageManager) Close() { - log.Infof("messageManager - started execution of Close") + log.Infof("messageManager (%v) - started execution of Close", mm.name) mm.pollerTicks.Stop() mm.purgeTicks.Stop() - log.Infof("messageManager - stopped the ticks. Acquiring mu Lock") + log.Infof("messageManager (%v) - stopped the ticks. Acquiring mu Lock", mm.name) mm.mu.Lock() - log.Infof("messageManager - acquired mu Lock") + log.Infof("messageManager (%v) - acquired mu Lock", mm.name) if !mm.isOpen { - log.Infof("messageManager - manager is not open") + log.Infof("messageManager (%v) - manager is not open", mm.name) mm.mu.Unlock() return } mm.isOpen = false - log.Infof("messageManager - cancelling all receivers") + log.Infof("messageManager (%v) - cancelling all receivers", mm.name) for _, rcvr := range mm.receivers { rcvr.receiver.cancel() } mm.receivers = nil MessageStats.Set([]string{mm.name.String(), "ClientCount"}, 0) - log.Infof("messageManager - clearing cache") + log.Infof("messageManager (%v) - clearing cache", mm.name) mm.cache.Clear() - log.Infof("messageManager - sending a broadcast") + log.Infof("messageManager (%v) - sending a broadcast", mm.name) // This broadcast will cause runSend to exit. mm.cond.Broadcast() - log.Infof("messageManager - stopping VStream") + log.Infof("messageManager (%v) - stopping VStream", mm.name) mm.stopVStream() mm.mu.Unlock() - log.Infof("messageManager - Waiting for the wait group") + log.Infof("messageManager (%v) - Waiting for the wait group", mm.name) mm.wg.Wait() - log.Infof("messageManager - closed") + log.Infof("messageManager (%v) - closed", mm.name) } // Subscribe registers the send function as a receiver of messages @@ -414,7 +414,7 @@ func (mm *messageManager) Subscribe(ctx context.Context, send func(*sqltypes.Res } if err := receiver.Send(mm.fieldResult); err != nil { - log.Errorf("Terminating connection due to error sending field info: %v", err) + log.Errorf("messageManager (%v) - Terminating connection due to error sending field info: %v", mm.name, err) receiver.cancel() return done } @@ -578,7 +578,7 @@ func (mm *messageManager) runSend() { go func() { err := mm.send(context.Background(), receiver, &sqltypes.Result{Rows: rows}) // calls the offsetting mm.wg.Done() if err != nil { - log.Errorf("messageManager - send failed: %v", err) + log.Errorf("messageManager (%v) - send failed: %v", mm.name, err) } }() } @@ -621,7 +621,7 @@ func (mm *messageManager) send(ctx context.Context, receiver *receiverWithStatus // Log the error, but we still want to postpone the message. // Otherwise, if this is a chronic failure like "message too // big", we'll end up spamming non-stop. - log.Errorf("Error sending messages: %v: %v", qr, err) + log.Errorf("messageManager (%v) - Error sending messages: %v: %v", mm.name, qr, err) } return mm.postpone(ctx, mm.tsv, mm.ackWaitTime, ids) } @@ -652,7 +652,7 @@ func (mm *messageManager) startVStream() { } func (mm *messageManager) stopVStream() { - log.Infof("messageManager - calling stream cancel") + log.Infof("messageManager (%v) - calling stream cancel", mm.name) if mm.streamCancel != nil { mm.streamCancel() mm.streamCancel = nil @@ -664,12 +664,12 @@ func (mm *messageManager) runVStream(ctx context.Context) { err := mm.runOneVStream(ctx) select { case <-ctx.Done(): - log.Info("Context canceled, exiting vstream") + log.Info("messageManager (%v) - Context canceled, exiting vstream", mm.name) return default: } MessageStats.Add([]string{mm.name.String(), "VStreamFailed"}, 1) - log.Infof("VStream ended: %v, retrying in 5 seconds", err) + log.Infof("messageManager (%v) - VStream ended: %v, retrying in 5 seconds", mm.name, err) time.Sleep(5 * time.Second) } } @@ -815,7 +815,7 @@ func (mm *messageManager) runPoller() { mr, err := BuildMessageRow(row) if err != nil { mm.tsv.Stats().InternalErrors.Add("Messages", 1) - log.Errorf("Error reading message row: %v", err) + log.Errorf("messageManager (%v) - Error reading message row: %v", mm.name, err) continue } if !mm.cache.Add(mr) { @@ -836,7 +836,7 @@ func (mm *messageManager) runPurge() { count, err := mm.tsv.PurgeMessages(ctx, nil, mm, time.Now().Add(-mm.purgeAfter).UnixNano()) if err != nil { MessageStats.Add([]string{mm.name.String(), "PurgeFailed"}, 1) - log.Errorf("Unable to delete messages: %v", err) + log.Errorf("messageManager (%v) - Unable to delete messages: %v", mm.name, err) } else { MessageStats.Add([]string{mm.name.String(), "Purged"}, count) } @@ -939,7 +939,7 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* query, err := mm.readByPriorityAndTimeNext.GenerateQuery(bindVars, nil) if err != nil { mm.tsv.Stats().InternalErrors.Add("Messages", 1) - log.Errorf("Error reading rows from message table: %v", err) + log.Errorf("messageManager (%v) - Error reading rows from message table: %v", mm.name, err) return nil, err } qr := &sqltypes.Result{}