
txob

Reliably process side-effects in your Node.js applications without data loss


The Problem

When building applications, you often need to perform multiple operations together: update your database AND send an email, publish an event, trigger a webhook, or notify another service. This creates a critical challenge:

// ❌ The problem: What if the email fails after the database commit?
await db.createUser(user);
await db.commit();
await emailService.sendWelcomeEmail(user.email); // 💥 Fails! User created but no email sent

// ❌ Also problematic: What if the database fails after sending the email?
await emailService.sendWelcomeEmail(user.email); // ✅ Email sent
await db.createUser(user);
await db.commit(); // 💥 Fails! Email sent but no user record

// ❌ What about using a message queue?
await db.createUser(user);
await messageQueue.publish("user.created", user); // ✅ Message queued
await db.commit(); // 💥 Fails! Message is in queue but no user record
// The queue and database are separate systems - you can't make them atomic!

The Transactional Outbox Pattern solves this by storing both the business data and events in a single database transaction, then processing events asynchronously with guaranteed delivery.

// ✅ Solution: Save both user and event in the same transaction
await db.query("BEGIN");

// Save your business data
await db.query("INSERT INTO users (id, email, name) VALUES ($1, $2, $3)", [
  userId,
  email,
  name,
]);

// Save the event in the SAME transaction
await db.query(
  "INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
  [randomUUID(), "UserCreated", { userId, email }, correlationId, {}, 0],
);

await db.query("COMMIT");
// ✅ Both user and event are saved atomically!
// If commit fails, neither is saved. If it succeeds, both are saved.
// The processor will pick up the event and send the email (and any other side effects that you register) asynchronously

Features

  • At-least-once delivery - Events are never lost, even during failures or crashes
  • Graceful shutdown - Finish processing in-flight events before shutting down
  • Horizontal scalability - Run multiple processors without conflicts using row-level locking
  • Database agnostic - Built-in support for PostgreSQL and MongoDB, or implement your own
  • Reduced polling frequency - Optional wakeup signals (Postgres NOTIFY, MongoDB Change Streams) reduce database load and latency
  • Configurable error handling - Exponential backoff, max retries, and custom error hooks
  • TypeScript-first - Full type safety and autocompletion
  • Handler result tracking - Track the execution status of each handler independently
  • Minimal dependencies - Only p-limit and p-queue (plus your database driver)

Quick Start

Installation

# For PostgreSQL
npm install txob pg

# For MongoDB
npm install txob mongodb

Basic Example (PostgreSQL)

1. Create the events table:

CREATE TABLE events (
  id UUID PRIMARY KEY,
  timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  type VARCHAR(255) NOT NULL,
  data JSONB,
  correlation_id UUID,
  handler_results JSONB DEFAULT '{}',
  errors INTEGER DEFAULT 0,
  backoff_until TIMESTAMPTZ,
  processed_at TIMESTAMPTZ
);

-- Critical index for performance
CREATE INDEX idx_events_processing ON events(processed_at, backoff_until, errors)
WHERE processed_at IS NULL;

2. Set up the event processor:

import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import pg from "pg";

const client = new pg.Client({
  /* your config */
});
await client.connect();

const processor = EventProcessor(createProcessorClient({ querier: client }), {
  UserCreated: {
    // Handlers are processed concurrently and independently with retries
    // If one handler fails, others continue processing
    sendWelcomeEmail: async (event, { signal }) => {
      await emailService.send({
        to: event.data.email,
        subject: "Welcome!",
        template: "welcome",
      });
    },
    createStripeCustomer: async (event, { signal }) => {
      await stripe.customers.create({
        email: event.data.email,
        metadata: { userId: event.data.userId },
      });
    },
  },
});

processor.start();

// Graceful shutdown
process.on("SIGTERM", () => processor.stop());

3. Save events transactionally with your business logic:

import { randomUUID } from "crypto";

// Inside your application code
await client.query("BEGIN");

// Save your business data
await client.query("INSERT INTO users (id, email, name) VALUES ($1, $2, $3)", [
  userId,
  email,
  name,
]);

// Save the event in the SAME transaction
await client.query(
  "INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
  [randomUUID(), "UserCreated", { userId, email }, correlationId, {}, 0],
);

await client.query("COMMIT");
// ✅ Both user and event are saved atomically!
// The processor will pick up the event and send the email

That's it! The processor will automatically poll for new events and execute your handlers.

Optional: Reduce polling with wakeup signals

For better performance, you can set up wakeup signals to reduce polling frequency:

// PostgreSQL: Use Postgres NOTIFY
import { createWakeupEmitter } from "txob/pg";

const wakeupEmitter = await createWakeupEmitter({
  listenClientConfig: clientConfig, // pg client config for a dedicated LISTEN connection
  createTrigger: true,
  querier: client,
});

// MongoDB: Use Change Streams
import { createWakeupEmitter } from "txob/mongodb";

const wakeupEmitter = await createWakeupEmitter({
  mongo: mongoClient,
  db: "myapp",
});

// Use with EventProcessor (the wakeup emitter is passed via the options argument)
const processor = EventProcessor(
  processorClient,
  {
    /* ... handlers ... */
  },
  {
    wakeupEmitter, // Polls immediately when new events arrive
  },
);

When a wakeup emitter is provided, the processor will:

  • Poll immediately when new events are inserted (via wakeup signal)
  • Still poll periodically as a fallback if wakeup signals are missed
  • Throttle wakeup signals to prevent excessive polling during bursts

How It Works

┌─────────────────────────────────────────────────────────────────┐
│                     Your Application                             │
├─────────────────────────────────────────────────────────────────┤
│  BEGIN TRANSACTION                                               │
│    1. Insert/Update business data (users, orders, etc.)         │
│    2. Insert event record                                        │
│  COMMIT TRANSACTION                                              │
└─────────────────────────────────────────────────────────────────┘
                           │
                           │ Both saved atomically ✅
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Events Table                                  │
│  [id] [type] [data] [processed_at] [errors] [backoff_until]    │
└─────────────────────────────────────────────────────────────────┘
                           │
        ┌──────────────────┴──────────────────┐
        │                                      │
        │ (Optional) Wakeup Signal            │
        │ (Postgres NOTIFY / MongoDB Stream) │
        │                                      │
        ▼                                      ▼
┌──────────────────────────────┐   ┌──────────────────────────────┐
│      Polling Component       │   │   Fallback Polling Loop      │
│  (Decoupled from Processing) │   │   (If wakeup signals missed)│
├──────────────────────────────┤   ├──────────────────────────────┤
│  • Listens for wakeup signals│   │  • Polls periodically        │
│  • Throttles rapid signals   │   │  • Only if no recent wakeup  │
│  • Triggers immediate poll    │   │  • Uses same throttled poll  │
└──────────────────────────────┘   └──────────────────────────────┘
        │                                      │
        └──────────────────┬──────────────────┘
                           │
                           │ SELECT unprocessed events
                           │ (FOR UPDATE SKIP LOCKED)
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Processing Queue                             │
│  (Concurrency-controlled event queue)                           │
└─────────────────────────────────────────────────────────────────┘
                           │
                           │ Process events concurrently
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Event Processor                                │
├─────────────────────────────────────────────────────────────────┤
│  1. Lock event in transaction                                   │
│  2. Execute handlers concurrently (send email, webhook, etc.)  │
│  3. UPDATE event with results and processed_at                  │
│  4. On failure: increment errors, set backoff_until             │
└─────────────────────────────────────────────────────────────────┘

Key Points:

  • Events are saved in the same transaction as your business data
  • If the transaction fails, neither the data nor event is saved
  • The processor runs independently and guarantees at-least-once delivery
  • Multiple processors can run concurrently using database row locking
  • Failed events are retried with exponential backoff
  • Polling and processing are decoupled - polling finds events, processing queue handles execution
  • Wakeup signals (Postgres NOTIFY or MongoDB Change Streams) reduce polling latency
  • Throttled polling prevents excessive database queries during event bursts
  • Fallback polling ensures events are processed even if wakeup signals are missed

For a detailed architecture diagram, see architecture.mmd.

Core Concepts

Event Structure

Every event in txob follows this structure:

interface TxOBEvent<EventType extends string> {
  id: string; // Unique event identifier (UUID recommended)
  timestamp: Date; // When the event was created
  type: EventType; // Event type (e.g., "UserCreated", "OrderPlaced")
  data: Record<string, unknown>; // Event payload - your custom data
  correlation_id: string; // For tracing requests across services
  handler_results: Record<string, TxOBEventHandlerResult>; // Results from each handler
  errors: number; // Number of processing attempts
  backoff_until?: Date; // When to retry (null if not backing off)
  processed_at?: Date; // When fully processed (null if pending)
}

Field Explanations:

  • handler_results: Tracks each handler's status independently. If one handler fails, others can still succeed
  • errors: Global error count. When it reaches maxErrors, the event is marked as processed (failed)
  • backoff_until: Prevents immediate retries. Set to future timestamp after failures
  • correlation_id: Essential for distributed tracing and debugging
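
For illustration, an event row midway through processing, where one handler has succeeded and another has failed once, might look like this (all values hypothetical):

// Hypothetical snapshot: sendWelcomeEmail succeeded, createStripeCustomer failed once
{
  id: "4f1c...",
  type: "UserCreated",
  errors: 1, // one failed processing attempt so far
  backoff_until: "2024-01-01T00:00:02Z", // set by the backoff function
  processed_at: null, // still pending; will be retried
  handler_results: {
    sendWelcomeEmail: { processed_at: "2024-01-01T00:00:00Z" }, // won't re-run on retry
    createStripeCustomer: {
      errors: [{ error: "connect ETIMEDOUT", timestamp: "2024-01-01T00:00:00Z" }],
    },
  },
}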

Event Handlers

Handlers are async functions that execute your side-effects:

type TxOBEventHandler = (
  event: TxOBEvent<string>,
  opts: { signal?: AbortSignal },
) => Promise<void>;

Important: Handlers should be idempotent because they may be called multiple times for the same event (at-least-once delivery).

// ✅ Good: Idempotent handler
const sendEmail: TxOBEventHandler = async (event) => {
  const alreadySent = await checkIfEmailSent(event.data.userId);
  if (alreadySent) return; // Safe to retry

  await emailService.send(event.data.email);
};

// ❌ Bad: Not idempotent
const incrementCounter: TxOBEventHandler = async (event) => {
  await db.query("UPDATE counters SET count = count + 1"); // Will increment multiple times!
};

Handler Results

Each handler's execution is tracked independently:

type TxOBEventHandlerResult = {
  processed_at?: Date; // When this handler succeeded
  unprocessable_at?: Date; // When this handler was marked unprocessable
  errors?: Array<{
    // Error history for this handler
    error: unknown;
    timestamp: Date;
  }>;
};

This means if you have 3 handlers and 1 fails, the other 2 won't be re-executed on retry.

Delivery Guarantees

txob implements at-least-once delivery:

  • ✅ Events are never lost even if the processor crashes
  • ⚠️ Handlers may be called multiple times for the same event
  • ⚠️ If updateEvent fails after handlers succeed, they will be re-invoked

Why this matters:

// This handler will be called again if the event update fails
UserCreated: {
  sendEmail: async (event) => {
    await emailService.send(event.data.email); // ✅ Sent successfully
    // 💥 But if updateEvent() fails here, this will run again!
  },
}

Solution: Make your handlers idempotent (check if work was already done before doing it again).

Error Handling

txob provides sophisticated error handling:

1. Automatic Retries with Backoff

EventProcessor(client, handlers, {
  maxErrors: 5, // Retry up to 5 times
  backoff: (errorCount) => {
    // Custom backoff strategy
    const delayMs = 1000 * 2 ** errorCount; // Exponential: 1s, 2s, 4s, 8s, 16s
    return new Date(Date.now() + delayMs);
  },
});

2. Custom Backoff with TxOBError

You can throw TxOBError to specify a custom backoff time for retries:

import { TxOBError } from "txob";

UserCreated: {
  sendEmail: async (event) => {
    try {
      await emailService.send(event.data.email);
    } catch (err) {
      if (err.code === "RATE_LIMIT_EXCEEDED") {
        // Retry after 1 minute instead of using default backoff
        throw new TxOBError("Rate limit exceeded", {
          cause: err,
          backoffUntil: new Date(Date.now() + 60000),
        });
      }
      throw err; // Use default backoff for other errors
    }
  },
}

Note: If multiple handlers throw TxOBError with different backoffUntil dates, the processor will use the latest (maximum) backoff time.

3. Unprocessable Events

Sometimes an event cannot be processed (e.g., invalid data). Mark it as unprocessable to stop retrying:

import { ErrorUnprocessableEventHandler } from "txob";

UserCreated: {
  sendEmail: async (event) => {
    if (!isValidEmail(event.data.email)) {
      throw new ErrorUnprocessableEventHandler(
        new Error("Invalid email address"),
      );
    }
    await emailService.send(event.data.email);
  },
}

4. Max Errors Hook

When an event reaches max errors, you can create a "dead letter" event:

EventProcessor(client, handlers, {
  onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
    // Save a failure event in the same transaction
    await txClient.createEvent({
      id: randomUUID(),
      timestamp: new Date(),
      type: "EventFailed",
      data: {
        originalEventId: event.id,
        originalEventType: event.type,
        reason: "Max errors reached",
      },
      correlation_id: event.correlation_id,
      handler_results: {},
      errors: 0,
    });

    // Send alert, log to monitoring system, etc.
  },
});

Database Setup

PostgreSQL

1. Create the events table:

CREATE TABLE events (
  id UUID PRIMARY KEY,
  timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  type VARCHAR(255) NOT NULL,
  data JSONB,
  correlation_id UUID,
  handler_results JSONB DEFAULT '{}',
  errors INTEGER DEFAULT 0,
  backoff_until TIMESTAMPTZ,
  processed_at TIMESTAMPTZ
);

2. Create indexes for optimal performance:

-- Critical: Partial index for unprocessed events (keeps index small and fast)
CREATE INDEX idx_events_processing ON events(processed_at, backoff_until, errors)
WHERE processed_at IS NULL;

-- Unique index on id (if not using id as PRIMARY KEY)
CREATE UNIQUE INDEX idx_events_id ON events(id);

-- Optional: For querying by correlation_id
CREATE INDEX idx_events_correlation_id ON events(correlation_id);

Why these indexes?

The idx_events_processing partial index is critical for performance. It:

  • Only indexes unprocessed events (WHERE processed_at IS NULL)
  • Stays small as events are processed
  • Covers the main query pattern: processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < maxErrors
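
Concretely, the poll that this index serves looks roughly like the following (a sketch; see src/pg/client.ts for the exact statement):

SELECT id, errors FROM events
WHERE processed_at IS NULL
  AND (backoff_until IS NULL OR backoff_until < NOW())
  AND errors < $1 -- maxErrors
LIMIT 100;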

3. Use the PostgreSQL client:

import { createProcessorClient } from "txob/pg";
import pg from "pg";

const client = new pg.Client({
  user: process.env.POSTGRES_USER,
  password: process.env.POSTGRES_PASSWORD,
  database: process.env.POSTGRES_DB,
});
await client.connect();

const processorClient = createProcessorClient({
  querier: client,
  table: "events", // Optional: table name (default: "events")
  limit: 100, // Optional: max events per poll (default: 100)
});

4. (Optional) Set up wakeup signals to reduce polling:

import { createWakeupEmitter } from "txob/pg";

// Create a wakeup emitter using Postgres NOTIFY
// This will automatically create a trigger that sends NOTIFY on INSERT
const wakeupEmitter = await createWakeupEmitter({
  listenClientConfig: clientConfig, // pg client config for a dedicated LISTEN connection
  createTrigger: true,
  querier: client,
  table: "events", // Optional: table name (default: "events")
  channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events")
});

// Use with EventProcessor (wakeup options are passed via the options argument)
const processor = EventProcessor(
  processorClient,
  {
    /* ... handlers ... */
  },
  {
    wakeupEmitter, // Reduces polling frequency when new events arrive
    sleepTimeMs: 5000, // Still used as a fallback if wakeup signals are missed
    wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s
    wakeupThrottleMs: 1000, // Throttle wakeup signals to prevent excessive polling
  },
);

MongoDB

1. Create the events collection with indexes:

import { MongoClient } from "mongodb";

const mongoClient = new MongoClient(process.env.MONGO_URL);
await mongoClient.connect();
const db = mongoClient.db("myapp");

// Create collection
const eventsCollection = db.collection("events");

// Create indexes
await eventsCollection.createIndex(
  { processed_at: 1, backoff_until: 1, errors: 1 },
  { partialFilterExpression: { processed_at: null } },
);
await eventsCollection.createIndex({ id: 1 }, { unique: true });
await eventsCollection.createIndex({ correlation_id: 1 });

2. Use the MongoDB client:

import { createProcessorClient } from "txob/mongodb";

const processorClient = createProcessorClient({
  mongo: mongoClient,
  db: "myapp", // Database name
  collection: "events", // Optional: collection name (default: "events")
  limit: 100, // Optional: max events per poll (default: 100)
});

3. (Optional) Set up wakeup signals to reduce polling:

import { createWakeupEmitter } from "txob/mongodb";

// Create a wakeup emitter using MongoDB Change Streams
// Note: Requires a replica set or sharded cluster
const wakeupEmitter = await createWakeupEmitter({
  mongo: mongoClient,
  db: "myapp",
  collection: "events", // Optional: collection name (default: "events")
});

// Use with EventProcessor (wakeup options are passed via the options argument)
const processor = EventProcessor(
  processorClient,
  {
    /* ... handlers ... */
  },
  {
    wakeupEmitter, // Reduces polling frequency when new events arrive
    sleepTimeMs: 5000, // Still used as a fallback if wakeup signals are missed
    wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s
    wakeupThrottleMs: 100, // Throttle wakeup signals to prevent excessive polling
  },
);

// Handle wakeup emitter errors (e.g., if not configured for replica set)
wakeupEmitter.on("error", (err) => {
  console.error("Wakeup emitter error:", err);
  // Processor will automatically fall back to polling
});

Note:

  • MongoDB transactions require a replica set or sharded cluster. See MongoDB docs.
  • MongoDB Change Streams (used for wakeup signals) also require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running rs.initiate() in the mongo shell.
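
If you're developing locally against a standalone server, one common approach is to run it as a single-node replica set (standard MongoDB tooling; paths are illustrative):

# Start mongod with a replica set name, then initiate the set once
mongod --replSet rs0 --dbpath /data/db
mongosh --eval "rs.initiate()"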

Custom Database

Implement the TxOBProcessorClient interface:

interface TxOBProcessorClient<EventType extends string> {
  getEventsToProcess(opts: {
    signal?: AbortSignal;
    maxErrors: number;
  }): Promise<Pick<TxOBEvent<EventType>, "id" | "errors">[]>;

  transaction(
    fn: (txClient: TxOBTransactionProcessorClient<EventType>) => Promise<void>,
  ): Promise<void>;
}

interface TxOBTransactionProcessorClient<EventType extends string> {
  getEventByIdForUpdateSkipLocked(
    eventId: string,
    opts: { signal?: AbortSignal; maxErrors: number },
  ): Promise<TxOBEvent<EventType> | null>;

  updateEvent(event: TxOBEvent<EventType>): Promise<void>;

  createEvent(
    event: Omit<TxOBEvent<EventType>, "processed_at" | "backoff_until">,
  ): Promise<void>;
}

See src/pg/client.ts or src/mongodb/client.ts for reference implementations.
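
As a starting point, here is a minimal sketch of a SQL-backed implementation, assuming a pg-style driver exposing query(sql, params) and that the interface types above are exported from txob (the UPDATE/INSERT bodies are elided):

import type { TxOBProcessorClient } from "txob";

type Querier = { query: (sql: string, params?: unknown[]) => Promise<{ rows: any[] }> };

const createCustomClient = <EventType extends string>(
  db: Querier,
): TxOBProcessorClient<EventType> => ({
  async getEventsToProcess({ maxErrors }) {
    // Candidate events: unprocessed, out of backoff, and still retriable
    const { rows } = await db.query(
      `SELECT id, errors FROM events
       WHERE processed_at IS NULL
         AND (backoff_until IS NULL OR backoff_until < NOW())
         AND errors < $1
       LIMIT 100`,
      [maxErrors],
    );
    return rows;
  },
  async transaction(fn) {
    await db.query("BEGIN");
    try {
      await fn({
        async getEventByIdForUpdateSkipLocked(eventId) {
          // Lock the row so other processors skip it
          const { rows } = await db.query(
            "SELECT * FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED",
            [eventId],
          );
          return rows[0] ?? null;
        },
        async updateEvent(event) {
          /* UPDATE events SET ... WHERE id = $1 */
        },
        async createEvent(event) {
          /* INSERT INTO events (...) VALUES (...) */
        },
      });
      await db.query("COMMIT");
    } catch (err) {
      await db.query("ROLLBACK").catch(() => {});
      throw err;
    }
  },
});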

Configuration

Processor Options

EventProcessor(client, handlerMap, {
  // Polling interval in milliseconds (default: 5000)
  sleepTimeMs: 5000,

  // Maximum errors before marking event as processed/failed (default: 5)
  maxErrors: 5,

  // Backoff calculation function (default: exponential backoff)
  backoff: (errorCount: number): Date => {
    const baseDelayMs = 1000;
    const maxDelayMs = 60000;
    const backoffMs = Math.min(baseDelayMs * 2 ** errorCount, maxDelayMs);
    return new Date(Date.now() + backoffMs);
  },

  // Maximum concurrent events being processed (default: 5)
  maxEventConcurrency: 5,

  // Maximum concurrent handlers per event (default: 10)
  maxHandlerConcurrency: 10,

  // Custom logger (default: undefined)
  logger: {
    debug: (msg, ...args) => console.debug(msg, ...args),
    info: (msg, ...args) => console.info(msg, ...args),
    warn: (msg, ...args) => console.warn(msg, ...args),
    error: (msg, ...args) => console.error(msg, ...args),
  },

  // Hook called when max errors reached (default: undefined)
  onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
    // Create a dead-letter event, send alerts, etc.
  },
});

Configuration Reference

| Option                  | Type                    | Default     | Description                                                                          |
| ----------------------- | ----------------------- | ----------- | ------------------------------------------------------------------------------------ |
| sleepTimeMs             | number                  | 5000        | Milliseconds between polling cycles (used when no wakeup emitter is set, or as a fallback) |
| maxErrors               | number                  | 5           | Max retry attempts before marking as failed                                          |
| backoff                 | (count: number) => Date | Exponential | Calculates the next retry time                                                       |
| maxEventConcurrency     | number                  | 5           | Max events processed simultaneously                                                  |
| maxHandlerConcurrency   | number                  | 10          | Max handlers per event running concurrently                                          |
| wakeupEmitter           | WakeupEmitter           | undefined   | Optional wakeup signal emitter (Postgres NOTIFY or MongoDB Change Streams)           |
| wakeupTimeoutMs         | number                  | 30000       | Fallback poll if no wakeup signal received (only used with wakeupEmitter)            |
| wakeupThrottleMs        | number                  | 1000        | Throttle wakeup signals to prevent excessive polling (only used with wakeupEmitter)  |
| logger                  | Logger                  | undefined   | Custom logger interface                                                              |
| onEventMaxErrorsReached | function                | undefined   | Hook called when an event reaches maxErrors                                          |

Usage Examples

Complete HTTP API Example

This example shows a complete HTTP API that creates users and sends welcome emails transactionally:

import http from "node:http";
import { randomUUID } from "node:crypto";
import pg from "pg";
import gracefulShutdown from "http-graceful-shutdown";
import { EventProcessor, ErrorUnprocessableEventHandler } from "txob";
import { createProcessorClient } from "txob/pg";

// 1. Define your event types
const eventTypes = {
  UserCreated: "UserCreated",
  EventMaxErrorsReached: "EventMaxErrorsReached",
} as const;

type EventType = keyof typeof eventTypes;

// 2. Set up database connection
const client = new pg.Client({
  user: process.env.POSTGRES_USER,
  password: process.env.POSTGRES_PASSWORD,
  database: process.env.POSTGRES_DB,
});
await client.connect();

// 3. Create and start the processor
const processor = EventProcessor(
  createProcessorClient<EventType>({ querier: client }),
  {
    UserCreated: {
      sendEmail: async (event, { signal }) => {
        // Check if email was already sent (idempotency)
        const sent = await checkEmailSent(event.data.userId);
        if (sent) return;

        // Register cleanup before the long-running send so a shutdown
        // signal can cancel pending work promptly
        signal?.addEventListener("abort", () => {
          emailService.cancelPending();
        });

        // Send email
        await emailService.send({
          to: event.data.email,
          subject: "Welcome!",
          template: "welcome",
        });
      },

      publishToEventBus: async (event) => {
        await eventBus.publish("user.created", event.data);
      },
    },

    EventMaxErrorsReached: {
      alertOps: async (event) => {
        await slack.send({
          channel: "#alerts",
          text: `Event failed: ${event.data.eventType} (${event.data.eventId})`,
        });
      },
    },
  },
  {
    sleepTimeMs: 5000,
    maxErrors: 5,
    logger: console,
    onEventMaxErrorsReached: async ({ event, txClient }) => {
      await txClient.createEvent({
        id: randomUUID(),
        timestamp: new Date(),
        type: eventTypes.EventMaxErrorsReached,
        data: {
          eventId: event.id,
          eventType: event.type,
        },
        correlation_id: event.correlation_id,
        handler_results: {},
        errors: 0,
      });
    },
  },
);

processor.start();

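// Minimal request-body reader used by the server below
// (an assumed helper for this example; not part of txob)
const getBody = (req: http.IncomingMessage): Promise<string> =>
  new Promise((resolve, reject) => {
    let body = "";
    req.on("data", (chunk) => (body += chunk));
    req.on("end", () => resolve(body));
    req.on("error", reject);
  });
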
// 4. Create HTTP server
const server = http.createServer(async (req, res) => {
  if (req.url !== "/users" || req.method !== "POST") {
    res.statusCode = 404;
    return res.end();
  }

  const correlationId = req.headers["x-correlation-id"] || randomUUID();

  try {
    const body = await getBody(req);
    const { email, name } = JSON.parse(body);

    // Start transaction
    await client.query("BEGIN");

    // Save user
    const userId = randomUUID();
    await client.query(
      "INSERT INTO users (id, email, name) VALUES ($1, $2, $3)",
      [userId, email, name],
    );

    // Save event IN THE SAME TRANSACTION
    await client.query(
      "INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
      [
        randomUUID(),
        eventTypes.UserCreated,
        { userId, email, name },
        correlationId,
        {},
        0,
      ],
    );

    // Commit transaction
    await client.query("COMMIT");

    res.statusCode = 201;
    res.end(JSON.stringify({ userId }));
  } catch (error) {
    await client.query("ROLLBACK").catch(() => {});
    res.statusCode = 500;
    res.end(JSON.stringify({ error: "Internal server error" }));
  }
});

const HTTP_PORT = process.env.PORT || 3000;
server.listen(HTTP_PORT, () => console.log(`Server listening on ${HTTP_PORT}`));

// 5. Graceful shutdown
gracefulShutdown(server, {
  onShutdown: async () => {
    await processor.stop(); // Wait for in-flight events to complete
    await client.end();
  },
});

Multiple Event Types

const processor = EventProcessor(createProcessorClient({ querier: client }), {
  UserCreated: {
    sendWelcomeEmail: async (event) => {
      /* ... */
    },
    createStripeCustomer: async (event) => {
      /* ... */
    },
  },

  OrderPlaced: {
    sendConfirmationEmail: async (event) => {
      /* ... */
    },
    updateInventory: async (event) => {
      /* ... */
    },
    notifyWarehouse: async (event) => {
      /* ... */
    },
  },

  PaymentFailed: {
    sendRetryEmail: async (event) => {
      /* ... */
    },
    logToAnalytics: async (event) => {
      /* ... */
    },
  },
});

Custom Backoff Strategy

// Linear backoff: 5s, 10s, 15s, 20s, 25s
const linearBackoff = (errorCount: number): Date => {
  const delayMs = 5000 * errorCount;
  return new Date(Date.now() + delayMs);
};

// Fixed delay: always 30s
const fixedBackoff = (): Date => {
  return new Date(Date.now() + 30000);
};

// Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s, 13s...
const fibonacciBackoff = (() => {
  const fib = (n: number): number => (n <= 1 ? 1 : fib(n - 1) + fib(n - 2));
  return (errorCount: number): Date => {
    const delayMs = fib(errorCount) * 1000;
    return new Date(Date.now() + delayMs);
  };
})();

EventProcessor(client, handlers, {
  backoff: linearBackoff, // or fixedBackoff, or fibonacciBackoff
});

Using AbortSignal for Cleanup

UserCreated: {
  sendEmail: async (event, { signal }) => {
    // Long-running operation
    const emailJob = emailService.sendLarge(event.data);

    // Listen for shutdown signal
    signal?.addEventListener("abort", () => {
      console.log("Shutdown requested, canceling email...");
      emailJob.cancel(); // Clean up quickly so event can be saved
    });

    await emailJob;
  },
}

Using txob with Message Queues

Use txob to guarantee consistency between your database and queue, then let the queue handle low-latency distribution:

import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();

const processor = EventProcessor(createProcessorClient({ querier: client }), {
  UserCreated: {
    // Publish to Kafka with guaranteed consistency
    publishToKafka: async (event) => {
      // Kafka's idempotent producer handles deduplication
      // Using event.id as the key ensures retries are safe
      await producer.send({
        topic: "user-events",
        messages: [
          {
            key: event.id, // Use event.id for idempotency
            value: JSON.stringify({
              type: event.type,
              data: event.data,
              timestamp: event.timestamp,
            }),
          },
        ],
      });
    },

    // Also handle other side effects
    sendEmail: async (event) => {
      await emailService.send(event.data.email);
    },
  },
});

processor.start();

Benefits of this approach:

  • ✅ Database and Kafka are guaranteed consistent (via txob's transactional guarantees)
  • ✅ If Kafka publish fails, txob will retry automatically
  • ✅ Downstream consumers get low-latency events from Kafka
  • ✅ You can still handle other side effects (email, webhooks) in parallel
  • ✅ Best of both worlds: consistency from txob + speed from Kafka

Separate Processor Service

You can run the processor as a separate service from your API:

// processor-service.ts
import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import pg from "pg";

const client = new pg.Client({
  /* config */
});
await client.connect();

const processor = EventProcessor(createProcessorClient({ querier: client }), {
  // All your handlers...
});

processor.start();
console.log("Event processor started");

// Graceful shutdown
const shutdown = async () => {
  console.log("Shutting down...");
  await processor.stop();
  await client.end();
  process.exit(0);
};

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

Run multiple instances for horizontal scaling:

# Terminal 1
node processor-service.js

# Terminal 2
node processor-service.js

# Terminal 3
node processor-service.js

All three will coordinate using database row locking (FOR UPDATE SKIP LOCKED).

API Reference

EventProcessor(client, handlerMap, options?)

Creates and returns a processor instance with start() and stop() methods.

Parameters:

  • client: TxOBProcessorClient<EventType> - Database client
  • handlerMap: TxOBEventHandlerMap<EventType> - Map of event types to handlers
  • options?: ProcessorOptions - Configuration options

Returns:

{
  start: () => void;
  stop: (opts?: { timeoutMs?: number }) => Promise<void>;
}

Example:

const processor = EventProcessor(client, handlers);
processor.start();
await processor.stop({ timeoutMs: 10000 }); // 10 second timeout

createProcessorClient (PostgreSQL)

Creates a PostgreSQL processor client.

import { createProcessorClient } from "txob/pg";

createProcessorClient<EventType>(opts: {
  querier: pg.Client;
  table?: string;    // Default: "events"
  limit?: number;   // Default: 100
}): TxOBProcessorClient<EventType>

createProcessorClient (MongoDB)

Creates a MongoDB processor client.

import { createProcessorClient } from "txob/mongodb";

createProcessorClient<EventType>(opts: {
  mongo: mongodb.MongoClient;
  db: string;               // Database name
  collection?: string;      // Default: "events"
  limit?: number;           // Default: 100
}): TxOBProcessorClient<EventType>

TxOBError

Error class to specify custom backoff times for retries.

import { TxOBError } from "txob";

// Throw with custom backoff time
throw new TxOBError("Rate limit exceeded", {
  backoffUntil: new Date(Date.now() + 60000), // Retry after 1 minute
});

// Throw with cause and custom backoff
throw new TxOBError("Processing failed", {
  cause: originalError,
  backoffUntil: new Date(Date.now() + 30000), // Retry after 30 seconds
});

Note: If multiple handlers throw TxOBError with different backoffUntil dates, the processor will use the latest (maximum) backoff time.

ErrorUnprocessableEventHandler

Error class to mark a handler as unprocessable (stops retrying).

import { ErrorUnprocessableEventHandler } from "txob";

throw new ErrorUnprocessableEventHandler(new Error("Invalid data"));

createWakeupEmitter (PostgreSQL)

Creates a Postgres NOTIFY-based wakeup emitter to reduce polling frequency.

import { createWakeupEmitter } from "txob/pg";

const wakeupEmitter = await createWakeupEmitter({
  listenClientConfig: clientConfig, // pg client config for a dedicated LISTEN connection
  createTrigger: true, // Automatically create database trigger
  querier: client, // Required if createTrigger is true
  table: "events", // Optional: table name (default: "events")
  channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events")
});

The trigger automatically sends NOTIFY when new events are inserted. The wakeup emitter emits wakeup events that trigger immediate polling, reducing the need for constant polling.
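
When createTrigger is true, the generated trigger is conceptually equivalent to the following (illustrative DDL; the actual names txob generates may differ):

CREATE OR REPLACE FUNCTION txob_notify() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('txob_events', NEW.id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER txob_events_notify
AFTER INSERT ON events
FOR EACH ROW EXECUTE FUNCTION txob_notify();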

createWakeupEmitter (MongoDB)

Creates a MongoDB Change Stream-based wakeup emitter to reduce polling frequency.

import { createWakeupEmitter } from "txob/mongodb";

const wakeupEmitter = await createWakeupEmitter({
  mongo: mongoClient,
  db: "myapp",
  collection: "events", // Optional: collection name (default: "events")
});

Important: MongoDB Change Streams require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running rs.initiate() in the mongo shell.

If the database is not configured for Change Streams, an error will be emitted via the error event on the returned WakeupEmitter. The processor will automatically fall back to polling.

Types

// Main event type
type TxOBEvent<EventType extends string> = {
  id: string;
  timestamp: Date;
  type: EventType;
  data: Record<string, unknown>;
  correlation_id: string;
  handler_results: Record<string, TxOBEventHandlerResult>;
  errors: number;
  backoff_until?: Date | null;
  processed_at?: Date;
};

// Handler function signature
type TxOBEventHandler = (
  event: TxOBEvent<string>,
  opts: { signal?: AbortSignal },
) => Promise<void>;

// Handler map structure
type TxOBEventHandlerMap<EventType extends string> = Record<
  EventType,
  Record<string, TxOBEventHandler>
>;

// Handler result tracking
type TxOBEventHandlerResult = {
  processed_at?: Date;
  unprocessable_at?: Date;
  errors?: Array<{
    error: unknown;
    timestamp: Date;
  }>;
};

// Logger interface
interface Logger {
  debug(message?: unknown, ...optionalParams: unknown[]): void;
  info(message?: unknown, ...optionalParams: unknown[]): void;
  warn(message?: unknown, ...optionalParams: unknown[]): void;
  error(message?: unknown, ...optionalParams: unknown[]): void;
}

Best Practices

✅ Do

  • Keep handlers focused - Each handler should perform a single, independent side effect (send email, call webhook, etc.)
  • Make handlers idempotent - Check if work was already done before doing it again
  • Use correlation IDs - Essential for tracing and debugging distributed systems
  • Set appropriate maxErrors - Balance between retry attempts and failure detection
  • Monitor handler performance - Track execution time and error rates
  • Use AbortSignal - Implement quick cleanup during graceful shutdown
  • Create indexes - The partial index on processed_at is critical for performance
  • Validate event data - Throw ErrorUnprocessableEventHandler for invalid data
  • Use transactions - Always save events with business data in the same transaction
  • Test handlers - Unit test handlers independently with mock events
  • Log with context - Include event.id and correlation_id in all logs

❌ Don't

  • Don't assume exactly-once - Handlers may be called multiple times
  • Don't require event ordering - Handlers should be independent; if you need ordering, reconsider your design
  • Don't perform long operations without signal checks - Delays shutdown
  • Don't ignore errors - Handle them appropriately or let them propagate
  • Don't skip the indexes - Performance will degrade rapidly
  • Don't save events outside transactions - Defeats the purpose of the outbox pattern
  • Don't use for real-time processing - Polling introduces latency (default 5s)
  • Don't modify events in handlers - Event object is read-only
  • Don't share mutable state - Handlers may run concurrently
  • Don't forget correlation IDs - Makes debugging distributed issues very difficult

Performance Tips

  1. Tune concurrency limits - Adjust maxEventConcurrency and maxHandlerConcurrency based on your workload
  2. Reduce polling interval - Lower sleepTimeMs for lower latency (at cost of more database queries)
  3. Batch operations - If handlers can batch work, collect multiple events
  4. Monitor query performance - Use EXPLAIN ANALYZE on the getEventsToProcess query
  5. Partition the events table - For very high volume, partition by processed_at or timestamp
  6. Archive processed events - Move old processed events to an archive table to keep the main table small (see the sketch below)
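
For example, tip 6 might run as a periodic job along these lines (the archive table and 30-day retention are assumptions):

-- Copy old processed events to an archive table, then remove them
INSERT INTO events_archive
SELECT * FROM events
WHERE processed_at IS NOT NULL
  AND processed_at < NOW() - INTERVAL '30 days';

DELETE FROM events
WHERE processed_at IS NOT NULL
  AND processed_at < NOW() - INTERVAL '30 days';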

Troubleshooting

Events are not being processed

Check:

  1. Is the processor started? processor.start() was called?
  2. Is the database connection working?
  3. Are events actually being saved? Query the events table
  4. Is processed_at NULL on pending events?
  5. Is backoff_until in the past (or NULL)?
  6. Is errors less than maxErrors?

Debug:

EventProcessor(client, handlers, {
  logger: console, // Enable logging
});

High error rates

Check:

  1. Are handlers throwing errors? Check logs
  2. Is an external service down? (email, API, etc.)
  3. Is event data invalid? Add validation
  4. Are handlers timing out? Increase timeouts

Solutions:

  • Use ErrorUnprocessableEventHandler for invalid data
  • Implement circuit breakers for external services
  • Add retries within handlers for transient failures
  • Increase maxErrors if failures are expected
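
The "retries within handlers" bullet might look like this sketch (withRetry is an illustrative helper, not part of txob):

// Bounded in-handler retry; on final failure the error propagates
// and txob's backoff/retry takes over
const withRetry = async <T>(fn: () => Promise<T>, attempts = 3): Promise<T> => {
  for (let i = 0; ; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i >= attempts - 1) throw err;
      await new Promise((r) => setTimeout(r, 250 * 2 ** i)); // brief local backoff
    }
  }
};

// Usage: sendEmail: async (event) => withRetry(() => emailService.send(event.data.email))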

Events stuck in "processing" state

This happens when:

  1. Processor crashed after locking event but before updating
  2. Transaction was rolled back

Solution: Events are never truly "stuck" - they're locked at the transaction level. Once the transaction ends (commit or rollback), the lock is released and another processor can pick it up.

If using FOR UPDATE SKIP LOCKED properly (which txob does), stuck events are not possible.

Performance is slow

Check:

  1. Do you have the recommended indexes?
  2. How many unprocessed events are in the table?
  3. What's your maxEventConcurrency setting?
  4. Are handlers slow? Profile them

Solutions:

  • Create the partial index on processed_at
  • Archive or delete old processed events
  • Increase maxEventConcurrency
  • Optimize slow handlers
  • Run multiple processor instances

Memory usage is high

Check:

  1. How many events are processed concurrently?
  2. Are handlers leaking memory?
  3. Is the events table huge?

Solutions:

  • Lower maxEventConcurrency
  • Profile handlers for memory leaks
  • Archive old events
  • Reduce limit in createProcessorClient({ querier: client, table, limit })

Duplicate handler executions

This is expected behavior due to at-least-once delivery. It happens when:

  • Event update fails after handler succeeds
  • Processor crashes after handler succeeds but before updating event

Solution: Make handlers idempotent:

// ✅ Idempotent: Check before doing work
const handler = async (event) => {
  const alreadyDone = await checkWorkStatus(event.id);
  if (alreadyDone) return;

  await doWork(event.data);
  await markWorkDone(event.id);
};

Frequently Asked Questions

How is this different from a message queue?

The fundamental problem with message queues:

Message queues (RabbitMQ, SQS, Kafka) are separate systems from your database. You cannot make both operations atomic:

// ❌ This is NOT atomic - the queue and database are separate systems
await db.query("BEGIN");
await db.createUser(user);
await messageQueue.publish("user.created", user); // ✅ Succeeds
await db.query("COMMIT"); // 💥 Fails! Message is queued but user doesn't exist

Even if you publish after commit, you have the opposite problem:

// ❌ Also NOT atomic
await db.query("BEGIN");
await db.createUser(user);
await db.query("COMMIT"); // ✅ Succeeds
await messageQueue.publish("user.created", user); // 💥 Fails! User exists but no message

Message queues require:

  • Additional infrastructure to run and monitor
  • Network calls to publish messages (can fail independently of database)
  • Handling connection failures
  • No way to guarantee consistency between database and queue
  • Complex error recovery (replay, reconciliation, etc.)

Transactional Outbox with txob:

  • Uses your existing database (no additional infrastructure)
  • Guaranteed consistency - events saved in same transaction as data (atomicity via ACID)
  • No network calls during transaction (everything is in one database)
  • Simpler operational model
  • If transaction fails, neither data nor events are saved
  • If transaction succeeds, both data and events are saved

Trade-offs:

  • Message queues: Lower latency (~10ms), higher throughput (10k+/s)
  • txob: Higher latency (~5s default), moderate throughput (10-100/s per processor)

Can I use txob WITH message queues?

Yes! This is actually a great pattern. Use txob to guarantee consistency, then publish to your queue from a handler:

UserCreated: {
  publishToKafka: async (event) => {
    // Now this is guaranteed to only run if user was created
    await kafka.publish("user.created", event.data);
    // If this fails, txob will retry it
  },
}

This gives you:

  • ✅ Guaranteed consistency between database and queue (via txob)
  • ✅ Low latency downstream (via message queue)
  • ✅ Idempotent publishing (txob handles retries)

Can I use this without microservices?

Yes! The transactional outbox pattern is useful in any application that needs reliable side-effects:

  • Monoliths that send emails
  • Single-service apps that call webhooks
  • Any app that needs guaranteed event delivery

You don't need a microservices architecture to benefit from txob.

What happens if the processor crashes?

The processor is designed to handle crashes gracefully:

  1. During handler execution: The transaction hasn't committed yet, so the event remains unprocessed. Another processor (or restart) will pick it up.
  2. After handler but before update: Same as above - event remains unprocessed.
  3. During event update: Database transaction ensures atomicity. Either the update completes or it doesn't.

Result: Events are never lost. At worst, handlers are called again (which is why idempotency matters).

How do I handle duplicate handler executions?

Make your handlers idempotent by checking if work was already done:

// Pattern 1: Check external system
const sendEmail = async (event) => {
  const sent = await emailService.checkSent(event.id);
  if (sent) return; // Already sent

  await emailService.send(event.data.email);
};

// Pattern 2: Use unique constraints
const createStripeCustomer = async (event) => {
  try {
    await stripe.customers.create({
      id: event.data.userId, // Stripe will reject if already exists
      email: event.data.email,
    });
  } catch (err) {
    if (err.code === "resource_already_exists") return; // Already created
    throw err;
  }
};

// Pattern 3: Track in database
const processPayment = async (event) => {
  const processed = await db.query(
    "SELECT 1 FROM payment_events WHERE event_id = $1",
    [event.id],
  );
  if (processed.rowCount > 0) return;

  await executePayment(event.data); // your payment logic (avoids recursing into this handler)
  await db.query("INSERT INTO payment_events (event_id) VALUES ($1)", [
    event.id,
  ]);
};

Can I process events in a specific order?

Short answer: You generally shouldn't need to.

Events are processed concurrently by default, and handlers should contain single, independent side effects. If you need ordering, it usually indicates a design issue.

Why ordering is usually a design smell:

  • Each handler should represent one side effect (send email, call webhook, etc.)
  • Side effects are typically independent and don't need ordering
  • Ordering defeats the purpose of concurrent processing and reduces throughput
  • If side effects must happen in sequence, they might belong in the same handler

Better approaches:

  1. Make side effects independent (recommended):
UserCreated: {
  sendEmail: async (event) => { /* sends welcome email */ },
  createStripeCustomer: async (event) => { /* creates customer */ },
  // These can run in any order or concurrently ✅
}
  2. If truly dependent, combine into one handler:
UserCreated: {
  completeOnboarding: async (event) => {
    // These MUST happen in order
    await createStripeCustomer(event.data);
    await sendWelcomeEmail(event.data);
    await enrollInTrial(event.data);
  },
}
  3. Use separate event types for workflows:
// Event 1 creates the customer
UserCreated: {
  createStripeCustomer: async (event) => {
    await stripe.customers.create(event.data);
    // Create next event when done
    await createEvent({ type: "StripeCustomerCreated", ... });
  }
}

// Event 2 sends the email
StripeCustomerCreated: {
  sendWelcomeEmail: async (event) => {
    await emailService.send(event.data);
  }
}

If you absolutely must process events sequentially (not recommended):

EventProcessor(client, handlers, {
  maxEventConcurrency: 1, // Forces sequential processing
  // ⚠️ This sacrifices throughput and concurrency benefits
});

How do I monitor event processing?

1. Use the logger option:

EventProcessor(client, handlers, {
  logger: myLogger, // Logs all processing activity
});

2. Query the events table:

-- Pending events
SELECT COUNT(*) FROM events WHERE processed_at IS NULL;

-- Failed events (max errors reached)
SELECT * FROM events WHERE errors >= 5 AND processed_at IS NOT NULL;

-- Events by type
SELECT type, COUNT(*) FROM events GROUP BY type;

-- Average processing time (requires timestamp tracking)
SELECT type, AVG(processed_at - timestamp) as avg_duration
FROM events WHERE processed_at IS NOT NULL
GROUP BY type;

3. Create monitoring events:

onEventMaxErrorsReached: async ({ event, txClient }) => {
  await txClient.createEvent({
    id: randomUUID(),
    timestamp: new Date(),
    type: "EventFailed",
    data: { originalEvent: event },
    correlation_id: event.correlation_id,
    handler_results: {},
    errors: 0,
  });

  // Send to monitoring service
  await monitoring.recordFailure(event);
},

Can I use this with TypeScript?

Yes! txob is written in TypeScript and provides full type safety:

// Define your event types
const eventTypes = {
  UserCreated: "UserCreated",
  OrderPlaced: "OrderPlaced",
} as const;

type EventType = keyof typeof eventTypes;

// TypeScript will enforce all event types have handlers
const processor = EventProcessor<EventType>(
  createProcessorClient<EventType>({ querier: client }),
  {
    UserCreated: {
      /* handlers */
    },
    OrderPlaced: {
      /* handlers */
    },
    // Missing an event type? TypeScript error!
  },
);

What's the performance impact?

Database Impact:

  • One SELECT query per polling interval (default: every 5 seconds)
  • One SELECT + UPDATE per event processed
  • With proper indexes, queries are very fast (< 10ms typically)

Processing Latency:

  • Average latency: sleepTimeMs / 2 (default: 2.5 seconds)
  • Worst case: sleepTimeMs (default: 5 seconds)

Throughput:

  • Depends on handler speed and concurrency settings
  • Single processor: 10-100 events/second typical
  • Horizontally scalable: add more processors for higher throughput

Optimization:

  • Lower sleepTimeMs for lower latency (at cost of more queries)
  • Increase maxEventConcurrency for higher throughput
  • Run multiple processors for horizontal scaling
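
For example, a lower-latency, higher-throughput configuration might look like this (values are illustrative; tune for your workload):

EventProcessor(client, handlers, {
  sleepTimeMs: 1000, // poll every second for lower latency (more queries)
  maxEventConcurrency: 20, // process more events in parallel
  maxHandlerConcurrency: 10,
});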

How does horizontal scaling work?

Run multiple processor instances (same code, different processes/machines):

# Machine 1
node processor.js

# Machine 2
node processor.js

# Machine 3
node processor.js

Each processor will:

  1. Query for unprocessed events
  2. Lock events using FOR UPDATE SKIP LOCKED
  3. Process locked events
  4. Release locks on commit/rollback

Key mechanism: FOR UPDATE SKIP LOCKED ensures each event is locked by only one processor. Other processors skip locked rows and process different events.

No coordination needed - processors don't need to know about each other. The database handles coordination.
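
In SQL terms, the per-event lock each processor takes looks roughly like this (simplified from the built-in pg client):

-- Inside each processor's transaction:
SELECT * FROM events WHERE id = $1 FOR UPDATE SKIP LOCKED;
-- Row returned: this processor holds the lock and processes the event.
-- No row: another processor already locked it, so it is skipped.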

Can I prioritize certain events?

Yes, modify the query in your custom client:

// Custom client with priority
const getEventsToProcess = async (opts) => {
  const events = await client.query(
    `SELECT id, errors FROM events
     WHERE processed_at IS NULL
     AND (backoff_until IS NULL OR backoff_until < NOW())
     AND errors < $1
     ORDER BY priority DESC, timestamp ASC  -- High priority first
     LIMIT 100`,
    [opts.maxErrors],
  );
  return events.rows;
};

Add a priority column to your events table.

When to Use txob

✅ Use txob when:

  • You need guaranteed event delivery (can't lose events)
  • You want to avoid distributed transactions (2PC, Saga)
  • You're already using PostgreSQL or MongoDB
  • You need at-least-once delivery semantics
  • You can make handlers idempotent
  • You're building reliable background processing
  • You want simple infrastructure (no separate message queue)
  • You need horizontal scalability without coordination

⚠️ Consider alternatives when:

  • You need exactly-once semantics (use Kafka with transactions)
  • You need real-time processing (< 1 second latency) - use message queue
  • You need high throughput (> 10k events/second) - use message queue
  • You already have message queue infrastructure you're happy with
  • You can't make handlers idempotent
  • You need complex routing or pub/sub patterns - use message broker

Comparison with alternatives:

| Feature                | txob                   | RabbitMQ         | Kafka            | AWS SQS         |
| ---------------------- | ---------------------- | ---------------- | ---------------- | --------------- |
| Infrastructure         | Database only          | Separate service | Separate cluster | Managed service |
| Consistency            | Strong (ACID)          | Eventual         | Eventual         | Eventual        |
| Latency                | ~5s default            | ~10ms            | ~10ms            | ~1s             |
| Throughput             | 10-100/s per processor | 10k+/s           | 100k+/s          | 3k/s            |
| Horizontal scaling     | ✅ Yes                 | ✅ Yes           | ✅ Yes           | ✅ Yes          |
| Exactly-once           | ❌ No                  | ❌ No            | ✅ Yes           | ❌ No           |
| Operational complexity | Low                    | Medium           | High             | Low             |
| Cost                   | DB storage             | Self-hosted      | Self-hosted      | Pay per request |

Contributing

Contributions are welcome! To contribute:

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/my-feature
  3. Make your changes with tests
  4. Run tests: npm test
  5. Run linting: npm run format
  6. Commit your changes: git commit -m "Add my feature"
  7. Push to your fork: git push origin feature/my-feature
  8. Open a Pull Request

Guidelines:

  • Add tests for new features
  • Update documentation for API changes
  • Follow existing code style
  • Keep PRs focused on a single concern

Examples

See the examples directory for complete working examples:

  • PostgreSQL example - HTTP API with user creation and email sending
  • More examples coming soon!

Support & Community

  • 📖 Documentation: You're reading it!
  • 🐛 Bug Reports: GitHub Issues

License

MIT © Dillon Streator

Acknowledgments

Implements the Transactional Outbox pattern from the microservices patterns catalog.
