// Long-lived worker process. Runs alongside Next.js (separate container in prod).
// Handles:
//   - booking-reminder     fired 24h before each CONFIRMED booking
//   - expire-stale-holds   recurring every minute; cancels timed-out HOLD bookings

import "dotenv/config";
import { PrismaPg } from "@prisma/adapter-pg";
import { PrismaClient } from "../src/generated/prisma/client";
import { jobs, QUEUES, type BookingReminderPayload, stopJobs } from "../src/lib/jobs";
import { handleReminderJob } from "../src/lib/reminders";
import { expireStaleHolds } from "../src/lib/booking";

const adapter = new PrismaPg({ connectionString: process.env.DATABASE_URL });
const db = new PrismaClient({ adapter });

// Set once a shutdown has begun so a repeated signal (e.g. SIGINT followed by
// SIGTERM) does not run the teardown sequence a second time concurrently.
let shuttingDown = false;

/**
 * Stops the job runner and then disconnects the Prisma client.
 *
 * Both steps are always attempted: a failure in `stopJobs()` must not prevent
 * the database disconnect, and neither failure may prevent the caller from
 * reaching `process.exit`. Errors are logged rather than thrown, so this
 * function never rejects.
 *
 * @returns `true` when both steps completed without error, `false` otherwise.
 */
async function closeResources(): Promise<boolean> {
  let clean = true;

  // Jobs are stopped before the DB client is disconnected (same order as before).
  try {
    await stopJobs();
  } catch (err) {
    clean = false;
    console.error("[worker] failed to stop jobs:", err);
  }

  try {
    await db.$disconnect();
  } catch (err) {
    clean = false;
    console.error("[worker] failed to disconnect database:", err);
  }

  return clean;
}

/**
 * Boots the worker: starts the job runner, ensures both queues exist,
 * registers the two handlers plus the every-minute schedule, and finally
 * installs SIGTERM/SIGINT handlers for graceful shutdown.
 *
 * Rejects if any startup step fails; the caller logs the error, tears down
 * resources and exits with code 1.
 */
async function main(): Promise<void> {
  const boss = jobs();
  await boss.start();
  console.log("[worker] pg-boss started");

  // pg-boss 12+ requires queues to be created before send/schedule/work.
  // createQueue is idempotent — safe to call on every boot.
  await boss.createQueue(QUEUES.BOOKING_REMINDER);
  await boss.createQueue(QUEUES.EXPIRE_STALE_HOLDS);

  // Booking reminders.
  await boss.work(QUEUES.BOOKING_REMINDER, async (incoming) => {
    // Accept either a single job or a batch of jobs.
    const items = Array.isArray(incoming) ? incoming : [incoming];
    for (const job of items) {
      const data = job.data as BookingReminderPayload;
      try {
        const result = await handleReminderJob(db, data);
        console.log(
          `[worker] booking-reminder bookingId=${data.bookingId} ${
            result.sent ? "SENT" : `SKIP(${result.reason})`
          }`,
        );
      } catch (err) {
        console.error(
          `[worker] booking-reminder bookingId=${data.bookingId} FAILED:`,
          err,
        );
        throw err; // pg-boss will retry per the job's retryLimit
      }
    }
  });

  // Hold expiry — recurring every minute.
  await boss.schedule(QUEUES.EXPIRE_STALE_HOLDS, "* * * * *");
  await boss.work(QUEUES.EXPIRE_STALE_HOLDS, async () => {
    const count = await expireStaleHolds(db);
    // Only log when something was actually expired, to keep the log quiet.
    if (count > 0) {
      console.log(`[worker] expire-stale-holds expired ${count}`);
    }
  });

  console.log("[worker] handlers registered, idling");

  // Graceful shutdown.
  const shutdown = async (signal: string): Promise<void> => {
    if (shuttingDown) return;
    shuttingDown = true;

    console.log(`[worker] ${signal} received, shutting down`);
    const clean = await closeResources();
    // Exit non-zero when teardown failed so the failure is visible to the supervisor.
    process.exit(clean ? 0 : 1);
  };

  // `shutdown` never rejects (closeResources swallows and logs errors), so the
  // promise is intentionally not awaited here.
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
  process.on("SIGINT", () => void shutdown("SIGINT"));
}

main().catch(async (err: unknown) => {
  console.error("[worker] fatal:", err);
  // closeResources never rejects, so the exit below is always reached even
  // when stopping jobs or disconnecting the database fails.
  await closeResources();
  process.exit(1);
});