Offload data exports to a worker

Background Export with Chunked MongoDB Aggregation + S3 Multipart Upload

Currently, report exports run the full user-defined aggregation synchronously inside the request. These queries can take several minutes, block threads, and degrade the entire app. The fix has two main pieces: moving exports out of the request path into a background worker, and breaking the query into manageable chunks.


Job Queue

When a user triggers an export, instead of running the aggregation immediately, enqueue a BullMQ job and return a job ID to the client. The client can poll for status or receive a notification when complete.

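// Enqueue the export job; the payload doubles as resumable state for the worker
const job = await exportQueue.add('run-export', {
  reportId,
  userId, // assumed to be available in the request handler; used to notify the user later
  userPipeline,
  s3UploadId: null, // set once the multipart upload is opened
  lastId: null, // _id of the last exported record, used to seek to the next batch
  totalProcessed: 0,
});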
res.json({ jobId: job.id });
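
The polling path mentioned above could be backed by a small lookup helper. The following is a minimal sketch, not the app's actual endpoint: `getExportStatus` and the response shape are hypothetical, and it assumes the worker returns `{ url }` as the job's return value. It relies only on BullMQ's `queue.getJob`, `job.getState`, `job.returnvalue`, and `job.failedReason`, and takes the queue as a parameter so it can be exercised without Redis.

```javascript
// Hypothetical helper: map a BullMQ job to a small status payload for polling.
// `queue` is expected to be a BullMQ Queue; only getJob() is used here.
async function getExportStatus(queue, jobId) {
  const job = await queue.getJob(jobId);
  if (!job) return { status: 'not_found' };

  const state = await job.getState(); // e.g. 'waiting', 'active', 'completed', 'failed'
  if (state === 'completed') {
    // Assumption: the worker's processor returns { url } when it finishes.
    return { status: state, url: job.returnvalue?.url ?? null };
  }
  if (state === 'failed') {
    return { status: state, error: job.failedReason };
  }
  // Still queued or running: report progress from the job payload.
  return { status: state, totalProcessed: job.data.totalProcessed };
}
```

A route handler would call `getExportStatus(exportQueue, req.params.jobId)` and return the result as JSON.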

Chunked Aggregation with Cursor-Based Pagination

Rather than running the full pipeline once, the worker runs it in batches. Pagination stages are appended to the end of the user pipeline, after any $group or $lookup stages. For the first batch:

const pipeline = [
  ...userPipeline,
  { $sort: { _id: 1 } },
  { $limit: BATCH_SIZE },
];

For each subsequent batch, use the last _id from the previous result to seek forward efficiently:

const pipeline = [
  ...userPipeline,
  { $match: { _id: { $gt: lastId } } },
  { $sort: { _id: 1 } },
  { $limit: BATCH_SIZE },
];

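This avoids the performance penalty of $skip, which has to walk past every skipped document on each request. MongoDB can seek directly on the _id index only when the $match and $sort can be applied to the collection itself; when they follow a $group, the grouped output has no index, so the upstream stages are recomputed and filtered for every batch. When a batch returns fewer records than BATCH_SIZE, the export is complete.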
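
The two pipeline shapes and the stop condition above can be combined into one loop. This is a sketch under stated assumptions: `buildBatchPipeline` and `exportInBatches` are hypothetical names, `runBatch` is an assumed callback that executes the aggregation and resolves to an array of documents, and `onBatch` is an assumed callback that consumes each batch.

```javascript
// Hypothetical helper: build the pipeline for one batch.
// lastId === null means "first batch", so no seek $match is added.
function buildBatchPipeline(userPipeline, lastId, batchSize) {
  const seek = lastId === null ? [] : [{ $match: { _id: { $gt: lastId } } }];
  return [...userPipeline, ...seek, { $sort: { _id: 1 } }, { $limit: batchSize }];
}

// Drive the batches until a short (or empty) batch signals the end.
// runBatch(pipeline) and onBatch(results) are assumed callbacks, see lead-in.
async function exportInBatches({ userPipeline, batchSize, runBatch, onBatch, lastId = null }) {
  let totalProcessed = 0;
  for (;;) {
    const results = await runBatch(buildBatchPipeline(userPipeline, lastId, batchSize));
    if (results.length > 0) {
      await onBatch(results);
      totalProcessed += results.length;
      lastId = results[results.length - 1]._id; // seek point for the next batch
    }
    // Fewer records than the batch size means there is nothing left to read.
    if (results.length < batchSize) return { totalProcessed, lastId };
  }
}
```

The seek only works if `_id` is unique and orderable in the pipeline's output. A pipeline that ends in an $unwind, for example, can emit several documents with the same `_id`, and a `$gt` seek would then skip some of them.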


S3 Multipart Upload

On the first batch, open a multipart upload session and store the upload ID in the job:

const { UploadId } = await s3.createMultipartUpload({
  Bucket: process.env.S3_BUCKET,
  Key: `exports/${reportId}.csv`,
}).promise();
await job.updateData({ ...job.data, s3UploadId: UploadId });

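Each batch gets uploaded as a numbered part. Part numbers start at 1, and S3 rejects an upload in which any part other than the last is smaller than 5 MiB, so small batches need to be combined before uploading: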

const part = await s3.uploadPart({
  Bucket: process.env.S3_BUCKET,
  Key: `exports/${reportId}.csv`,
  UploadId,
  PartNumber: partNumber,
  Body: batchToCsv(results),
}).promise();
parts.push({ PartNumber: partNumber, ETag: part.ETag });
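
Because of S3's minimum part size (5 MiB for every part except the last), a batch's CSV may be too small to upload on its own. One way to handle this is to buffer CSV text until it is large enough. The `PartBuffer` class below is a hypothetical helper, sketched with Node's built-in `Buffer` only; it is not part of the AWS SDK.

```javascript
const MIN_PART_BYTES = 5 * 1024 * 1024; // S3 minimum for every part except the last

// Hypothetical helper: collects CSV chunks and releases them in part-sized pieces.
class PartBuffer {
  constructor(minBytes = MIN_PART_BYTES) {
    this.minBytes = minBytes;
    this.chunks = [];
    this.bytes = 0;
  }

  // Add one batch of CSV text. Returns a Buffer once enough data has
  // accumulated for a valid part, otherwise null.
  push(csvText) {
    const chunk = Buffer.from(csvText, 'utf8');
    this.chunks.push(chunk);
    this.bytes += chunk.length;
    return this.bytes >= this.minBytes ? this.flush() : null;
  }

  // Return whatever is buffered (used for the final, possibly small, part).
  flush() {
    if (this.bytes === 0) return null;
    const body = Buffer.concat(this.chunks, this.bytes);
    this.chunks = [];
    this.bytes = 0;
    return body;
  }
}
```

In the worker, the result of `push(batchToCsv(results))` would be passed to `uploadPart` as `Body` whenever it is not null, and `flush()` would be called once after the last batch to upload the remainder as the final part.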

When the final batch is processed, commit the multipart session:

await s3.completeMultipartUpload({
  Bucket: process.env.S3_BUCKET,
  Key: `exports/${reportId}.csv`,
  UploadId,
  MultipartUpload: { Parts: parts },
}).promise();
const url = await s3.getSignedUrlPromise('getObject', {
  Bucket: process.env.S3_BUCKET,
  Key: `exports/${reportId}.csv`,
  Expires: 3600,
});
// Notify user with presigned URL
await notifyUser(userId, url);

Error Handling / Resilience

If the worker crashes mid-export, the UploadId and lastId stored in the job payload allow a retry to resume after the last completed part, or to abort cleanly so that no orphaned S3 upload is left behind. Always set maxTimeMS on each batch query as a safety net:

// Cap each batch at 30 seconds; the driver throws if the limit is exceeded
const results = await db.collection('inspections').aggregate(pipeline, {
  maxTimeMS: 30000,
}).toArray();
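
For the resume path described at the start of this section, the worker also needs the list of parts that were already uploaded. One option is to ask S3 for it with `listParts`. The sketch below assumes an AWS SDK v2 client, matching the `.promise()` calls used elsewhere in this post; `loadCompletedParts` is a hypothetical name.

```javascript
// Hypothetical helper: rebuild the `parts` array for an in-progress upload.
// `s3` is assumed to be an AWS SDK v2 S3 client.
async function loadCompletedParts(s3, { Bucket, Key, UploadId }) {
  const parts = [];
  let marker; // undefined until S3 reports a truncated page
  for (;;) {
    const params = { Bucket, Key, UploadId };
    if (marker !== undefined) params.PartNumberMarker = marker;
    const page = await s3.listParts(params).promise();
    for (const p of page.Parts ?? []) {
      // Keep only the fields completeMultipartUpload needs.
      parts.push({ PartNumber: p.PartNumber, ETag: p.ETag });
    }
    if (!page.IsTruncated) return parts;
    marker = page.NextPartNumberMarker;
  }
}
```

A resumed job would continue with part number `parts.length + 1`, provided the parts were uploaded contiguously. The stored lastId also has to match the last uploaded part, so it is safest to persist it only after the corresponding `uploadPart` call succeeds.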

If a batch times out, fail the job with a clear error and abort the multipart upload:

await s3.abortMultipartUpload({
  Bucket: process.env.S3_BUCKET,
  Key: `exports/${reportId}.csv`,
  UploadId,
}).promise();
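
The fail-and-abort behavior can be wrapped around the whole export so that any error, not only a timeout, triggers the cleanup. This is a minimal sketch: `withMultipartCleanup` is a hypothetical name, `runExport` is an assumed async callback containing the batch loop, and `s3` is assumed to be an AWS SDK v2 client.

```javascript
// Hypothetical wrapper: run the export and abort the multipart upload if it throws.
async function withMultipartCleanup(s3, { Bucket, Key, UploadId }, runExport) {
  try {
    return await runExport();
  } catch (err) {
    try {
      await s3.abortMultipartUpload({ Bucket, Key, UploadId }).promise();
    } catch (abortErr) {
      // Keep the original failure as the job error; only log the cleanup problem.
      console.error('abortMultipartUpload failed:', abortErr);
    }
    throw err; // rethrow so the queue marks the job as failed
  }
}
```

Rethrowing from a BullMQ processor marks the job as failed. Note that an aborted upload cannot be resumed, so a retry after this cleanup has to start a new multipart upload.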

Transcript context

In the Monday kickoff, Chris noted that a Friday release had to be rolled back after a bad query hurt production. The team’s current understanding is that long-running Mongo queries are consuming enough memory to block threads and trigger cascading Next.js server action timeouts. The proposed direction from the discussion was to avoid waiting on long export queries in-process by dumping CSV exports to S3 and letting users download them asynchronously.

Status: Completed
Board: Main App
Date: About 1 month ago
Author: Linear
