Job Queues on Tawa

The Short Version

Declare queues in catalog-info.yaml. iec-queue POSTs jobs to your endpoint. Return 2xx = complete. Non-2xx = retry.

spec:
  queues:
    - name: process-claim
      endpoint: /internal/jobs/process-claim
      concurrency: 5
      retries: 3
      retryDelayMs: 5000
      timeoutMs: 30000

  internalDependencies:
    - service: iec-queue    # → IEC_QUEUE_URL

Worker Endpoint

app.post('/internal/jobs/process-claim', async (req, res) => {
  const { jobId, queue, attempt, data } = req.body
  const { claimId } = data

  // Idempotency check — retries mean this may run multiple times
  if (await isAlreadyProcessed(claimId, jobId)) {
    return res.json({ success: true, skipped: true })
  }

  try {
    const result = await processClaim(claimId)

    septor.emit('claim.processed', {
      entityId: claimId,
      data: { jobId, attempt, result },
      metadata: { who: 'process-claim-worker' },
    }).catch((err) => logger.error({ err }, 'Septor emit failed'))

    res.json({ success: true })
  } catch (err) {
    logger.error({ jobId, claimId, attempt, err }, 'Claim processing failed')
    res.status(500).json({ success: false })  // triggers retry
  }
})

Enqueuing a Job

const response = await fetch(`${process.env.IEC_QUEUE_URL}/jobs`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    namespace: process.env.SERVICE_NAMESPACE || 'my-service',
    queue: 'process-claim',
    data: { claimId },
  }),
})

if (!response.ok) throw new Error(`Failed to enqueue: ${response.status}`)
const { id: jobId } = await response.json()

Fan-Out Pattern (cron → many jobs)

app.post('/internal/cron/nightly-sync', async (req, res) => {
  res.json({ success: true })  // Return 200 FIRST — don't make cron wait

  try {
    const records = await getPendingRecords()
    await Promise.allSettled(
      records.map((r) =>
        fetch(`${process.env.IEC_QUEUE_URL}/jobs`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            namespace: 'my-service',
            queue: 'sync-record',
            data: { recordId: r.id },
          }),
        }).catch((err) => logger.warn({ err, recordId: r.id }, 'Failed to enqueue'))
      )
    )
  } catch (err) {
    logger.error({ err }, 'Fan-out failed')
  }
})

When to Use Queues vs Direct HTTP

Use iec-queue when...Use direct HTTP when...
Work takes > 1 secondWork is fast (< 100ms)
You need retriesYou handle errors yourself
Processing many itemsYou need synchronous result
Work must survive restartsSimple request-response

Queue Fields

FieldRequiredDefaultDescription
nameYesQueue name, unique within namespace
endpointYesYour service endpoint path
concurrencyNo5Max simultaneous jobs
retriesNo3Retry attempts on failure
retryDelayMsNo5000Delay between retries
timeoutMsNo30000HTTP timeout for your handler

What NOT to Do

// ❌ WRONG: queue endpoints in routes (they must NOT be public)
spec:
  routes:
    - path: /internal/jobs/process-claim  // PUBLIC — exposed through Janus

// ✅ CORRECT: queue endpoints only in queues spec
spec:
  queues:
    - name: process-claim
      endpoint: /internal/jobs/process-claim  // INTERNAL — called only by iec-queue

// ❌ WRONG: not returning 500 on error (iec-queue won't retry)
} catch (err) {
  res.json({ success: false })  // 200 = complete, no retry!

// ✅ CORRECT
} catch (err) {
  res.status(500).json({ success: false })  // 500 = retry

// ❌ WRONG: no idempotency check (retries will re-process)
// ✅ CORRECT: check jobId or data.recordId before processing

Last updated: February 28, 2026