diff --git a/goqite.go b/goqite.go index 27fb730..64e3920 100644 --- a/goqite.go +++ b/goqite.go @@ -88,17 +88,36 @@ func (q *Queue) Send(ctx context.Context, m Message) error { // SendTx is like Send, but within an existing transaction. func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error { + _, err := q.SendAndGetIDTx(ctx, tx, m) + return err +} + +// SendAndGetID is like Send, but also returns the message ID, which can be used +// to interact with the message without receiving it first. +func (q *Queue) SendAndGetID(ctx context.Context, m Message) (ID, error) { + var id ID + err := internalsql.InTx(q.db, func(tx *sql.Tx) error { + var err error + id, err = q.SendAndGetIDTx(ctx, tx, m) + return err + }) + return id, err +} + +// SendAndGetIDTx is like SendAndGetID, but within an existing transaction. +func (q *Queue) SendAndGetIDTx(ctx context.Context, tx *sql.Tx, m Message) (ID, error) { if m.Delay < 0 { panic("delay cannot be negative") } timeout := time.Now().Add(m.Delay).Format(rfc3339Milli) - _, err := tx.ExecContext(ctx, `insert into goqite (queue, body, timeout) values (?, ?, ?)`, q.name, m.Body, timeout) - if err != nil { - return err + var id ID + query := `insert into goqite (queue, body, timeout) values (?, ?, ?) returning id` + if err := tx.QueryRowContext(ctx, query, q.name, m.Body, timeout).Scan(&id); err != nil { + return "", err } - return nil + return id, nil } // Receive a Message from the queue, or nil if there is none. diff --git a/goqite_test.go b/goqite_test.go index fd9e2cf..cffee37 100644 --- a/goqite_test.go +++ b/goqite_test.go @@ -189,6 +189,23 @@ func TestQueue_Receive(t *testing.T) { }) } +func TestQueue_SendAndGetID(t *testing.T) { + t.Run("returns the message ID", func(t *testing.T) { + q := newQ(t, goqite.NewOpts{}, ":memory:") + + m := goqite.Message{ + Body: []byte("yo"), + } + + id, err := q.SendAndGetID(context.Background(), m) + is.NotError(t, err) + is.Equal(t, 34, len(id)) + + err = q.Delete(context.Background(), id) + is.NotError(t, err) + }) +} + func TestQueue_Extend(t *testing.T) { t.Run("does not receive a message that has had the timeout extended", func(t *testing.T) { q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond}, ":memory:")