From e92f7ded3d538305cf0d47577fbd86039ec2d25c Mon Sep 17 00:00:00 2001 From: Branden J Brown Date: Thu, 14 Nov 2024 01:22:10 -0500 Subject: [PATCH] robot: remove the worker pool The concept of the worker pool was to limit the work the scheduler has to do by having an expanding pool of goroutines available for work at all times, rather than frequently destroying and recreating them. In practice, this wasn't useful because the work we do per message turned out to be much cheaper than expected. Even at 150+ messages per minute, the pool never reached its nominal cap. --- privmsg.go | 254 ++++++++++++++++++++++------------------------------- tmi.go | 164 +++++++++++++++++----------------- 2 files changed, 186 insertions(+), 232 deletions(-) diff --git a/privmsg.go b/privmsg.go index b51cb00..2eb5ec3 100644 --- a/privmsg.go +++ b/privmsg.go @@ -11,7 +11,6 @@ import ( "unicode/utf8" "gitlab.com/zephyrtronium/tmi" - "golang.org/x/sync/errgroup" "github.com/zephyrtronium/robot/brain" "github.com/zephyrtronium/robot/channel" @@ -22,127 +21,123 @@ import ( ) // tmiMessage processes a PRIVMSG from TMI. -func (robo *Robot) tmiMessage(ctx context.Context, group *errgroup.Group, send chan<- *tmi.Message, msg *tmi.Message) { +func (robo *Robot) tmiMessage(ctx context.Context, send chan<- *tmi.Message, msg *tmi.Message) { robo.Metrics.TMIMsgsCount.Observe(1) - // Run in a worker so that we don't block the message loop. - work := func(ctx context.Context) { - ch, _ := robo.channels.Load(msg.To()) - if ch == nil { - // TMI gives a WHISPER for a direct message, so this is a message to a - // channel that isn't configured. Ignore it. - return - } - m := message.FromTMI(msg) - log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name)) - from := m.Sender - if ch.Ignore[from] { - log.InfoContext(ctx, "message from ignored user") - return - } - if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) { - log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false)) - return - } - if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok { - robo.command(ctx, log, ch, m, from, cmd) - return - } - ch.History.Add(m.Time(), m.ID, m.Sender, m.Text) - // If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the - // start of the message text. - // That's helpful for commands, which we've already processed, but - // otherwise we probably don't want to see it. Remove it. - if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") { - _, t, _ := strings.Cut(m.Text, " ") - log.DebugContext(ctx, "stripped reply mention", slog.String("text", t)) - m.Text = t - } - robo.learn(ctx, log, ch, robo.hashes(), m) - switch err := ch.Memery.Check(m.Time(), from, m.Text); err { - case channel.ErrNotCopypasta: // do nothing - case nil: - // Meme detected. Copypasta. - t := time.Now() - r := ch.Rate.ReserveN(t, 1) - if d := r.DelayFrom(t); d > 0 { - // But we can't meme it. Restore it so we can next time. - log.InfoContext(ctx, "rate limited", - slog.String("action", "copypasta"), - slog.String("delay", d.String()), - ) - ch.Memery.Unblock(m.Text) - r.CancelAt(t) - return - } - f := ch.Effects.Pick(rand.Uint32()) - s := command.Effect(log, f, m.Text) - if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) { - // We would copypasta something that is blocked. - // Note that since we reached here at all, that implies the - // effect made it unacceptable. - log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f)) - return - } - ch.Memery.Block(m.Time(), s) - log.InfoContext(ctx, "copypasta", - slog.String("text", s), - slog.String("effect", f), - ) - msg := message.Format("", ch.Name, "%s", s) - robo.sendTMI(ctx, send, msg) - return - default: - log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err)) - // Continue on. - } - if rand.Float64() > ch.Responses { - return - } - start := time.Now() - s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "") - cost := time.Since(start) - if err != nil { - log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err)) - return - } - if s == "" { - log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send)) - return - } - x := rand.Uint64() - e := ch.Emotes.Pick(uint32(x)) - f := ch.Effects.Pick(uint32(x >> 32)) - log.InfoContext(ctx, "speak", - slog.String("text", s), - slog.String("emote", e), - slog.String("effect", f), - ) - se := strings.TrimSpace(s + " " + e) - sef := command.Effect(log, f, se) - if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil { - log.ErrorContext(ctx, "record trace failed", slog.Any("err", err)) - return - } - if ch.Block.MatchString(se) || ch.Block.MatchString(sef) { - log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef)) - return - } - // Now that we've done all the work, which might take substantial time, - // check whether we can use it. + ch, _ := robo.channels.Load(msg.To()) + if ch == nil { + // TMI gives a WHISPER for a direct message, so this is a message to a + // channel that isn't configured. Ignore it. + return + } + m := message.FromTMI(msg) + log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name)) + from := m.Sender + if ch.Ignore[from] { + log.InfoContext(ctx, "message from ignored user") + return + } + if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) { + log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false)) + return + } + if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok { + robo.command(ctx, log, ch, m, from, cmd) + return + } + ch.History.Add(m.Time(), m.ID, m.Sender, m.Text) + // If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the + // start of the message text. + // That's helpful for commands, which we've already processed, but + // otherwise we probably don't want to see it. Remove it. + if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") { + _, t, _ := strings.Cut(m.Text, " ") + log.DebugContext(ctx, "stripped reply mention", slog.String("text", t)) + m.Text = t + } + robo.learn(ctx, log, ch, robo.hashes(), m) + switch err := ch.Memery.Check(m.Time(), from, m.Text); err { + case channel.ErrNotCopypasta: // do nothing + case nil: + // Meme detected. Copypasta. t := time.Now() r := ch.Rate.ReserveN(t, 1) if d := r.DelayFrom(t); d > 0 { + // But we can't meme it. Restore it so we can next time. log.InfoContext(ctx, "rate limited", - slog.String("action", "speak"), + slog.String("action", "copypasta"), slog.String("delay", d.String()), ) + ch.Memery.Unblock(m.Text) r.CancelAt(t) return } - msg := message.Format("", ch.Name, "%s", sef) + f := ch.Effects.Pick(rand.Uint32()) + s := command.Effect(log, f, m.Text) + if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) { + // We would copypasta something that is blocked. + // Note that since we reached here at all, that implies the + // effect made it unacceptable. + log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f)) + return + } + ch.Memery.Block(m.Time(), s) + log.InfoContext(ctx, "copypasta", + slog.String("text", s), + slog.String("effect", f), + ) + msg := message.Format("", ch.Name, "%s", s) robo.sendTMI(ctx, send, msg) + return + default: + log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err)) + // Continue on. + } + if rand.Float64() > ch.Responses { + return + } + start := time.Now() + s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "") + cost := time.Since(start) + if err != nil { + log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err)) + return + } + if s == "" { + log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send)) + return + } + x := rand.Uint64() + e := ch.Emotes.Pick(uint32(x)) + f := ch.Effects.Pick(uint32(x >> 32)) + log.InfoContext(ctx, "speak", + slog.String("text", s), + slog.String("emote", e), + slog.String("effect", f), + ) + se := strings.TrimSpace(s + " " + e) + sef := command.Effect(log, f, se) + if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil { + log.ErrorContext(ctx, "record trace failed", slog.Any("err", err)) + return + } + if ch.Block.MatchString(se) || ch.Block.MatchString(sef) { + log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef)) + return + } + // Now that we've done all the work, which might take substantial time, + // check whether we can use it. + t := time.Now() + r := ch.Rate.ReserveN(t, 1) + if d := r.DelayFrom(t); d > 0 { + log.InfoContext(ctx, "rate limited", + slog.String("action", "speak"), + slog.String("delay", d.String()), + ) + r.CancelAt(t) + return } - robo.enqueue(ctx, group, work) + out := message.Format("", ch.Name, "%s", sef) + robo.sendTMI(ctx, send, out) } func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Channel, m *message.Received, from, cmd string) { @@ -194,45 +189,6 @@ func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Ch c.fn(ctx, &r, &inv) } -func (robo *Robot) enqueue(ctx context.Context, group *errgroup.Group, work func(context.Context)) { - var w chan func(context.Context) - // Get a worker if one exists. Otherwise, spawn a new one. - select { - case w = <-robo.works: - default: - w = make(chan func(context.Context), 1) - group.Go(func() error { - worker(ctx, robo.works, w) - return nil - }) - } - // Send it work. - select { - case <-ctx.Done(): - return - case w <- work: - } -} - -// worker runs works for a while. The provided context is passed to each work. -func worker(ctx context.Context, works chan chan func(context.Context), ch chan func(context.Context)) { - for { - select { - case <-ctx.Done(): - return - case work := <-ch: - work(ctx) - // Replace ourselves in the pool if it needs additional capacity. - // Otherwise, we're done. - select { - case works <- ch: - default: - return - } - } - } -} - // learn learns a given message's text if it passes ch's filters. func (robo *Robot) learn(ctx context.Context, log *slog.Logger, ch *channel.Channel, hasher userhash.Hasher, msg *message.Received) { if !ch.Enabled.Load() { diff --git a/tmi.go b/tmi.go index f38d575..61fd989 100644 --- a/tmi.go +++ b/tmi.go @@ -25,15 +25,24 @@ func (robo *Robot) tmiLoop(ctx context.Context, group *errgroup.Group, send chan } switch msg.Command { case "PRIVMSG": - robo.tmiMessage(ctx, group, send, msg) + group.Go(func() error { + robo.tmiMessage(ctx, send, msg) + return nil + }) case "WHISPER": // TODO(zeph): this case "NOTICE": // nothing yet case "CLEARCHAT": - robo.clearchat(ctx, group, msg) + group.Go(func() error { + robo.clearchat(ctx, msg) + return nil + }) case "CLEARMSG": - robo.clearmsg(ctx, group, msg) + group.Go(func() error { + robo.clearmsg(ctx, msg) + return nil + }) case "HOSTTARGET": // nothing yet case "USERSTATE": @@ -77,7 +86,7 @@ func (robo *Robot) joinTwitch(ctx context.Context, send chan<- *tmi.Message) { } } -func (robo *Robot) clearchat(ctx context.Context, group *errgroup.Group, msg *tmi.Message) { +func (robo *Robot) clearchat(ctx context.Context, msg *tmi.Message) { if len(msg.Params) == 0 { return } @@ -85,102 +94,91 @@ func (robo *Robot) clearchat(ctx context.Context, group *errgroup.Group, msg *tm if ch == nil { return } - var work func(ctx context.Context) t, _ := msg.Tag("target-user-id") switch t { case "": // Delete all recent chat. - work = func(ctx context.Context) { - tag := ch.Learn - slog.InfoContext(ctx, "clear all chat", slog.String("channel", msg.To()), slog.String("tag", tag)) - err := robo.brain.ForgetDuring(ctx, tag, msg.Time().Add(-15*time.Minute), msg.Time()) - if err != nil { - slog.ErrorContext(ctx, "failed to forget from all chat", slog.Any("err", err), slog.String("channel", msg.To())) - } + tag := ch.Learn + slog.InfoContext(ctx, "clear all chat", slog.String("channel", msg.To()), slog.String("tag", tag)) + err := robo.brain.ForgetDuring(ctx, tag, msg.Time().Add(-15*time.Minute), msg.Time()) + if err != nil { + slog.ErrorContext(ctx, "failed to forget from all chat", slog.Any("err", err), slog.String("channel", msg.To())) } case robo.tmi.userID: - work = func(ctx context.Context) { - // We use the send tag because we are forgetting something we sent. - tag := ch.Send - slog.InfoContext(ctx, "forget recent generated", slog.String("channel", msg.To()), slog.String("tag", tag)) - for id, err := range robo.spoken.Since(ctx, tag, msg.Time().Add(-15*time.Minute)) { - if err != nil { - slog.ErrorContext(ctx, "failed to get recent traces", - slog.Any("err", err), - slog.String("channel", msg.To()), - slog.String("tag", tag), - ) - continue - } - robo.Metrics.ForgotCount.Observe(1) - if err := robo.brain.ForgetMessage(ctx, tag, id); err != nil { - slog.ErrorContext(ctx, "failed to forget from recent trace", - slog.Any("err", err), - slog.String("channel", msg.To()), - slog.String("tag", tag), - slog.String("id", id), - ) - } + // We use the send tag because we are forgetting something we sent. + tag := ch.Send + slog.InfoContext(ctx, "forget recent generated", slog.String("channel", msg.To()), slog.String("tag", tag)) + for id, err := range robo.spoken.Since(ctx, tag, msg.Time().Add(-15*time.Minute)) { + if err != nil { + slog.ErrorContext(ctx, "failed to get recent traces", + slog.Any("err", err), + slog.String("channel", msg.To()), + slog.String("tag", tag), + ) + continue + } + robo.Metrics.ForgotCount.Observe(1) + if err := robo.brain.ForgetMessage(ctx, tag, id); err != nil { + slog.ErrorContext(ctx, "failed to forget from recent trace", + slog.Any("err", err), + slog.String("channel", msg.To()), + slog.String("tag", tag), + slog.String("id", id), + ) } } default: // Delete from user. // We use the user's current and previous userhash, since userhashes // are time-based. - work = func(ctx context.Context) { - hr := robo.hashes() - h := hr.Hash(new(userhash.Hash), t, msg.To(), msg.Time()) - if err := robo.brain.ForgetUser(ctx, h); err != nil { - slog.ErrorContext(ctx, "failed to forget recent messages from user", slog.Any("err", err), slog.String("channel", msg.To())) - // Try the previous userhash anyway. - } - h = hr.Hash(h, t, msg.To(), msg.Time().Add(-userhash.TimeQuantum)) - if err := robo.brain.ForgetUser(ctx, h); err != nil { - slog.ErrorContext(ctx, "failed to forget older messages from user", slog.Any("err", err), slog.String("channel", msg.To())) - } + hr := robo.hashes() + h := hr.Hash(new(userhash.Hash), t, msg.To(), msg.Time()) + if err := robo.brain.ForgetUser(ctx, h); err != nil { + slog.ErrorContext(ctx, "failed to forget recent messages from user", slog.Any("err", err), slog.String("channel", msg.To())) + // Try the previous userhash anyway. + } + h = hr.Hash(h, t, msg.To(), msg.Time().Add(-userhash.TimeQuantum)) + if err := robo.brain.ForgetUser(ctx, h); err != nil { + slog.ErrorContext(ctx, "failed to forget older messages from user", slog.Any("err", err), slog.String("channel", msg.To())) } } - robo.enqueue(ctx, group, work) } -func (robo *Robot) clearmsg(ctx context.Context, group *errgroup.Group, msg *tmi.Message) { - work := func(ctx context.Context) { - if len(msg.Params) == 0 { - return - } - ch, _ := robo.channels.Load(msg.To()) - if ch == nil { - return - } - t, _ := msg.Tag("target-msg-id") - u, _ := msg.Tag("login") - log := slog.With(slog.String("trace", t), slog.String("in", msg.To())) - if u != robo.tmi.name { - // Forget a message from someone else. - log.InfoContext(ctx, "forget message", slog.String("tag", ch.Learn), slog.String("id", t)) - forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Learn, t) - return - } - // Forget a message from the robo. - // This may or may not be a generated message; it could be a command - // output or copypasta. Regardless, if it was deleted, we should try - // not to say it. - // Note that we use the send tag rather than the learn tag for this, - // because we are unlearning something that we sent. - trace, tm, err := robo.spoken.Trace(ctx, ch.Send, msg.Trailing) - if err != nil { - log.ErrorContext(ctx, "failed to get message trace", - slog.Any("err", err), - slog.String("tag", ch.Send), - slog.String("text", msg.Trailing), - slog.String("id", t), - ) - return - } - log.InfoContext(ctx, "forget trace", slog.String("tag", ch.Send), slog.Any("spoken", tm), slog.Any("trace", trace)) - forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Send, trace...) +func (robo *Robot) clearmsg(ctx context.Context, msg *tmi.Message) { + if len(msg.Params) == 0 { + return + } + ch, _ := robo.channels.Load(msg.To()) + if ch == nil { + return + } + t, _ := msg.Tag("target-msg-id") + u, _ := msg.Tag("login") + log := slog.With(slog.String("trace", t), slog.String("in", msg.To())) + if u != robo.tmi.name { + // Forget a message from someone else. + log.InfoContext(ctx, "forget message", slog.String("tag", ch.Learn), slog.String("id", t)) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Learn, t) + return + } + // Forget a message from the robo. + // This may or may not be a generated message; it could be a command + // output or copypasta. Regardless, if it was deleted, we should try + // not to say it. + // Note that we use the send tag rather than the learn tag for this, + // because we are unlearning something that we sent. + trace, tm, err := robo.spoken.Trace(ctx, ch.Send, msg.Trailing) + if err != nil { + log.ErrorContext(ctx, "failed to get message trace", + slog.Any("err", err), + slog.String("tag", ch.Send), + slog.String("text", msg.Trailing), + slog.String("id", t), + ) + return } - robo.enqueue(ctx, group, work) + log.InfoContext(ctx, "forget trace", slog.String("tag", ch.Send), slog.Any("spoken", tm), slog.Any("trace", trace)) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Send, trace...) } func forget(ctx context.Context, log *slog.Logger, forgetCount metrics.Observer, brain brain.Brain, tag string, trace ...string) {