diff --git a/privmsg.go b/privmsg.go index b51cb00..2eb5ec3 100644 --- a/privmsg.go +++ b/privmsg.go @@ -11,7 +11,6 @@ import ( "unicode/utf8" "gitlab.com/zephyrtronium/tmi" - "golang.org/x/sync/errgroup" "github.com/zephyrtronium/robot/brain" "github.com/zephyrtronium/robot/channel" @@ -22,127 +21,123 @@ import ( ) // tmiMessage processes a PRIVMSG from TMI. -func (robo *Robot) tmiMessage(ctx context.Context, group *errgroup.Group, send chan<- *tmi.Message, msg *tmi.Message) { +func (robo *Robot) tmiMessage(ctx context.Context, send chan<- *tmi.Message, msg *tmi.Message) { robo.Metrics.TMIMsgsCount.Observe(1) - // Run in a worker so that we don't block the message loop. - work := func(ctx context.Context) { - ch, _ := robo.channels.Load(msg.To()) - if ch == nil { - // TMI gives a WHISPER for a direct message, so this is a message to a - // channel that isn't configured. Ignore it. - return - } - m := message.FromTMI(msg) - log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name)) - from := m.Sender - if ch.Ignore[from] { - log.InfoContext(ctx, "message from ignored user") - return - } - if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) { - log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false)) - return - } - if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok { - robo.command(ctx, log, ch, m, from, cmd) - return - } - ch.History.Add(m.Time(), m.ID, m.Sender, m.Text) - // If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the - // start of the message text. - // That's helpful for commands, which we've already processed, but - // otherwise we probably don't want to see it. Remove it. - if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") { - _, t, _ := strings.Cut(m.Text, " ") - log.DebugContext(ctx, "stripped reply mention", slog.String("text", t)) - m.Text = t - } - robo.learn(ctx, log, ch, robo.hashes(), m) - switch err := ch.Memery.Check(m.Time(), from, m.Text); err { - case channel.ErrNotCopypasta: // do nothing - case nil: - // Meme detected. Copypasta. - t := time.Now() - r := ch.Rate.ReserveN(t, 1) - if d := r.DelayFrom(t); d > 0 { - // But we can't meme it. Restore it so we can next time. - log.InfoContext(ctx, "rate limited", - slog.String("action", "copypasta"), - slog.String("delay", d.String()), - ) - ch.Memery.Unblock(m.Text) - r.CancelAt(t) - return - } - f := ch.Effects.Pick(rand.Uint32()) - s := command.Effect(log, f, m.Text) - if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) { - // We would copypasta something that is blocked. - // Note that since we reached here at all, that implies the - // effect made it unacceptable. - log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f)) - return - } - ch.Memery.Block(m.Time(), s) - log.InfoContext(ctx, "copypasta", - slog.String("text", s), - slog.String("effect", f), - ) - msg := message.Format("", ch.Name, "%s", s) - robo.sendTMI(ctx, send, msg) - return - default: - log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err)) - // Continue on. - } - if rand.Float64() > ch.Responses { - return - } - start := time.Now() - s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "") - cost := time.Since(start) - if err != nil { - log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err)) - return - } - if s == "" { - log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send)) - return - } - x := rand.Uint64() - e := ch.Emotes.Pick(uint32(x)) - f := ch.Effects.Pick(uint32(x >> 32)) - log.InfoContext(ctx, "speak", - slog.String("text", s), - slog.String("emote", e), - slog.String("effect", f), - ) - se := strings.TrimSpace(s + " " + e) - sef := command.Effect(log, f, se) - if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil { - log.ErrorContext(ctx, "record trace failed", slog.Any("err", err)) - return - } - if ch.Block.MatchString(se) || ch.Block.MatchString(sef) { - log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef)) - return - } - // Now that we've done all the work, which might take substantial time, - // check whether we can use it. + ch, _ := robo.channels.Load(msg.To()) + if ch == nil { + // TMI gives a WHISPER for a direct message, so this is a message to a + // channel that isn't configured. Ignore it. + return + } + m := message.FromTMI(msg) + log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name)) + from := m.Sender + if ch.Ignore[from] { + log.InfoContext(ctx, "message from ignored user") + return + } + if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) { + log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false)) + return + } + if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok { + robo.command(ctx, log, ch, m, from, cmd) + return + } + ch.History.Add(m.Time(), m.ID, m.Sender, m.Text) + // If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the + // start of the message text. + // That's helpful for commands, which we've already processed, but + // otherwise we probably don't want to see it. Remove it. + if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") { + _, t, _ := strings.Cut(m.Text, " ") + log.DebugContext(ctx, "stripped reply mention", slog.String("text", t)) + m.Text = t + } + robo.learn(ctx, log, ch, robo.hashes(), m) + switch err := ch.Memery.Check(m.Time(), from, m.Text); err { + case channel.ErrNotCopypasta: // do nothing + case nil: + // Meme detected. Copypasta. t := time.Now() r := ch.Rate.ReserveN(t, 1) if d := r.DelayFrom(t); d > 0 { + // But we can't meme it. Restore it so we can next time. log.InfoContext(ctx, "rate limited", - slog.String("action", "speak"), + slog.String("action", "copypasta"), slog.String("delay", d.String()), ) + ch.Memery.Unblock(m.Text) r.CancelAt(t) return } - msg := message.Format("", ch.Name, "%s", sef) + f := ch.Effects.Pick(rand.Uint32()) + s := command.Effect(log, f, m.Text) + if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) { + // We would copypasta something that is blocked. + // Note that since we reached here at all, that implies the + // effect made it unacceptable. + log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f)) + return + } + ch.Memery.Block(m.Time(), s) + log.InfoContext(ctx, "copypasta", + slog.String("text", s), + slog.String("effect", f), + ) + msg := message.Format("", ch.Name, "%s", s) robo.sendTMI(ctx, send, msg) + return + default: + log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err)) + // Continue on. + } + if rand.Float64() > ch.Responses { + return + } + start := time.Now() + s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "") + cost := time.Since(start) + if err != nil { + log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err)) + return + } + if s == "" { + log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send)) + return + } + x := rand.Uint64() + e := ch.Emotes.Pick(uint32(x)) + f := ch.Effects.Pick(uint32(x >> 32)) + log.InfoContext(ctx, "speak", + slog.String("text", s), + slog.String("emote", e), + slog.String("effect", f), + ) + se := strings.TrimSpace(s + " " + e) + sef := command.Effect(log, f, se) + if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil { + log.ErrorContext(ctx, "record trace failed", slog.Any("err", err)) + return + } + if ch.Block.MatchString(se) || ch.Block.MatchString(sef) { + log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef)) + return + } + // Now that we've done all the work, which might take substantial time, + // check whether we can use it. + t := time.Now() + r := ch.Rate.ReserveN(t, 1) + if d := r.DelayFrom(t); d > 0 { + log.InfoContext(ctx, "rate limited", + slog.String("action", "speak"), + slog.String("delay", d.String()), + ) + r.CancelAt(t) + return } - robo.enqueue(ctx, group, work) + out := message.Format("", ch.Name, "%s", sef) + robo.sendTMI(ctx, send, out) } func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Channel, m *message.Received, from, cmd string) { @@ -194,45 +189,6 @@ func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Ch c.fn(ctx, &r, &inv) } -func (robo *Robot) enqueue(ctx context.Context, group *errgroup.Group, work func(context.Context)) { - var w chan func(context.Context) - // Get a worker if one exists. Otherwise, spawn a new one. - select { - case w = <-robo.works: - default: - w = make(chan func(context.Context), 1) - group.Go(func() error { - worker(ctx, robo.works, w) - return nil - }) - } - // Send it work. - select { - case <-ctx.Done(): - return - case w <- work: - } -} - -// worker runs works for a while. The provided context is passed to each work. -func worker(ctx context.Context, works chan chan func(context.Context), ch chan func(context.Context)) { - for { - select { - case <-ctx.Done(): - return - case work := <-ch: - work(ctx) - // Replace ourselves in the pool if it needs additional capacity. - // Otherwise, we're done. - select { - case works <- ch: - default: - return - } - } - } -} - // learn learns a given message's text if it passes ch's filters. func (robo *Robot) learn(ctx context.Context, log *slog.Logger, ch *channel.Channel, hasher userhash.Hasher, msg *message.Received) { if !ch.Enabled.Load() { diff --git a/tmi.go b/tmi.go index f38d575..61fd989 100644 --- a/tmi.go +++ b/tmi.go @@ -25,15 +25,24 @@ func (robo *Robot) tmiLoop(ctx context.Context, group *errgroup.Group, send chan } switch msg.Command { case "PRIVMSG": - robo.tmiMessage(ctx, group, send, msg) + group.Go(func() error { + robo.tmiMessage(ctx, send, msg) + return nil + }) case "WHISPER": // TODO(zeph): this case "NOTICE": // nothing yet case "CLEARCHAT": - robo.clearchat(ctx, group, msg) + group.Go(func() error { + robo.clearchat(ctx, msg) + return nil + }) case "CLEARMSG": - robo.clearmsg(ctx, group, msg) + group.Go(func() error { + robo.clearmsg(ctx, msg) + return nil + }) case "HOSTTARGET": // nothing yet case "USERSTATE": @@ -77,7 +86,7 @@ func (robo *Robot) joinTwitch(ctx context.Context, send chan<- *tmi.Message) { } } -func (robo *Robot) clearchat(ctx context.Context, group *errgroup.Group, msg *tmi.Message) { +func (robo *Robot) clearchat(ctx context.Context, msg *tmi.Message) { if len(msg.Params) == 0 { return } @@ -85,102 +94,91 @@ func (robo *Robot) clearchat(ctx context.Context, group *errgroup.Group, msg *tm if ch == nil { return } - var work func(ctx context.Context) t, _ := msg.Tag("target-user-id") switch t { case "": // Delete all recent chat. - work = func(ctx context.Context) { - tag := ch.Learn - slog.InfoContext(ctx, "clear all chat", slog.String("channel", msg.To()), slog.String("tag", tag)) - err := robo.brain.ForgetDuring(ctx, tag, msg.Time().Add(-15*time.Minute), msg.Time()) - if err != nil { - slog.ErrorContext(ctx, "failed to forget from all chat", slog.Any("err", err), slog.String("channel", msg.To())) - } + tag := ch.Learn + slog.InfoContext(ctx, "clear all chat", slog.String("channel", msg.To()), slog.String("tag", tag)) + err := robo.brain.ForgetDuring(ctx, tag, msg.Time().Add(-15*time.Minute), msg.Time()) + if err != nil { + slog.ErrorContext(ctx, "failed to forget from all chat", slog.Any("err", err), slog.String("channel", msg.To())) } case robo.tmi.userID: - work = func(ctx context.Context) { - // We use the send tag because we are forgetting something we sent. - tag := ch.Send - slog.InfoContext(ctx, "forget recent generated", slog.String("channel", msg.To()), slog.String("tag", tag)) - for id, err := range robo.spoken.Since(ctx, tag, msg.Time().Add(-15*time.Minute)) { - if err != nil { - slog.ErrorContext(ctx, "failed to get recent traces", - slog.Any("err", err), - slog.String("channel", msg.To()), - slog.String("tag", tag), - ) - continue - } - robo.Metrics.ForgotCount.Observe(1) - if err := robo.brain.ForgetMessage(ctx, tag, id); err != nil { - slog.ErrorContext(ctx, "failed to forget from recent trace", - slog.Any("err", err), - slog.String("channel", msg.To()), - slog.String("tag", tag), - slog.String("id", id), - ) - } + // We use the send tag because we are forgetting something we sent. + tag := ch.Send + slog.InfoContext(ctx, "forget recent generated", slog.String("channel", msg.To()), slog.String("tag", tag)) + for id, err := range robo.spoken.Since(ctx, tag, msg.Time().Add(-15*time.Minute)) { + if err != nil { + slog.ErrorContext(ctx, "failed to get recent traces", + slog.Any("err", err), + slog.String("channel", msg.To()), + slog.String("tag", tag), + ) + continue + } + robo.Metrics.ForgotCount.Observe(1) + if err := robo.brain.ForgetMessage(ctx, tag, id); err != nil { + slog.ErrorContext(ctx, "failed to forget from recent trace", + slog.Any("err", err), + slog.String("channel", msg.To()), + slog.String("tag", tag), + slog.String("id", id), + ) } } default: // Delete from user. // We use the user's current and previous userhash, since userhashes // are time-based. - work = func(ctx context.Context) { - hr := robo.hashes() - h := hr.Hash(new(userhash.Hash), t, msg.To(), msg.Time()) - if err := robo.brain.ForgetUser(ctx, h); err != nil { - slog.ErrorContext(ctx, "failed to forget recent messages from user", slog.Any("err", err), slog.String("channel", msg.To())) - // Try the previous userhash anyway. - } - h = hr.Hash(h, t, msg.To(), msg.Time().Add(-userhash.TimeQuantum)) - if err := robo.brain.ForgetUser(ctx, h); err != nil { - slog.ErrorContext(ctx, "failed to forget older messages from user", slog.Any("err", err), slog.String("channel", msg.To())) - } + hr := robo.hashes() + h := hr.Hash(new(userhash.Hash), t, msg.To(), msg.Time()) + if err := robo.brain.ForgetUser(ctx, h); err != nil { + slog.ErrorContext(ctx, "failed to forget recent messages from user", slog.Any("err", err), slog.String("channel", msg.To())) + // Try the previous userhash anyway. + } + h = hr.Hash(h, t, msg.To(), msg.Time().Add(-userhash.TimeQuantum)) + if err := robo.brain.ForgetUser(ctx, h); err != nil { + slog.ErrorContext(ctx, "failed to forget older messages from user", slog.Any("err", err), slog.String("channel", msg.To())) } } - robo.enqueue(ctx, group, work) } -func (robo *Robot) clearmsg(ctx context.Context, group *errgroup.Group, msg *tmi.Message) { - work := func(ctx context.Context) { - if len(msg.Params) == 0 { - return - } - ch, _ := robo.channels.Load(msg.To()) - if ch == nil { - return - } - t, _ := msg.Tag("target-msg-id") - u, _ := msg.Tag("login") - log := slog.With(slog.String("trace", t), slog.String("in", msg.To())) - if u != robo.tmi.name { - // Forget a message from someone else. - log.InfoContext(ctx, "forget message", slog.String("tag", ch.Learn), slog.String("id", t)) - forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Learn, t) - return - } - // Forget a message from the robo. - // This may or may not be a generated message; it could be a command - // output or copypasta. Regardless, if it was deleted, we should try - // not to say it. - // Note that we use the send tag rather than the learn tag for this, - // because we are unlearning something that we sent. - trace, tm, err := robo.spoken.Trace(ctx, ch.Send, msg.Trailing) - if err != nil { - log.ErrorContext(ctx, "failed to get message trace", - slog.Any("err", err), - slog.String("tag", ch.Send), - slog.String("text", msg.Trailing), - slog.String("id", t), - ) - return - } - log.InfoContext(ctx, "forget trace", slog.String("tag", ch.Send), slog.Any("spoken", tm), slog.Any("trace", trace)) - forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Send, trace...) +func (robo *Robot) clearmsg(ctx context.Context, msg *tmi.Message) { + if len(msg.Params) == 0 { + return + } + ch, _ := robo.channels.Load(msg.To()) + if ch == nil { + return + } + t, _ := msg.Tag("target-msg-id") + u, _ := msg.Tag("login") + log := slog.With(slog.String("trace", t), slog.String("in", msg.To())) + if u != robo.tmi.name { + // Forget a message from someone else. + log.InfoContext(ctx, "forget message", slog.String("tag", ch.Learn), slog.String("id", t)) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Learn, t) + return + } + // Forget a message from the robo. + // This may or may not be a generated message; it could be a command + // output or copypasta. Regardless, if it was deleted, we should try + // not to say it. + // Note that we use the send tag rather than the learn tag for this, + // because we are unlearning something that we sent. + trace, tm, err := robo.spoken.Trace(ctx, ch.Send, msg.Trailing) + if err != nil { + log.ErrorContext(ctx, "failed to get message trace", + slog.Any("err", err), + slog.String("tag", ch.Send), + slog.String("text", msg.Trailing), + slog.String("id", t), + ) + return } - robo.enqueue(ctx, group, work) + log.InfoContext(ctx, "forget trace", slog.String("tag", ch.Send), slog.Any("spoken", tm), slog.Any("trace", trace)) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Send, trace...) } func forget(ctx context.Context, log *slog.Logger, forgetCount metrics.Observer, brain brain.Brain, tag string, trace ...string) {