#312 — Move notification delivery onto a queue

9 files · +418 / −190 · branch notify-queue → main · author @priya
TL;DR

Notification sends were happening inline in the request path. Under load they added 200–800 ms to mutation latency and silently dropped emails when the SMTP pool was exhausted. This PR moves delivery onto the existing pg-boss queue so the API returns immediately and failed sends retry with backoff.

Why

When we added @mentions in comments last quarter, every mention started triggering up to three sends (in-app, email, Slack) inside the same transaction that saved the comment. That was fine at launch. It is not fine now that a single task update can fan out to forty watchers.

Before
  • Sends run inline in the mutation handler
  • SMTP timeout = 500 error for the comment
  • No retries — a dropped email is gone
  • p99 on comments.create: 1.4 s
After
  • Handler enqueues one job per recipient, returns
  • Worker retries 3× with exponential backoff
  • Dead-letter table for inspection after exhaustion
  • p99 on comments.create: 180 ms (staging)

File-by-file

Ordered for reading, not alphabetically. Start at the worker — it's the new thing — then the enqueue call site, then the plumbing.

packages/notify/src/worker.ts new +126

The heart of the PR. A pg-boss subscriber that pulls notify.deliver jobs, resolves the user's channel preferences, and calls the right adapter. Retries are configured per-channel — email gets three attempts, Slack gets one because its API is already idempotent on our side.

boss.work('notify.deliver', { batchSize: 20 }, async (jobs) => {
  for (const job of jobs) {
    const { userId, event, channel } = job.data;
    const prefs = await getPrefs(userId);
    if (!prefs[channel]) continue;        // user muted this channel; next job

    try {
      await adapters[channel].send(userId, event);
    } catch (err) {
      if (job.retryCount >= MAX_RETRY[channel]) {
        await deadLetter(job, err);       // don't throw — ack & park
        continue;                         // keep draining the rest of the batch
      }
      throw err;                          // pg-boss reschedules
    }
  }
});
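
For reference, a sketch of the per-channel caps that catch block consults. The email and Slack values are the ones stated above; the in-app value and the exact shape of the constant are assumptions:

type Channel = 'email' | 'slack' | 'inapp';

const MAX_RETRY: Record<Channel, number> = {
  email: 3,   // SMTP hiccups are transient; backoff makes retries cheap
  slack: 1,   // Slack API is already idempotent on our side
  inapp: 1,   // assumption: in-app delivery is a single DB write
};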
packages/api/src/routers/comments.ts mod +14 −62

Where the win shows up. The mutation used to call sendEmail, sendSlack, and createInApp directly. Now it inserts the comment, computes recipients, and enqueues. The try/catch soup is gone.

  const comment = await db.comments.insert(input);
  const recipients = await resolveWatchers(input.taskId, input.mentions);

- for (const r of recipients) {
-   await sendEmail(r, comment);       // blocked the response
-   await sendSlack(r, comment);
- }
+ await boss.insert(recipients.flatMap((r) =>
+   CHANNELS.map((ch) => ({
+     name: 'notify.deliver',
+     data: { userId: r.id, channel: ch, event: toEvent(comment) },
+     singletonKey: `${comment.id}:${r.id}:${ch}`,  // idempotent
+   }))));
  return comment;
packages/db/migrations/0051_dead_letter.sql new +22

Table for jobs that exhaust their retries. Deliberately not auto-pruned — we want to look at these weekly until we trust the new path. Has the full job payload and the last error string.
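
To make the worker's deadLetter call concrete, here is roughly what the helper writes into this table. Column names are my sketch; the migration itself is the source of truth:

async function deadLetter(job: { id: string; name: string; data: unknown }, err: unknown) {
  await db.deadLetters.insert({
    jobId: job.id,
    queue: job.name,
    payload: job.data,                    // full job payload, for inspection
    lastError: err instanceof Error ? err.message : String(err),
    failedAt: new Date(),
  });
}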

packages/notify/src/adapters/{email,slack,inapp}.ts mod +88 −74

Mostly moves. Each adapter now implements a shared Adapter interface and throws a typed RetryableError or PermanentError so the worker knows whether to retry. The email adapter also drops its internal retry loop — the queue owns retries now, double-retrying was how we got duplicate emails in April.
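
The contract, roughly. The interface and error names come from this PR; the exact send signature is inferred from the worker's call site:

interface Adapter {
  // NotifyEvent is whatever toEvent(comment) produces in comments.ts
  send(userId: string, event: NotifyEvent): Promise<void>;
}

class RetryableError extends Error {}   // worker rethrows, pg-boss reschedules
class PermanentError extends Error {}   // worker parks it in dead-letter immediately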

apps/worker/src/index.ts, infra/fly.toml mod +31 −4

Registers the new subscriber in the existing worker process and bumps its concurrency from 5 → 20. No new deploy unit.
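
For reviewers who haven't been in the worker app lately, the wiring amounts to something like this. The import path and function name are illustrative, not the PR's actual exports:

import PgBoss from 'pg-boss';
import { registerDeliverWorker } from '@acme/notify';

const boss = new PgBoss(process.env.DATABASE_URL!);
await boss.start();
await registerDeliverWorker(boss);   // the boss.work('notify.deliver', ...) call above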

packages/notify/src/__tests__/worker.test.ts new +137

Covers the retry boundary, the dead-letter path, channel muting, and the singleton key dedupe. Uses a real pg-boss against the test database — we got burned last quarter when mocked queue tests passed but prod ordering broke.
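
One case, to give the flavor. The helpers here (failAllSends, drain, deadLetterRows) are stand-ins for the test file's actual utilities:

it('parks an email job after the third failed attempt', async () => {
  failAllSends('email');                            // stub the adapter to always throw
  await boss.send('notify.deliver', { userId, channel: 'email', event });
  await drain(boss);                                // run the worker until the queue is empty
  expect(await deadLetterRows()).toHaveLength(1);   // parked, not retried forever
});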

Where to focus your review

1. The retry / dead-letter boundary (worker.ts:31–44). I catch, check retryCount, and either park or rethrow. If this logic is wrong we either retry forever or drop messages — the two failure modes this PR exists to fix.
2. The singleton key (comments.ts:28). ${commentId}:${userId}:${channel} should make re-enqueues idempotent if the API handler retries. Sanity-check that this can't collide across tasks.
3. What I deliberately did not do. No per-user digest batching, no delivery receipts, no priority lanes. All of those layer on top of this cleanly; bundling them would make this unreviewable.

Test plan

  • Unit: retry → dead-letter path, channel mute, singleton dedupe — packages/notify, 14 cases, real pg-boss on the test db
  • Integration: create a comment with 3 watchers, assert 9 jobs enqueued and drained
  • Staging load: 500 rps on comments.create for 10 min, p99 = 180 ms (was 1.4 s before — dashboard linked in the PR description)
  • Manual: kill SMTP mid-burst, confirm jobs land in dead-letter and nothing 500s (will do during the 10% ramp)

Rollout

Behind the notify_queue_v2 flag. The old inline path stays in the codebase, dormant, for one release in case we need to flip back.
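
For concreteness, the gate in the mutation handler looks roughly like this; the flags client and both helper names are illustrative:

if (await flags.isEnabled('notify_queue_v2')) {
  await enqueueDeliveries(comment, recipients);   // new path: jobs on pg-boss
} else {
  await deliverInline(comment, recipients);       // old path, kept for one release
}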

  • Day 0 (internal): Acme team only. Watch the dead-letter table + worker error rate.
  • Day 2 (10%): Random sample. Alert if dead-letter rate > 0.5% of sends.
  • Day 4 (100%): Ramp fully, delete the inline path in a follow-up PR next week.