Skip to content

┌─< lukebayliss.com >─┐

/snippets/async-queue

Async Task Queue

A concurrent task queue with concurrency limiting and priority support for managing async operations in TypeScript.

  • async
  • concurrency
  • utilities

A robust task queue for managing concurrent async operations with configurable concurrency limits, priority support, and error handling.

/** A unit of queued work plus the metadata the queue uses to schedule it. */
interface Task<T> {
  /** Unique identifier (a UUID); surfaced in retry log messages. */
  id: string;
  /** The async operation to run when the task is dequeued. */
  execute: () => Promise<T>;
  /** Scheduling weight — higher values are dequeued first. */
  priority: number;
  /** Number of retry attempts made so far (starts at 0). */
  retries: number;
}

/** Configuration accepted by the AsyncQueue constructor. */
interface QueueOptions {
  /** Maximum number of tasks running at once (default 1). */
  concurrency?: number;
  /** Retry attempts allowed per task after the first failure (default 0). */
  maxRetries?: number;
  /** Base delay for exponential backoff between retries, in ms (default 1000). */
  retryDelayMs?: number;
}

/** A queued task together with the settlers of the promise returned by add(). */
type RunnableTask<T> = Task<T> & {
  resolve: (value: T) => void;
  reject: (error: unknown) => void;
};

/**
 * A concurrency-limited async task queue with priority ordering,
 * exponential-backoff retries, and pause/resume support.
 */
class AsyncQueue {
  /** Pending tasks, kept sorted highest-priority first (FIFO within a priority). */
  private queue: RunnableTask<any>[] = [];
  /** Number of tasks currently executing. */
  private running = 0;
  private readonly concurrency: number;
  private readonly maxRetries: number;
  private readonly retryDelayMs: number;
  private paused = false;

  constructor(options: QueueOptions = {}) {
    // ?? instead of || so explicit falsy values aren't clobbered; clamp so a
    // zero/negative concurrency can't deadlock the queue (|| hid the 0 case
    // but a negative value made `running >= concurrency` permanently true).
    this.concurrency = Math.max(1, options.concurrency ?? 1);
    this.maxRetries = Math.max(0, options.maxRetries ?? 0);
    this.retryDelayMs = options.retryDelayMs ?? 1000;
  }

  /**
   * Enqueue an async operation.
   *
   * @param execute The operation to run when a concurrency slot is free.
   * @param priority Higher values run first (default 0).
   * @returns Resolves with the operation's result, or rejects with its last
   *          error once all retries are exhausted.
   */
  async add<T>(
    execute: () => Promise<T>,
    priority: number = 0
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.enqueue({
        id: crypto.randomUUID(),
        execute,
        priority,
        retries: 0,
        resolve,
        reject,
      });
      this.process();
    });
  }

  /** Insert a task preserving descending-priority order (used by add and retry). */
  private enqueue(task: RunnableTask<any>): void {
    const insertIndex = this.queue.findIndex(t => t.priority < task.priority);
    if (insertIndex === -1) {
      this.queue.push(task);
    } else {
      this.queue.splice(insertIndex, 0, task);
    }
  }

  /**
   * Start as many queued tasks as the concurrency limit allows.
   * Looping (rather than starting a single task) means resume() and bursts of
   * add() fill every free slot immediately instead of one-at-a-time.
   */
  private process(): void {
    while (!this.paused && this.running < this.concurrency && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.running++;
      // Fire-and-forget: run() handles all settlement and never rejects.
      void this.run(task);
    }
  }

  /** Execute one task; on failure, schedule a retry with exponential backoff. */
  private async run(task: RunnableTask<any>): Promise<void> {
    try {
      task.resolve(await task.execute());
    } catch (error) {
      if (task.retries < this.maxRetries) {
        task.retries++;
        const delay = this.retryDelayMs * 2 ** (task.retries - 1);

        console.log(`Task ${task.id} failed, retrying in ${delay}ms (attempt ${task.retries}/${this.maxRetries})`);

        // Schedule the retry WITHOUT holding a concurrency slot during the
        // backoff (the old code awaited the delay before releasing the slot,
        // stalling the whole queue). Re-enqueue by priority, not unshift.
        setTimeout(() => {
          this.enqueue(task);
          this.process();
        }, delay);
      } else {
        task.reject(error);
      }
    } finally {
      this.running--;
      this.process(); // Free slot — pull the next task if any
    }
  }

  /** Stop dequeuing new tasks; already-running tasks finish normally. */
  pause(): void {
    this.paused = true;
  }

  /** Resume dequeuing, immediately filling all free concurrency slots. */
  resume(): void {
    this.paused = false;
    this.process();
  }

  /** Drop all pending tasks. NOTE: their add() promises never settle. */
  clear(): void {
    this.queue = [];
  }

  /** Number of tasks waiting (not counting those currently running). */
  size(): number {
    return this.queue.length;
  }

  /** True when nothing is running and nothing is queued. */
  get isIdle(): boolean {
    return this.running === 0 && this.queue.length === 0;
  }

  /** Resolves once the queue is idle (polls every 100ms). */
  async onIdle(): Promise<void> {
    if (this.isIdle) return;

    return new Promise((resolve) => {
      const checkIdle = setInterval(() => {
        if (this.isIdle) {
          clearInterval(checkIdle);
          resolve();
        }
      }, 100);
    });
  }
}

// Usage Examples:

// Basic queue with concurrency limit
const queue = new AsyncQueue({ concurrency: 3 });

// Enqueue ten one-second jobs; at most three are in flight at any moment.
Array.from({ length: 10 }, (_, taskIndex) => {
  void queue.add(async () => {
    console.log(`Starting task ${taskIndex}`);
    await new Promise<void>(resolve => setTimeout(resolve, 1000));
    console.log(`Completed task ${taskIndex}`);
    return taskIndex;
  });
});

// With priority (higher priority runs first)
const urgentQueue = new AsyncQueue({ concurrency: 2 });

void urgentQueue.add(async () => {
  console.log('Low priority task');
  return 'low';
}, 1);

void urgentQueue.add(async () => {
  // NOTE(review): priority only orders tasks that are WAITING in the queue.
  // Here the queue is empty and concurrency is 2, so the first add() starts
  // immediately — to actually see priority ordering, pause() before adding
  // both tasks and resume() afterwards.
  console.log('High priority task'); // Runs first
  return 'high';
}, 10);

// With retries
const retryQueue = new AsyncQueue({
  concurrency: 1,
  maxRetries: 3,
  retryDelayMs: 500
});

// Roughly half the attempts throw, exercising the retry/backoff path.
void (async () => {
  try {
    const result = await retryQueue.add(async () => {
      if (Math.random() > 0.5) {
        throw new Error('Random failure');
      }
      return 'Success!';
    });
    console.log(result);
  } catch (error) {
    console.error('Task failed after retries:', error);
  }
})();

// Rate-limited API calls
const apiQueue = new AsyncQueue({ concurrency: 5 });

// Funnels every user fetch through apiQueue so at most five requests
// are in flight at once.
async function fetchUser(id: string) {
  return apiQueue.add(async () => {
    const endpoint = `/api/users/${id}`;
    const response = await fetch(endpoint);
    return response.json();
  });
}

// Batch processing
//
// Fetches directly inside the local queue: wrapping fetchUser() here would
// queue each request TWICE (once on apiQueue, once on this queue), making the
// local concurrency: 10 dead weight behind apiQueue's limit of 5.
async function processUserBatch(userIds: string[]) {
  const queue = new AsyncQueue({ concurrency: 10 });

  const results = await Promise.all(
    userIds.map(id =>
      queue.add(async () => {
        const response = await fetch(`/api/users/${id}`);
        return response.json();
      })
    )
  );

  // Promise.all already guarantees every task has settled, so the queue is
  // idle by this point — no onIdle() wait needed.
  console.log('All users processed');

  return results;
}

// Pause and resume
const controllableQueue = new AsyncQueue({ concurrency: 3 });

// Five one-second jobs, at most three in flight.
[0, 1, 2, 3, 4].forEach(taskNumber => {
  void controllableQueue.add(async () => {
    await new Promise(r => setTimeout(r, 1000));
    console.log(`Task ${taskNumber}`);
  });
});

// Pause after 2 seconds
setTimeout(() => {
  console.log('Pausing queue');
  controllableQueue.pause();
}, 2000);

// Resume after 5 seconds
setTimeout(() => {
  console.log('Resuming queue');
  controllableQueue.resume();
}, 5000);

// File upload queue with progress tracking
class UploadQueue extends AsyncQueue {
  // Latest known upload percentage keyed by file name. Was declared but never
  // read or written before; now maintained by uploadFile. NOTE: two in-flight
  // files sharing a name overwrite each other's entry.
  private progress = new Map<string, number>();

  /** Last reported progress (0-100) for a file, or undefined if not uploading. */
  getProgress(fileName: string): number | undefined {
    return this.progress.get(fileName);
  }

  /**
   * Upload a file via XHR at high priority.
   *
   * @param file The file to POST to /api/upload.
   * @param onProgress Optional callback invoked with the percent uploaded.
   * @returns Resolves with the server's response body text; rejects on a
   *          non-200 status or a network error.
   */
  async uploadFile(file: File, onProgress?: (percent: number) => void): Promise<string> {
    return this.add(async () => {
      const formData = new FormData();
      formData.append('file', file);

      const xhr = new XMLHttpRequest();

      return new Promise<string>((resolve, reject) => {
        xhr.upload.onprogress = (e) => {
          if (e.lengthComputable) {
            const percent = (e.loaded / e.total) * 100;
            // Record progress even when no callback is supplied, so
            // getProgress() works for all uploads.
            this.progress.set(file.name, percent);
            onProgress?.(percent);
          }
        };

        xhr.onload = () => {
          this.progress.delete(file.name); // upload settled — drop the entry
          if (xhr.status === 200) {
            resolve(xhr.responseText);
          } else {
            reject(new Error(`Upload failed: ${xhr.status}`));
          }
        };

        xhr.onerror = () => {
          this.progress.delete(file.name);
          reject(new Error('Upload failed'));
        };

        xhr.open('POST', '/api/upload');
        xhr.send(formData);
      });
    }, 5); // High priority
  }
}

const uploadQueue = new UploadQueue({ concurrency: 2, maxRetries: 3 });

// Kick off every upload, then report each outcome individually so one
// failure never aborts the rest of the batch.
async function handleFilesUpload(files: File[]) {
  const uploads = files.map(file =>
    uploadQueue.uploadFile(file, (percent) => {
      console.log(`${file.name}: ${percent.toFixed(1)}% uploaded`);
    })
  );

  const outcomes = await Promise.allSettled(uploads);

  for (const [index, outcome] of outcomes.entries()) {
    if (outcome.status === 'fulfilled') {
      console.log(`${files[index].name} uploaded successfully`);
    } else {
      console.error(`${files[index].name} failed:`, outcome.reason);
    }
  }
}

Use Cases

  • API Throttling: Cap concurrent requests to avoid overwhelming external APIs
  • Batch Processing: Process large datasets with controlled concurrency
  • File Uploads: Manage multiple file uploads without flooding the server
  • Database Operations: Limit concurrent database queries
  • Web Scraping: Respect rate limits and avoid being blocked

Features

  • Configurable concurrency limits
  • Priority-based execution
  • Automatic retries with exponential backoff
  • Pause/resume functionality
  • Idle detection
  • TypeScript support with full type inference