From 93fce809e27c24d29a062c0521400f7bda318886 Mon Sep 17 00:00:00 2001 From: Hammer Date: Fri, 30 Jan 2026 00:48:07 +0000 Subject: [PATCH] feat: pg-boss job queue, notifications, client interactions, bulk email --- src/db/schema.ts | 56 +++++++++++++ src/index.ts | 14 ++++ src/routes/activity.ts | 35 +++++++- src/routes/emails.ts | 144 +++++++++++++++++++++++++++++++- src/routes/interactions.ts | 141 +++++++++++++++++++++++++++++++ src/routes/notifications.ts | 85 +++++++++++++++++++ src/services/jobs.ts | 160 ++++++++++++++++++++++++++++++++++++ 7 files changed, 632 insertions(+), 3 deletions(-) create mode 100644 src/routes/interactions.ts create mode 100644 src/routes/notifications.ts create mode 100644 src/services/jobs.ts diff --git a/src/db/schema.ts b/src/db/schema.ts index f870e1a..5360503 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -155,10 +155,37 @@ export const communications = pgTable('communications', { aiModel: text('ai_model'), // Which model was used status: text('status').default('draft'), // 'draft' | 'approved' | 'sent' sentAt: timestamp('sent_at'), + batchId: text('batch_id'), // for grouping bulk sends createdAt: timestamp('created_at').defaultNow().notNull(), }); +// Notifications table +export const notifications = pgTable('notifications', { + id: uuid('id').primaryKey().defaultRandom(), + userId: text('user_id').references(() => users.id, { onDelete: 'cascade' }).notNull(), + type: text('type').notNull(), // 'event_reminder' | 'interaction' | 'system' + title: text('title').notNull(), + message: text('message').notNull(), + read: boolean('read').default(false), + clientId: uuid('client_id').references(() => clients.id, { onDelete: 'set null' }), + eventId: uuid('event_id').references(() => events.id, { onDelete: 'set null' }), + createdAt: timestamp('created_at').defaultNow().notNull(), +}); + +// Interactions table (touchpoint logging) +export const interactions = pgTable('interactions', { + id: uuid('id').primaryKey().defaultRandom(), + userId: text('user_id').references(() => users.id, { onDelete: 'cascade' }).notNull(), + clientId: uuid('client_id').references(() => clients.id, { onDelete: 'cascade' }).notNull(), + type: text('type').notNull(), // 'call' | 'meeting' | 'email' | 'note' | 'other' + title: text('title').notNull(), + description: text('description'), + duration: integer('duration'), // in minutes + contactedAt: timestamp('contacted_at').notNull(), + createdAt: timestamp('created_at').defaultNow().notNull(), +}); + // Client notes table export const clientNotes = pgTable('client_notes', { id: uuid('id').primaryKey().defaultRandom(), @@ -175,6 +202,8 @@ export const usersRelations = relations(users, ({ many }) => ({ clients: many(clients), events: many(events), communications: many(communications), + notifications: many(notifications), + interactions: many(interactions), sessions: many(sessions), accounts: many(accounts), })); @@ -187,6 +216,33 @@ export const clientsRelations = relations(clients, ({ one, many }) => ({ events: many(events), communications: many(communications), notes: many(clientNotes), + interactions: many(interactions), +})); + +export const notificationsRelations = relations(notifications, ({ one }) => ({ + user: one(users, { + fields: [notifications.userId], + references: [users.id], + }), + client: one(clients, { + fields: [notifications.clientId], + references: [clients.id], + }), + event: one(events, { + fields: [notifications.eventId], + references: [events.id], + }), +})); + +export const interactionsRelations = relations(interactions, ({ one }) => ({ + user: one(users, { + fields: [interactions.userId], + references: [users.id], + }), + client: one(clients, { + fields: [interactions.clientId], + references: [clients.id], + }), })); export const clientNotesRelations = relations(clientNotes, ({ one }) => ({ diff --git a/src/index.ts b/src/index.ts index 966a9f3..709922b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,10 +14,13 @@ import { activityRoutes } from './routes/activity'; import { insightsRoutes } from './routes/insights'; import { reportsRoutes } from './routes/reports'; import { notesRoutes } from './routes/notes'; +import { notificationRoutes } from './routes/notifications'; +import { interactionRoutes } from './routes/interactions'; import { db } from './db'; import { users } from './db/schema'; import { eq } from 'drizzle-orm'; import type { User } from './lib/auth'; +import { initJobQueue } from './services/jobs'; const app = new Elysia() // CORS @@ -72,6 +75,8 @@ const app = new Elysia() .use(insightsRoutes) .use(reportsRoutes) .use(notesRoutes) + .use(notificationRoutes) + .use(interactionRoutes) ) // Error handler @@ -111,6 +116,15 @@ const app = new Elysia() console.log(`🚀 Network App API running at ${app.server?.hostname}:${app.server?.port}`); +// Initialize pg-boss job queue +(async () => { + try { + await initJobQueue(); + } catch (e) { + console.error('pg-boss init failed (will retry on next restart):', e); + } +})(); + // Bootstrap: ensure donovan@donovankelly.xyz is admin (async () => { try { diff --git a/src/routes/activity.ts b/src/routes/activity.ts index 2821d3d..aa19ad5 100644 --- a/src/routes/activity.ts +++ b/src/routes/activity.ts @@ -1,12 +1,12 @@ import { Elysia, t } from 'elysia'; import { db } from '../db'; -import { clients, events, communications } from '../db/schema'; +import { clients, events, communications, interactions } from '../db/schema'; import { eq, and, desc } from 'drizzle-orm'; import type { User } from '../lib/auth'; export interface ActivityItem { id: string; - type: 'email_sent' | 'email_drafted' | 'event_created' | 'client_contacted' | 'client_created' | 'client_updated'; + type: 'email_sent' | 'email_drafted' | 'event_created' | 'client_contacted' | 'client_created' | 'client_updated' | 'interaction'; title: string; description?: string; date: string; @@ -110,6 +110,37 @@ export const activityRoutes = new Elysia({ prefix: '/clients' }) }); } + // Interactions + const clientInteractions = await db.select() + .from(interactions) + .where(and( + eq(interactions.clientId, params.id), + eq(interactions.userId, user.id), + )) + .orderBy(desc(interactions.contactedAt)); + + for (const interaction of clientInteractions) { + const typeLabels: Record = { + call: '📞 Phone Call', + meeting: '🤝 Meeting', + email: '✉️ Email', + note: '📝 Note', + other: '📌 Interaction', + }; + activities.push({ + id: `interaction-${interaction.id}`, + type: 'interaction', + title: `${typeLabels[interaction.type] || typeLabels.other}: ${interaction.title}`, + description: interaction.description || undefined, + date: interaction.contactedAt.toISOString(), + metadata: { + interactionId: interaction.id, + interactionType: interaction.type, + duration: interaction.duration, + }, + }); + } + // Sort by date descending activities.sort((a, b) => new Date(b.date).getTime() - new Date(a.date).getTime()); diff --git a/src/routes/emails.ts b/src/routes/emails.ts index 5311731..57757fc 100644 --- a/src/routes/emails.ts +++ b/src/routes/emails.ts @@ -1,10 +1,11 @@ import { Elysia, t } from 'elysia'; import { db } from '../db'; import { clients, communications, userProfiles } from '../db/schema'; -import { eq, and } from 'drizzle-orm'; +import { eq, and, inArray } from 'drizzle-orm'; import { generateEmail, generateSubject, generateBirthdayMessage, type AIProvider } from '../services/ai'; import { sendEmail } from '../services/email'; import type { User } from '../lib/auth'; +import { randomUUID } from 'crypto'; export const emailRoutes = new Elysia({ prefix: '/emails' }) // Generate email for a client @@ -294,6 +295,147 @@ export const emailRoutes = new Elysia({ prefix: '/emails' }) }), }) + // Bulk generate emails + .post('/bulk-generate', async ({ body, user }: { + body: { clientIds: string[]; purpose: string; provider?: AIProvider }; + user: User; + }) => { + const batchId = randomUUID(); + + // Get user profile + const [profile] = await db.select() + .from(userProfiles) + .where(eq(userProfiles.userId, user.id)) + .limit(1); + + const advisorInfo = { + name: user.name, + title: profile?.title || '', + company: profile?.company || '', + phone: profile?.phone || '', + signature: profile?.emailSignature || '', + }; + + // Get all selected clients + const selectedClients = await db.select() + .from(clients) + .where(and( + inArray(clients.id, body.clientIds), + eq(clients.userId, user.id), + )); + + if (selectedClients.length === 0) { + throw new Error('No valid clients found'); + } + + const results = []; + + for (const client of selectedClients) { + try { + const content = await generateEmail({ + advisorName: advisorInfo.name, + advisorTitle: advisorInfo.title, + advisorCompany: advisorInfo.company, + advisorPhone: advisorInfo.phone, + advisorSignature: advisorInfo.signature, + clientName: client.firstName, + interests: client.interests || [], + notes: client.notes || '', + purpose: body.purpose, + provider: body.provider, + }); + + const subject = await generateSubject(body.purpose, client.firstName, body.provider); + + const [comm] = await db.insert(communications) + .values({ + userId: user.id, + clientId: client.id, + type: 'email', + subject, + content, + aiGenerated: true, + aiModel: body.provider || 'anthropic', + status: 'draft', + batchId, + }) + .returning(); + + results.push({ clientId: client.id, email: comm, success: true }); + } catch (error: any) { + results.push({ clientId: client.id, error: error.message, success: false }); + } + } + + return { batchId, results, total: selectedClients.length, generated: results.filter(r => r.success).length }; + }, { + body: t.Object({ + clientIds: t.Array(t.String({ format: 'uuid' }), { minItems: 1 }), + purpose: t.String({ minLength: 1 }), + provider: t.Optional(t.Union([t.Literal('anthropic'), t.Literal('openai')])), + }), + }) + + // Bulk send all drafts in a batch + .post('/bulk-send', async ({ body, user }: { + body: { batchId: string }; + user: User; + }) => { + // Get all drafts in this batch + const drafts = await db.select({ + email: communications, + client: clients, + }) + .from(communications) + .innerJoin(clients, eq(communications.clientId, clients.id)) + .where(and( + eq(communications.batchId, body.batchId), + eq(communications.userId, user.id), + eq(communications.status, 'draft'), + )); + + const results = []; + + for (const { email, client } of drafts) { + if (!client.email) { + results.push({ id: email.id, success: false, error: 'Client has no email' }); + continue; + } + + try { + await sendEmail({ + to: client.email, + subject: email.subject || 'Message from your advisor', + content: email.content, + }); + + await db.update(communications) + .set({ status: 'sent', sentAt: new Date() }) + .where(eq(communications.id, email.id)); + + await db.update(clients) + .set({ lastContactedAt: new Date() }) + .where(eq(clients.id, client.id)); + + results.push({ id: email.id, success: true }); + } catch (error: any) { + results.push({ id: email.id, success: false, error: error.message }); + } + } + + return { + batchId: body.batchId, + total: drafts.length, + sent: results.filter(r => r.success).length, + failed: results.filter(r => !r.success).length, + results, + }; + }, { + body: t.Object({ + batchId: t.String({ minLength: 1 }), + }), + }) + // Delete draft .delete('/:id', async ({ params, user }: { params: { id: string }; user: User }) => { const [deleted] = await db.delete(communications) diff --git a/src/routes/interactions.ts b/src/routes/interactions.ts new file mode 100644 index 0000000..1ff9f13 --- /dev/null +++ b/src/routes/interactions.ts @@ -0,0 +1,141 @@ +import { Elysia, t } from 'elysia'; +import { db } from '../db'; +import { interactions, clients } from '../db/schema'; +import { eq, and, desc } from 'drizzle-orm'; +import type { User } from '../lib/auth'; + +export const interactionRoutes = new Elysia() + // List interactions for a client + .get('/clients/:clientId/interactions', async ({ params, user }: { params: { clientId: string }; user: User }) => { + // Verify client belongs to user + const [client] = await db.select() + .from(clients) + .where(and(eq(clients.id, params.clientId), eq(clients.userId, user.id))) + .limit(1); + + if (!client) throw new Error('Client not found'); + + const items = await db.select() + .from(interactions) + .where(and(eq(interactions.clientId, params.clientId), eq(interactions.userId, user.id))) + .orderBy(desc(interactions.contactedAt)); + + return items; + }, { + params: t.Object({ clientId: t.String({ format: 'uuid' }) }), + }) + + // Create interaction + .post('/clients/:clientId/interactions', async ({ params, body, user }: { + params: { clientId: string }; + body: { type: string; title: string; description?: string; duration?: number; contactedAt: string }; + user: User; + }) => { + // Verify client belongs to user + const [client] = await db.select() + .from(clients) + .where(and(eq(clients.id, params.clientId), eq(clients.userId, user.id))) + .limit(1); + + if (!client) throw new Error('Client not found'); + + const [interaction] = await db.insert(interactions) + .values({ + userId: user.id, + clientId: params.clientId, + type: body.type, + title: body.title, + description: body.description, + duration: body.duration, + contactedAt: new Date(body.contactedAt), + }) + .returning(); + + // Auto-update lastContactedAt on the client + const contactDate = new Date(body.contactedAt); + if (!client.lastContactedAt || contactDate > client.lastContactedAt) { + await db.update(clients) + .set({ lastContactedAt: contactDate, updatedAt: new Date() }) + .where(eq(clients.id, params.clientId)); + } + + return interaction; + }, { + params: t.Object({ clientId: t.String({ format: 'uuid' }) }), + body: t.Object({ + type: t.String({ minLength: 1 }), + title: t.String({ minLength: 1 }), + description: t.Optional(t.String()), + duration: t.Optional(t.Number({ minimum: 0 })), + contactedAt: t.String(), + }), + }) + + // Update interaction + .put('/interactions/:id', async ({ params, body, user }: { + params: { id: string }; + body: { type?: string; title?: string; description?: string; duration?: number; contactedAt?: string }; + user: User; + }) => { + const updateData: Record = {}; + if (body.type !== undefined) updateData.type = body.type; + if (body.title !== undefined) updateData.title = body.title; + if (body.description !== undefined) updateData.description = body.description; + if (body.duration !== undefined) updateData.duration = body.duration; + if (body.contactedAt !== undefined) updateData.contactedAt = new Date(body.contactedAt); + + const [updated] = await db.update(interactions) + .set(updateData) + .where(and(eq(interactions.id, params.id), eq(interactions.userId, user.id))) + .returning(); + + if (!updated) throw new Error('Interaction not found'); + return updated; + }, { + params: t.Object({ id: t.String({ format: 'uuid' }) }), + body: t.Object({ + type: t.Optional(t.String()), + title: t.Optional(t.String()), + description: t.Optional(t.String()), + duration: t.Optional(t.Number({ minimum: 0 })), + contactedAt: t.Optional(t.String()), + }), + }) + + // Delete interaction + .delete('/interactions/:id', async ({ params, user }: { params: { id: string }; user: User }) => { + const [deleted] = await db.delete(interactions) + .where(and(eq(interactions.id, params.id), eq(interactions.userId, user.id))) + .returning({ id: interactions.id }); + + if (!deleted) throw new Error('Interaction not found'); + return { success: true, id: deleted.id }; + }, { + params: t.Object({ id: t.String({ format: 'uuid' }) }), + }) + + // Get all recent interactions across all clients (for dashboard) + .get('/interactions/recent', async ({ query, user }: { query: { limit?: string }; user: User }) => { + const limit = query.limit ? parseInt(query.limit) : 10; + + const items = await db.select({ + interaction: interactions, + client: { + id: clients.id, + firstName: clients.firstName, + lastName: clients.lastName, + }, + }) + .from(interactions) + .innerJoin(clients, eq(interactions.clientId, clients.id)) + .where(eq(interactions.userId, user.id)) + .orderBy(desc(interactions.contactedAt)) + .limit(limit); + + return items.map(({ interaction, client }) => ({ + ...interaction, + client, + })); + }, { + query: t.Object({ limit: t.Optional(t.String()) }), + }); diff --git a/src/routes/notifications.ts b/src/routes/notifications.ts new file mode 100644 index 0000000..a2e0584 --- /dev/null +++ b/src/routes/notifications.ts @@ -0,0 +1,85 @@ +import { Elysia, t } from 'elysia'; +import { db } from '../db'; +import { notifications, clients } from '../db/schema'; +import { eq, and, desc, sql } from 'drizzle-orm'; +import type { User } from '../lib/auth'; + +export const notificationRoutes = new Elysia({ prefix: '/notifications' }) + // List notifications + .get('/', async ({ query, user }: { query: { limit?: string; unreadOnly?: string }; user: User }) => { + const limit = query.limit ? parseInt(query.limit) : 50; + const unreadOnly = query.unreadOnly === 'true'; + + let conditions = [eq(notifications.userId, user.id)]; + if (unreadOnly) { + conditions.push(eq(notifications.read, false)); + } + + const items = await db.select({ + notification: notifications, + client: { + id: clients.id, + firstName: clients.firstName, + lastName: clients.lastName, + }, + }) + .from(notifications) + .leftJoin(clients, eq(notifications.clientId, clients.id)) + .where(and(...conditions)) + .orderBy(desc(notifications.createdAt)) + .limit(limit); + + // Unread count + const [unreadResult] = await db.select({ + count: sql`count(*)::int`, + }) + .from(notifications) + .where(and(eq(notifications.userId, user.id), eq(notifications.read, false))); + + return { + notifications: items.map(({ notification, client }) => ({ + ...notification, + client: client?.id ? client : null, + })), + unreadCount: unreadResult?.count || 0, + }; + }, { + query: t.Object({ + limit: t.Optional(t.String()), + unreadOnly: t.Optional(t.String()), + }), + }) + + // Mark notification as read + .put('/:id/read', async ({ params, user }: { params: { id: string }; user: User }) => { + const [updated] = await db.update(notifications) + .set({ read: true }) + .where(and(eq(notifications.id, params.id), eq(notifications.userId, user.id))) + .returning(); + + if (!updated) throw new Error('Notification not found'); + return updated; + }, { + params: t.Object({ id: t.String({ format: 'uuid' }) }), + }) + + // Mark all as read + .post('/mark-all-read', async ({ user }: { user: User }) => { + await db.update(notifications) + .set({ read: true }) + .where(and(eq(notifications.userId, user.id), eq(notifications.read, false))); + + return { success: true }; + }) + + // Delete notification + .delete('/:id', async ({ params, user }: { params: { id: string }; user: User }) => { + const [deleted] = await db.delete(notifications) + .where(and(eq(notifications.id, params.id), eq(notifications.userId, user.id))) + .returning({ id: notifications.id }); + + if (!deleted) throw new Error('Notification not found'); + return { success: true }; + }, { + params: t.Object({ id: t.String({ format: 'uuid' }) }), + }); diff --git a/src/services/jobs.ts b/src/services/jobs.ts new file mode 100644 index 0000000..639d271 --- /dev/null +++ b/src/services/jobs.ts @@ -0,0 +1,160 @@ +import { PgBoss } from 'pg-boss'; +import { db } from '../db'; +import { events, notifications, clients, users } from '../db/schema'; +import { eq, and, gte, lte, sql } from 'drizzle-orm'; +import { sendEmail } from './email'; + +let boss: PgBoss | null = null; + +export async function initJobQueue(): Promise { + const connectionString = process.env.DATABASE_URL; + if (!connectionString) { + throw new Error('DATABASE_URL required for job queue'); + } + + boss = new PgBoss(connectionString); + + boss.on('error', (error) => { + console.error('[pg-boss] Error:', error); + }); + + await boss.start(); + console.log('✅ pg-boss job queue started'); + + // Register job handlers + await boss.work('check-upcoming-events', { teamConcurrency: 1 }, checkUpcomingEvents); + await boss.work('send-event-reminder', { teamConcurrency: 5 }, sendEventReminder); + + // Schedule daily check at 8am UTC + await boss.schedule('check-upcoming-events', '0 8 * * *', {}, { + tz: 'UTC', + }); + + console.log('✅ Job schedules registered'); + return boss; +} + +export function getJobQueue(): PgBoss | null { + return boss; +} + +// Job: Check upcoming events and create notifications +async function checkUpcomingEvents(job: PgBoss.Job) { + console.log(`[jobs] Running checkUpcomingEvents at ${new Date().toISOString()}`); + + try { + const now = new Date(); + + // Get all events with their reminder windows + const allEvents = await db.select({ + event: events, + client: clients, + }) + .from(events) + .innerJoin(clients, eq(events.clientId, clients.id)); + + let created = 0; + + for (const { event, client } of allEvents) { + const reminderDays = event.reminderDays || 7; + let eventDate = new Date(event.date); + + // For recurring events, adjust to this year + if (event.recurring) { + eventDate = new Date(now.getFullYear(), eventDate.getMonth(), eventDate.getDate()); + // If the date already passed this year, check next year + if (eventDate < now) { + eventDate = new Date(now.getFullYear() + 1, eventDate.getMonth(), eventDate.getDate()); + } + } + + const daysUntil = Math.ceil((eventDate.getTime() - now.getTime()) / (1000 * 60 * 60 * 24)); + + // Within reminder window and not already triggered recently + if (daysUntil >= 0 && daysUntil <= reminderDays) { + // Check if already notified (within last 24h for this event) + const dayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000); + const existing = await db.select({ id: notifications.id }) + .from(notifications) + .where(and( + eq(notifications.eventId, event.id), + eq(notifications.userId, event.userId), + gte(notifications.createdAt, dayAgo), + )) + .limit(1); + + if (existing.length === 0) { + // Create notification + await db.insert(notifications).values({ + userId: event.userId, + type: 'event_reminder', + title: daysUntil === 0 ? `Today: ${event.title}` : `${event.title} in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`, + message: `${event.title} for ${client.firstName} ${client.lastName} is ${daysUntil === 0 ? 'today' : `coming up in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`}.`, + clientId: client.id, + eventId: event.id, + }); + created++; + + // Also queue email reminder + if (boss) { + await boss.send('send-event-reminder', { + userId: event.userId, + eventId: event.id, + clientId: client.id, + eventTitle: event.title, + clientName: `${client.firstName} ${client.lastName}`, + daysUntil, + }); + } + } + } + } + + console.log(`[jobs] checkUpcomingEvents: created ${created} notifications`); + } catch (error) { + console.error('[jobs] checkUpcomingEvents error:', error); + throw error; + } +} + +// Job: Send email reminder to advisor +async function sendEventReminder(job: PgBoss.Job<{ + userId: string; + eventId: string; + clientId: string; + eventTitle: string; + clientName: string; + daysUntil: number; +}>) { + const { userId, eventTitle, clientName, daysUntil } = job.data; + + try { + // Get user email + const [user] = await db.select({ email: users.email, name: users.name }) + .from(users) + .where(eq(users.id, userId)) + .limit(1); + + if (!user?.email) { + console.log(`[jobs] sendEventReminder: no email for user ${userId}`); + return; + } + + const subject = daysUntil === 0 + ? `Reminder: ${eventTitle} is today!` + : `Reminder: ${eventTitle} in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`; + + const content = `Hi ${user.name},\n\nThis is a reminder that "${eventTitle}" for ${clientName} is ${daysUntil === 0 ? 'today' : `coming up in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`}.\n\nLog in to your Network App to prepare.\n\nBest,\nThe Network App`; + + await sendEmail({ + to: user.email, + subject, + content, + }); + + console.log(`[jobs] Sent event reminder to ${user.email}: ${subject}`); + } catch (error) { + console.error(`[jobs] sendEventReminder error:`, error); + // Don't throw - email failures shouldn't retry aggressively + } +}