Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions packages/dispatcher/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,32 @@ initSentry();
import { join } from "node:path";
import { App, ExpressReceiver, LogLevel } from "@slack/bolt";
import { config as dotenvConfig } from "dotenv";
import { createLogger } from "@peerbot/shared";
import {
createLogger,
createDatabaseAdapter,
type DatabaseAdapter,
createMessageQueue,
type MessageQueue,
} from "@peerbot/shared";

const logger = createLogger("dispatcher");
import {
type AnthropicProxy,
createAnthropicProxy,
} from "./proxy/anthropic-proxy";
import { ThreadResponseConsumer } from "./queue/slack-thread-processor";
import { QueueProducer } from "./queue/task-queue-producer";
import { setupHealthEndpoints } from "./simple-http";
import { SlackEventHandlers } from "./slack/slack-event-handlers";
import type { DispatcherConfig } from "./types";
import { moduleRegistry } from "../../../modules";

export class SlackDispatcher {
private app: App;
private queueProducer: QueueProducer;
private messageQueue: MessageQueue;
private threadResponseConsumer?: ThreadResponseConsumer;
private anthropicProxy?: AnthropicProxy;
private config: DispatcherConfig;
private database: DatabaseAdapter;

constructor(config: DispatcherConfig) {
this.config = config;
Expand Down Expand Up @@ -90,9 +96,16 @@ export class SlackDispatcher {
logger.info("Initialized Slack app in Socket mode");
}

// Initialize queue producer - use DATABASE_URL for consistency
// Initialize shared database adapter and message queue
logger.info("Initializing queue mode");
this.queueProducer = new QueueProducer(config.queues.connectionString);
this.database = createDatabaseAdapter(config.queues.connectionString);
this.messageQueue = createMessageQueue({
provider: "postgresql",
connectionString: config.queues.connectionString,
retryLimit: config.queues.retryLimit,
retryDelay: config.queues.retryDelay,
expireInSeconds: config.queues.expireInSeconds,
});
// ThreadResponseConsumer will be created after event handlers are initialized

this.setupErrorHandling();
Expand Down Expand Up @@ -129,9 +142,9 @@ export class SlackDispatcher {
await moduleRegistry.initAll();
logger.info("✅ Modules initialized");

// Start queue producer
await this.queueProducer.start();
logger.info("✅ Queue producer started");
// Start message queue
await this.messageQueue.start();
logger.info("✅ Message queue started");

// Get bot's own user ID and bot ID dynamically before starting
await this.initializeBotInfo(this.config);
Expand Down Expand Up @@ -313,11 +326,13 @@ export class SlackDispatcher {
try {
await this.app.stop();

await this.queueProducer.stop();
await this.messageQueue.stop();
if (this.threadResponseConsumer) {
await this.threadResponseConsumer.stop();
}

await this.database.close();

logger.info("Slack dispatcher stopped");
} catch (error) {
logger.error("Error stopping Slack dispatcher:", error);
Expand Down Expand Up @@ -382,7 +397,12 @@ export class SlackDispatcher {

// Initialize queue-based event handlers
logger.info("Initializing queue-based event handlers");
new SlackEventHandlers(this.app, this.queueProducer, config);
new SlackEventHandlers(
this.app,
this.messageQueue,
config,
this.database
);

// Now create ThreadResponseConsumer
this.threadResponseConsumer = new ThreadResponseConsumer(
Expand Down
82 changes: 14 additions & 68 deletions packages/dispatcher/src/queue/task-queue-producer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#!/usr/bin/env bun

import * as Sentry from "@sentry/node";
import { Pool } from "pg";
import PgBoss from "pg-boss";
import { createLogger } from "@peerbot/shared";
import {
createLogger,
createDatabaseAdapter,
type DatabaseAdapter,
} from "@peerbot/shared";

const logger = createLogger("dispatcher");

Expand Down Expand Up @@ -55,36 +58,14 @@ export interface ThreadMessagePayload {

export class QueueProducer {
private pgBoss: PgBoss;
private pool?: Pool;
private database?: DatabaseAdapter;
private ownsDatabaseAdapter: boolean;
private isConnected = false;

constructor(
connectionString: string,
databaseConfig?: {
host: string;
port: number;
database: string;
username: string;
password: string;
ssl?: boolean;
}
) {
constructor(connectionString: string, databaseClient?: DatabaseAdapter) {
this.pgBoss = new PgBoss(connectionString);

// Create separate pool for RLS context management
if (databaseConfig) {
this.pool = new Pool({
host: databaseConfig.host,
port: databaseConfig.port,
database: databaseConfig.database,
user: databaseConfig.username,
password: databaseConfig.password,
ssl: databaseConfig.ssl,
max: 10,
min: 1,
idleTimeoutMillis: 30000,
});
}
this.database = databaseClient ?? createDatabaseAdapter(connectionString);
this.ownsDatabaseAdapter = !databaseClient;
}

/**
Expand Down Expand Up @@ -113,8 +94,8 @@ export class QueueProducer {
try {
this.isConnected = false;
await this.pgBoss.stop();
if (this.pool) {
await this.pool.end();
if (this.database && this.ownsDatabaseAdapter) {
await this.database.close();
}
logger.info("✅ Queue producer stopped");
} catch (error) {
Expand Down Expand Up @@ -168,38 +149,6 @@ export class QueueProducer {
}
}

/**
* Execute a query with user context for RLS
*/
async queryWithUserContext<T>(
userId: string,
query: string,
params?: any[]
): Promise<{ rows: T[]; rowCount: number }> {
if (!this.pool) {
throw new Error(
"Database pool not available - queue producer not configured with database config"
);
}

const client = await this.pool.connect();

try {
// Set user context for RLS policies using PostgreSQL session configuration
await client.query("SELECT set_config('app.current_user_id', $1, true)", [
userId,
]);

const result = await client.query(query, params);
return {
rows: result.rows,
rowCount: result.rowCount || 0,
};
} finally {
client.release();
}
}

/**
* Update job status using the database function
*/
Expand All @@ -208,18 +157,15 @@ export class QueueProducer {
status: "pending" | "active" | "completed" | "failed",
retryCount?: number
): Promise<void> {
if (!this.pool) {
if (!this.database) {
logger.warn(
`Cannot update job status for ${jobId} - database pool not available`
);
return;
}

try {
const query = "SELECT update_job_status($1, $2, $3)";
const params = [jobId, status, retryCount || null];

await this.pool.query(query, params);
await this.database.updateJobStatus(jobId, status, retryCount || null);
logger.debug(`Updated job ${jobId} status to: ${status}`);
} catch (error) {
logger.error(`Failed to update job status for ${jobId}:`, error);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bun

import { createLogger } from "@peerbot/shared";
import { createLogger, type DatabaseAdapter } from "@peerbot/shared";
import {
extractEnvVariables,
hasEnvVariables,
Expand All @@ -17,6 +17,7 @@ const logger = createLogger("dispatcher");
* Handle blockkit form submissions
*/
export async function handleBlockkitFormSubmission(
database: DatabaseAdapter,
userId: string,
view: any,
client: any,
Expand Down Expand Up @@ -50,6 +51,7 @@ export async function handleBlockkitFormSubmission(
if (envVars.length > 0) {
const repository = metadata.repository || null;
const result = await storeEnvVariables(
database,
userId,
envVars,
channelId,
Expand Down
9 changes: 4 additions & 5 deletions packages/dispatcher/src/slack/handlers/action-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { createLogger } from "@peerbot/shared";
// import { getDbPool } from "@peerbot/shared"; // Currently unused
import { createLogger, type DatabaseAdapter } from "@peerbot/shared";

const logger = createLogger("dispatcher");
import type { QueueProducer } from "../../queue/task-queue-producer";
import type { SlackContext } from "../../types";
import type { MessageHandler } from "./message-handler";
// Dynamic module imports to avoid hardcoded dependencies
Expand All @@ -16,8 +14,8 @@ import {

export class ActionHandler {
constructor(
_queueProducer: QueueProducer,
private messageHandler: MessageHandler
private messageHandler: MessageHandler,
private database: DatabaseAdapter
) {}

/**
Expand Down Expand Up @@ -91,6 +89,7 @@ export class ActionHandler {

// Pass the fromHomeTab flag to ensure DM is sent when clicked from home
await handleTryDemo(
this.database,
userId,
channelId,
client,
Expand Down
43 changes: 15 additions & 28 deletions packages/dispatcher/src/slack/handlers/demo-handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { createLogger } from "@peerbot/shared";
import { getDbPool } from "@peerbot/shared";
import { createLogger, type DatabaseAdapter } from "@peerbot/shared";

const logger = createLogger("dispatcher");

/**
* Handle Try Demo action - sets up demo repository for user
*/
export async function handleTryDemo(
database: DatabaseAdapter,
userId: string,
channelId: string,
client: any,
Expand All @@ -27,32 +27,19 @@ export async function handleTryDemo(
.replace(/\.git$/, "");
const [owner, repo] = repoPath.split("/");

// Store in user_environ for the demo
const dbPool = getDbPool(process.env.DATABASE_URL!);

// First ensure user exists
await dbPool.query(
`INSERT INTO users (platform, platform_user_id)
VALUES ('slack', $1)
ON CONFLICT (platform, platform_user_id) DO NOTHING`,
[userId.toUpperCase()]
);

// Get user ID
const userResult = await dbPool.query(
`SELECT id FROM users WHERE platform = 'slack' AND platform_user_id = $1`,
[userId.toUpperCase()]
);
const userDbId = userResult.rows[0].id;

// Set demo repository (just like selecting any other repository)
await dbPool.query(
`INSERT INTO user_environ (user_id, channel_id, repository, name, value, type)
VALUES ($1, $2, $3, 'GITHUB_REPOSITORY', $4, 'user')
ON CONFLICT (user_id, channel_id, repository, name)
DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`,
[userDbId, null, null, demoRepo]
);
// Store repository selection for the demo user
await database.saveEnvironmentVariables({
platformUserId: userId,
variables: [
{
name: "GITHUB_REPOSITORY",
value: demoRepo,
repository: demoRepo,
type: "user",
encrypt: false,
},
],
});

// If from home tab, open a DM conversation first
let targetChannel = channelId;
Expand Down
Loading
Loading