// Long-lived worker process. Runs alongside Next.js (separate container in prod).
// Handles:
//   - booking-reminder: fired 24h before each CONFIRMED booking
//   - expire-stale-holds: recurring every minute; cancels timed-out HOLD bookings

import "dotenv/config";
|
|
import { PrismaPg } from "@prisma/adapter-pg";
|
|
import { PrismaClient } from "../src/generated/prisma/client";
|
|
import { jobs, QUEUES, type BookingReminderPayload, stopJobs } from "../src/lib/jobs";
|
|
import { handleReminderJob } from "../src/lib/reminders";
|
|
import { expireStaleHolds } from "../src/lib/booking";
|
|
|
|
// Prisma client wired through the PrismaPg driver adapter. The connection
// string is read from the DATABASE_URL environment variable (loaded by
// the "dotenv/config" import at the top of the file).
const adapter = new PrismaPg({
  connectionString: process.env["DATABASE_URL"],
});
const db = new PrismaClient({
  adapter,
});
|
|
|
|
/**
 * Worker entry point.
 *
 * Starts the job runner returned by `jobs()`, makes sure both queues exist,
 * registers a handler for each queue, schedules the recurring hold-expiry
 * job, and finally installs SIGTERM/SIGINT handlers for graceful shutdown.
 *
 * Rejects if any of the startup calls reject; the caller is expected to
 * handle that rejection.
 */
async function main(): Promise<void> {
  const boss = jobs();
  await boss.start();
  console.log("[worker] pg-boss started");

  // pg-boss 12+ requires queues to be created before send/schedule/work.
  // createQueue is idempotent — safe to call on every boot.
  await boss.createQueue(QUEUES.BOOKING_REMINDER);
  await boss.createQueue(QUEUES.EXPIRE_STALE_HOLDS);

  // Booking reminders.
  await boss.work(QUEUES.BOOKING_REMINDER, async (incoming) => {
    // Normalise to an array so the handler copes with either a single job
    // or a batch of jobs being delivered.
    const items = Array.isArray(incoming) ? incoming : [incoming];
    for (const job of items) {
      const data = job.data as BookingReminderPayload;
      try {
        const result = await handleReminderJob(db, data);
        const outcome = result.sent ? "SENT" : `SKIP(${result.reason})`;
        console.log(
          `[worker] booking-reminder bookingId=${data.bookingId} ${outcome}`,
        );
      } catch (err) {
        console.error(
          `[worker] booking-reminder bookingId=${data.bookingId} FAILED:`,
          err,
        );
        throw err; // pg-boss will retry per the job's retryLimit
      }
    }
  });

  // Hold expiry — recurring every minute.
  await boss.schedule(QUEUES.EXPIRE_STALE_HOLDS, "* * * * *");
  await boss.work(QUEUES.EXPIRE_STALE_HOLDS, async () => {
    const count = await expireStaleHolds(db);
    // Only log when something was actually expired, to keep the log quiet.
    if (count > 0) {
      console.log(`[worker] expire-stale-holds expired ${count}`);
    }
  });

  console.log("[worker] handlers registered, idling");

  // Graceful shutdown.
  // Guard against re-entrancy: a second signal (e.g. a double Ctrl-C) must
  // not start a second, concurrent teardown.
  let shuttingDown = false;
  const shutdown = async (signal: string): Promise<void> => {
    if (shuttingDown) return;
    shuttingDown = true;
    console.log(`[worker] ${signal} received, shutting down`);

    // Each cleanup step is isolated so a failure in one neither skips the
    // other nor prevents the process from exiting.
    let exitCode = 0;
    try {
      await stopJobs();
    } catch (err) {
      exitCode = 1;
      console.error("[worker] shutdown: stopJobs failed:", err);
    }
    try {
      await db.$disconnect();
    } catch (err) {
      exitCode = 1;
      console.error("[worker] shutdown: db disconnect failed:", err);
    }
    process.exit(exitCode);
  };

  // `void` marks the promise as intentionally not awaited; shutdown()
  // handles its own errors and always ends in process.exit.
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
  process.on("SIGINT", () => void shutdown("SIGINT"));
}
|
|
|
|
// Fatal-error path: anything that rejects out of main() lands here.
// Log it, attempt best-effort cleanup, and always exit non-zero.
main().catch(async (err: unknown) => {
  console.error("[worker] fatal:", err);

  // Cleanup steps are isolated: if stopJobs() rejects (for example because
  // startup itself is what failed), the database is still disconnected and
  // process.exit(1) is still reached.
  try {
    await stopJobs();
  } catch (cleanupErr) {
    console.error("[worker] fatal cleanup: stopJobs failed:", cleanupErr);
  }
  try {
    await db.$disconnect();
  } catch (cleanupErr) {
    console.error("[worker] fatal cleanup: db disconnect failed:", cleanupErr);
  }
  process.exit(1);
});
|