Skip to content

Commit

Permalink
robot: remove the worker pool
Browse files Browse the repository at this point in the history
The concept of the worker pool was to limit the work the scheduler has
to do by having an expanding pool of goroutines available for work at
all times, rather than frequently destroying and recreating them.

In practice, this wasn't useful because the work we do per message
turned out to be much cheaper than expected. Even at 150+ messages per
minute, the pool never reached its nominal cap.
  • Loading branch information
zephyrtronium committed Nov 14, 2024
1 parent b4b3dcb commit e92f7de
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 232 deletions.
254 changes: 105 additions & 149 deletions privmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"unicode/utf8"

"gitlab.com/zephyrtronium/tmi"
"golang.org/x/sync/errgroup"

"github.com/zephyrtronium/robot/brain"
"github.com/zephyrtronium/robot/channel"
Expand All @@ -22,127 +21,123 @@ import (
)

// tmiMessage processes a PRIVMSG from TMI.
func (robo *Robot) tmiMessage(ctx context.Context, group *errgroup.Group, send chan<- *tmi.Message, msg *tmi.Message) {
func (robo *Robot) tmiMessage(ctx context.Context, send chan<- *tmi.Message, msg *tmi.Message) {
robo.Metrics.TMIMsgsCount.Observe(1)
// Run in a worker so that we don't block the message loop.
work := func(ctx context.Context) {
ch, _ := robo.channels.Load(msg.To())
if ch == nil {
// TMI gives a WHISPER for a direct message, so this is a message to a
// channel that isn't configured. Ignore it.
return
}
m := message.FromTMI(msg)
log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name))
from := m.Sender
if ch.Ignore[from] {
log.InfoContext(ctx, "message from ignored user")
return
}
if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) {
log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false))
return
}
if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok {
robo.command(ctx, log, ch, m, from, cmd)
return
}
ch.History.Add(m.Time(), m.ID, m.Sender, m.Text)
// If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the
// start of the message text.
// That's helpful for commands, which we've already processed, but
// otherwise we probably don't want to see it. Remove it.
if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") {
_, t, _ := strings.Cut(m.Text, " ")
log.DebugContext(ctx, "stripped reply mention", slog.String("text", t))
m.Text = t
}
robo.learn(ctx, log, ch, robo.hashes(), m)
switch err := ch.Memery.Check(m.Time(), from, m.Text); err {
case channel.ErrNotCopypasta: // do nothing
case nil:
// Meme detected. Copypasta.
t := time.Now()
r := ch.Rate.ReserveN(t, 1)
if d := r.DelayFrom(t); d > 0 {
// But we can't meme it. Restore it so we can next time.
log.InfoContext(ctx, "rate limited",
slog.String("action", "copypasta"),
slog.String("delay", d.String()),
)
ch.Memery.Unblock(m.Text)
r.CancelAt(t)
return
}
f := ch.Effects.Pick(rand.Uint32())
s := command.Effect(log, f, m.Text)
if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) {
// We would copypasta something that is blocked.
// Note that since we reached here at all, that implies the
// effect made it unacceptable.
log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f))
return
}
ch.Memery.Block(m.Time(), s)
log.InfoContext(ctx, "copypasta",
slog.String("text", s),
slog.String("effect", f),
)
msg := message.Format("", ch.Name, "%s", s)
robo.sendTMI(ctx, send, msg)
return
default:
log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err))
// Continue on.
}
if rand.Float64() > ch.Responses {
return
}
start := time.Now()
s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "")
cost := time.Since(start)
if err != nil {
log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err))
return
}
if s == "" {
log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send))
return
}
x := rand.Uint64()
e := ch.Emotes.Pick(uint32(x))
f := ch.Effects.Pick(uint32(x >> 32))
log.InfoContext(ctx, "speak",
slog.String("text", s),
slog.String("emote", e),
slog.String("effect", f),
)
se := strings.TrimSpace(s + " " + e)
sef := command.Effect(log, f, se)
if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil {
log.ErrorContext(ctx, "record trace failed", slog.Any("err", err))
return
}
if ch.Block.MatchString(se) || ch.Block.MatchString(sef) {
log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef))
return
}
// Now that we've done all the work, which might take substantial time,
// check whether we can use it.
ch, _ := robo.channels.Load(msg.To())
if ch == nil {
// TMI gives a WHISPER for a direct message, so this is a message to a
// channel that isn't configured. Ignore it.
return
}
m := message.FromTMI(msg)
log := slog.With(slog.String("trace", m.ID), slog.String("in", ch.Name))
from := m.Sender
if ch.Ignore[from] {
log.InfoContext(ctx, "message from ignored user")
return
}
if ch.Block.MatchString(m.Text) && !ch.Meme.MatchString(m.Text) {
log.InfoContext(ctx, "blocked message", slog.String("text", m.Text), slog.Bool("meme", false))
return
}
if cmd, ok := parseCommand(robo.tmi.name, m.Text); ok {
robo.command(ctx, log, ch, m, from, cmd)
return
}
ch.History.Add(m.Time(), m.ID, m.Sender, m.Text)
// If the message is a reply to e.g. Bocchi, TMI adds @Bocchi to the
// start of the message text.
// That's helpful for commands, which we've already processed, but
// otherwise we probably don't want to see it. Remove it.
if _, ok := msg.Tag("reply-parent-msg-id"); ok && strings.HasPrefix(m.Text, "@") {
_, t, _ := strings.Cut(m.Text, " ")
log.DebugContext(ctx, "stripped reply mention", slog.String("text", t))
m.Text = t
}
robo.learn(ctx, log, ch, robo.hashes(), m)
switch err := ch.Memery.Check(m.Time(), from, m.Text); err {
case channel.ErrNotCopypasta: // do nothing
case nil:
// Meme detected. Copypasta.
t := time.Now()
r := ch.Rate.ReserveN(t, 1)
if d := r.DelayFrom(t); d > 0 {
// But we can't meme it. Restore it so we can next time.
log.InfoContext(ctx, "rate limited",
slog.String("action", "speak"),
slog.String("action", "copypasta"),
slog.String("delay", d.String()),
)
ch.Memery.Unblock(m.Text)
r.CancelAt(t)
return
}
msg := message.Format("", ch.Name, "%s", sef)
f := ch.Effects.Pick(rand.Uint32())
s := command.Effect(log, f, m.Text)
if ch.Block.MatchString(s) && !ch.Meme.MatchString(s) {
// We would copypasta something that is blocked.
// Note that since we reached here at all, that implies the
// effect made it unacceptable.
log.WarnContext(ctx, "blocked copypasta", slog.String("text", s), slog.String("effect", f))
return
}
ch.Memery.Block(m.Time(), s)
log.InfoContext(ctx, "copypasta",
slog.String("text", s),
slog.String("effect", f),
)
msg := message.Format("", ch.Name, "%s", s)
robo.sendTMI(ctx, send, msg)
return
default:
log.ErrorContext(ctx, "failed copypasta check", slog.Any("err", err))
// Continue on.
}
if rand.Float64() > ch.Responses {
return
}
start := time.Now()
s, trace, err := brain.Speak(ctx, robo.brain, ch.Send, "")
cost := time.Since(start)
if err != nil {
log.ErrorContext(ctx, "wanted to speak but failed", slog.Any("err", err))
return
}
if s == "" {
log.InfoContext(ctx, "spoke nothing", slog.String("tag", ch.Send))
return
}
x := rand.Uint64()
e := ch.Emotes.Pick(uint32(x))
f := ch.Effects.Pick(uint32(x >> 32))
log.InfoContext(ctx, "speak",
slog.String("text", s),
slog.String("emote", e),
slog.String("effect", f),
)
se := strings.TrimSpace(s + " " + e)
sef := command.Effect(log, f, se)
if err := robo.spoken.Record(ctx, ch.Send, sef, trace, time.Now(), cost, s, e, f); err != nil {
log.ErrorContext(ctx, "record trace failed", slog.Any("err", err))
return
}
if ch.Block.MatchString(se) || ch.Block.MatchString(sef) {
log.WarnContext(ctx, "wanted to send blocked message", slog.String("text", sef))
return
}
// Now that we've done all the work, which might take substantial time,
// check whether we can use it.
t := time.Now()
r := ch.Rate.ReserveN(t, 1)
if d := r.DelayFrom(t); d > 0 {
log.InfoContext(ctx, "rate limited",
slog.String("action", "speak"),
slog.String("delay", d.String()),
)
r.CancelAt(t)
return
}
robo.enqueue(ctx, group, work)
out := message.Format("", ch.Name, "%s", sef)
robo.sendTMI(ctx, send, out)
}

func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Channel, m *message.Received, from, cmd string) {
Expand Down Expand Up @@ -194,45 +189,6 @@ func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Ch
c.fn(ctx, &r, &inv)
}

func (robo *Robot) enqueue(ctx context.Context, group *errgroup.Group, work func(context.Context)) {
var w chan func(context.Context)
// Get a worker if one exists. Otherwise, spawn a new one.
select {
case w = <-robo.works:
default:
w = make(chan func(context.Context), 1)
group.Go(func() error {
worker(ctx, robo.works, w)
return nil
})
}
// Send it work.
select {
case <-ctx.Done():
return
case w <- work:
}
}

// worker runs works for a while. The provided context is passed to each work.
func worker(ctx context.Context, works chan chan func(context.Context), ch chan func(context.Context)) {
for {
select {
case <-ctx.Done():
return
case work := <-ch:
work(ctx)
// Replace ourselves in the pool if it needs additional capacity.
// Otherwise, we're done.
select {
case works <- ch:
default:
return
}
}
}
}

// learn learns a given message's text if it passes ch's filters.
func (robo *Robot) learn(ctx context.Context, log *slog.Logger, ch *channel.Channel, hasher userhash.Hasher, msg *message.Received) {
if !ch.Enabled.Load() {
Expand Down
Loading

0 comments on commit e92f7de

Please sign in to comment.