Files
touchbase/scripts/worker.ts

79 lines
2.6 KiB
TypeScript
Raw Normal View History

2026-05-03 09:43:10 -04:00
// Long-lived worker process. Runs alongside Next.js (separate container in prod).
// Handles:
// - booking-reminder fired 24h before each CONFIRMED booking
// - expire-stale-holds recurring every minute; cancels timed-out HOLD bookings
import "dotenv/config";
import { PrismaPg } from "@prisma/adapter-pg";
import { PrismaClient } from "../src/generated/prisma/client";
import { jobs, QUEUES, type BookingReminderPayload, stopJobs } from "../src/lib/jobs";
import { handleReminderJob } from "../src/lib/reminders";
import { expireStaleHolds } from "../src/lib/booking";
// Single Prisma client for the whole worker process, backed by the PrismaPg
// adapter. The connection string is read from the DATABASE_URL environment variable.
const databaseUrl = process.env.DATABASE_URL;
const adapter = new PrismaPg({ connectionString: databaseUrl });
const db = new PrismaClient({ adapter: adapter });
/**
 * Boots the worker.
 *
 * Steps, in order:
 *  1. start pg-boss,
 *  2. create the booking-reminder and expire-stale-holds queues,
 *  3. register a handler for each queue (and the every-minute schedule for
 *     hold expiry),
 *  4. install SIGTERM/SIGINT listeners that shut the process down gracefully.
 *
 * Resolves once the handlers and signal listeners are registered. Rejects if
 * any startup step fails; the caller is responsible for logging and exiting.
 */
async function main(): Promise<void> {
  const boss = jobs();
  await boss.start();
  console.log("[worker] pg-boss started");

  // pg-boss 12+ requires queues to be created before send/schedule/work.
  // createQueue is idempotent — safe to call on every boot.
  await boss.createQueue(QUEUES.BOOKING_REMINDER);
  await boss.createQueue(QUEUES.EXPIRE_STALE_HOLDS);

  // Booking reminders.
  await boss.work(
    QUEUES.BOOKING_REMINDER,
    async (incoming) => {
      // Accept either a single job or a batch of jobs from pg-boss.
      const items = Array.isArray(incoming) ? incoming : [incoming];
      for (const job of items) {
        const data = job.data as BookingReminderPayload;
        try {
          const result = await handleReminderJob(db, data);
          console.log(
            `[worker] booking-reminder bookingId=${data.bookingId} ${
              result.sent ? "SENT" : `SKIP(${result.reason})`
            }`,
          );
        } catch (err) {
          console.error(
            `[worker] booking-reminder bookingId=${data.bookingId} FAILED:`,
            err,
          );
          // Rethrowing aborts the loop, so later items in the same batch are
          // not attempted in this invocation.
          throw err; // pg-boss will retry per the job's retryLimit
        }
      }
    },
  );

  // Hold expiry — recurring every minute.
  await boss.schedule(QUEUES.EXPIRE_STALE_HOLDS, "* * * * *");
  await boss.work(QUEUES.EXPIRE_STALE_HOLDS, async () => {
    const count = await expireStaleHolds(db);
    // Stay quiet on the common "nothing to expire" tick.
    if (count > 0) {
      console.log(`[worker] expire-stale-holds expired ${count}`);
    }
  });
  console.log("[worker] handlers registered, idling");

  // Graceful shutdown.
  // A repeated signal (e.g. a second Ctrl-C) must not run the teardown twice.
  let shuttingDown = false;
  const shutdown = async (signal: string): Promise<void> => {
    if (shuttingDown) {
      return;
    }
    shuttingDown = true;
    console.log(`[worker] ${signal} received, shutting down`);

    // Each step is isolated so a failure in one cannot skip the next or
    // prevent the explicit exit; any failure is reported via the exit code.
    let exitCode = 0;
    try {
      await stopJobs();
    } catch (err) {
      exitCode = 1;
      console.error("[worker] error stopping jobs:", err);
    }
    try {
      await db.$disconnect();
    } catch (err) {
      exitCode = 1;
      console.error("[worker] error disconnecting database:", err);
    }
    process.exit(exitCode);
  };

  // `void` marks the promise as intentionally not awaited; shutdown handles
  // its own errors and always ends in process.exit.
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
  process.on("SIGINT", () => void shutdown("SIGINT"));
}
// Entry point. If startup fails, log the cause, release resources on a
// best-effort basis, and exit non-zero. Each cleanup step is isolated so that
// a rejection from one (e.g. stopJobs()) cannot skip the database disconnect
// or the explicit exit.
main().catch(async (err) => {
  console.error("[worker] fatal:", err);
  try {
    await stopJobs();
  } catch (cleanupErr) {
    console.error("[worker] error stopping jobs:", cleanupErr);
  }
  try {
    await db.$disconnect();
  } catch (cleanupErr) {
    console.error("[worker] error disconnecting database:", cleanupErr);
  }
  process.exit(1);
});