/snippets/async-queue
Async Task Queue
A concurrent task queue with concurrency limiting and priority support for managing async operations in TypeScript.
- async
- concurrency
- utilities
A robust task queue for managing concurrent async operations with configurable concurrency limits, priority support, and error handling.
/** A unit of work tracked by the queue. */
interface Task<T> {
  /** Unique identifier; it appears in the retry log message. */
  id: string;
  /** Starts the work. It is invoked again from scratch for every retry. */
  execute: () => Promise<T>;
  /** Scheduling weight: tasks with a larger value are dequeued first. */
  priority: number;
  /** Number of retries attempted so far. */
  retries: number;
}

/** Construction options for the queue. */
interface QueueOptions {
  /** Maximum number of tasks executing at once. Defaults to 1; values below 1 (or NaN) fall back to 1. */
  concurrency?: number;
  /** How many times a failing task is retried before its promise rejects. Defaults to 0. */
  maxRetries?: number;
  /** Base backoff delay in milliseconds; it doubles on each retry. Defaults to 1000; 0 is honored. */
  retryDelayMs?: number;
}

/** A queued task plus the settle callbacks of the promise that `add` handed to the caller. */
interface QueuedTask extends Task<unknown> {
  resolve: (value: unknown) => void;
  reject: (reason: unknown) => void;
}

/**
 * Runs async tasks with a cap on how many execute at the same time.
 *
 * - Tasks are dequeued highest priority first, FIFO among equal priorities.
 * - A failing task is retried up to `maxRetries` times with exponential backoff.
 *   The task keeps its concurrency slot while it waits for the backoff to elapse.
 * - `pause()` stops new tasks from starting; tasks already running are not interrupted.
 */
class AsyncQueue {
  private queue: QueuedTask[] = [];
  private running = 0;
  private readonly concurrency: number;
  private readonly maxRetries: number;
  private readonly retryDelayMs: number;
  private paused = false;
  /** Resolvers of pending `onIdle()` promises. */
  private idleWaiters: Array<() => void> = [];

  /**
   * @param options Queue limits; every field is optional.
   */
  constructor(options: QueueOptions = {}) {
    // `??` rather than `||` so that an explicit 0 is not mistaken for "not provided".
    const concurrency = options.concurrency ?? 1;
    // A limit below 1 would stall the queue forever, so fall back to the default.
    this.concurrency = concurrency >= 1 ? concurrency : 1;
    this.maxRetries = options.maxRetries ?? 0;
    const retryDelayMs = options.retryDelayMs ?? 1000;
    this.retryDelayMs = Number.isNaN(retryDelayMs) ? 1000 : retryDelayMs;
  }

  /**
   * Enqueues a task. It starts immediately if the queue is not paused and a slot is free.
   *
   * @param execute Function that starts the work; called once per attempt.
   * @param priority Larger values run earlier. Defaults to 0.
   * @returns A promise that settles with the task's result, or rejects with the
   *   last error once all retries are exhausted.
   */
  add<T>(
    execute: () => Promise<T>,
    priority: number = 0
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const task: QueuedTask = {
        id: crypto.randomUUID(),
        execute,
        priority,
        retries: 0,
        // The queue only ever passes back what `execute` produced, which is a T.
        resolve: (value) => resolve(value as T),
        reject,
      };
      this.enqueue(task, false);
      this.process();
    });
  }

  /**
   * Inserts a task so the queue stays sorted by descending priority.
   *
   * @param aheadOfPeers When true the task goes in front of tasks with the same
   *   priority (used for retries); otherwise behind them (FIFO).
   */
  private enqueue(task: QueuedTask, aheadOfPeers: boolean): void {
    const insertIndex = this.queue.findIndex((queued) =>
      aheadOfPeers ? queued.priority <= task.priority : queued.priority < task.priority
    );
    if (insertIndex === -1) {
      this.queue.push(task);
    } else {
      this.queue.splice(insertIndex, 0, task);
    }
  }

  /** Starts queued tasks until the queue is paused, empty, or every slot is taken. */
  private process(): void {
    while (!this.paused && this.running < this.concurrency) {
      const task = this.queue.shift();
      if (task === undefined) {
        break;
      }
      this.running++;
      void this.run(task);
    }
  }

  /** Executes one attempt of a task, then settles it or schedules a retry. */
  private async run(task: QueuedTask): Promise<void> {
    try {
      const result = await task.execute();
      task.resolve(result);
    } catch (error: unknown) {
      if (task.retries < this.maxRetries) {
        // Retry with exponential backoff
        task.retries++;
        const delay = this.retryDelayMs * 2 ** (task.retries - 1);
        console.log(`Task ${task.id} failed, retrying in ${delay}ms (attempt ${task.retries}/${this.maxRetries})`);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
        // Re-queue at the front of its own priority class: it runs before its
        // peers but never ahead of higher-priority work.
        this.enqueue(task, true);
      } else {
        task.reject(error);
      }
    } finally {
      this.running--;
      this.process(); // Hand the freed slot to the next task
      this.notifyIfIdle();
    }
  }

  /** Resolves every pending `onIdle()` promise if nothing is queued or running. */
  private notifyIfIdle(): void {
    if (!this.isIdle || this.idleWaiters.length === 0) {
      return;
    }
    const waiters = this.idleWaiters;
    this.idleWaiters = [];
    for (const wake of waiters) {
      wake();
    }
  }

  /** Stops starting new tasks. Tasks that are already running continue. */
  pause(): void {
    this.paused = true;
  }

  /** Lifts a pause and fills every free concurrency slot from the queue. */
  resume(): void {
    this.paused = false;
    this.process();
  }

  /**
   * Drops all tasks that have not started yet.
   *
   * NOTE: the promises returned by `add` for dropped tasks are never settled.
   */
  clear(): void {
    this.queue = [];
    this.notifyIfIdle();
  }

  /** Number of tasks waiting to start (running tasks are not counted). */
  size(): number {
    return this.queue.length;
  }

  /** True when no task is running and none is waiting. */
  get isIdle(): boolean {
    return this.running === 0 && this.queue.length === 0;
  }

  /**
   * Waits until the queue is idle.
   *
   * @returns A promise that resolves immediately if the queue is already idle,
   *   otherwise as soon as the last running task finishes with nothing queued.
   */
  async onIdle(): Promise<void> {
    if (this.isIdle) return;
    await new Promise<void>((resolve) => {
      this.idleWaiters.push(resolve);
    });
  }
}
// Usage examples
// A basic queue that runs at most three tasks at the same time
const queue = new AsyncQueue({ concurrency: 3 });

// Enqueue ten simulated jobs; each takes about a second and resolves to its own number
for (let taskNumber = 0; taskNumber < 10; taskNumber += 1) {
  const simulatedJob = async (): Promise<number> => {
    console.log(`Starting task ${taskNumber}`);
    await new Promise((done) => setTimeout(done, 1000));
    console.log(`Completed task ${taskNumber}`);
    return taskNumber;
  };
  queue.add(simulatedJob);
}
// With priority (higher priority runs first)
const urgentQueue = new AsyncQueue({ concurrency: 2 });

// A task starts the moment it is added when a slot is free, so priority only
// decides the order of tasks that are still waiting. Hold the queue while both
// tasks are enqueued; otherwise the low-priority task would already be running.
urgentQueue.pause();

urgentQueue.add(async () => {
  console.log('Low priority task');
  return 'low';
}, 1);

urgentQueue.add(async () => {
  console.log('High priority task'); // Runs first
  return 'high';
}, 10);

urgentQueue.resume();
// Retries: a task that fails is attempted again, up to three more times
const retryQueue = new AsyncQueue({ concurrency: 1, maxRetries: 3, retryDelayMs: 500 });

void (async () => {
  try {
    // Fails roughly half of the time to exercise the retry path
    const result = await retryQueue.add(async () => {
      const succeeded = Math.random() <= 0.5;
      if (!succeeded) {
        throw new Error('Random failure');
      }
      return 'Success!';
    });
    console.log(result);
  } catch (error) {
    console.error('Task failed after retries:', error);
  }
})();
// Concurrency-limited API calls: at most five requests are in flight at once
const apiQueue = new AsyncQueue({ concurrency: 5 });

/**
 * Fetches one user through the shared API queue.
 *
 * @param id User identifier; it is URL-encoded before being placed in the path.
 * @returns The parsed JSON body of the response.
 * @throws Error when the server answers with a non-2xx status. `fetch` itself
 *   only rejects on network failures, so the status is checked explicitly.
 */
async function fetchUser(id: string) {
  return apiQueue.add(async () => {
    const response = await fetch(`/api/users/${encodeURIComponent(id)}`);
    if (!response.ok) {
      throw new Error(`Failed to fetch user ${id}: ${response.status}`);
    }
    return response.json();
  });
}
// Batch processing
/**
 * Fetches every user in the list through a dedicated queue and returns the
 * results in the same order as the input ids.
 */
async function processUserBatch(userIds: string[]) {
  const batchQueue = new AsyncQueue({ concurrency: 10 });

  const pendingUsers = userIds.map((userId) => batchQueue.add(() => fetchUser(userId)));
  const results = await Promise.all(pendingUsers);

  await batchQueue.onIdle();
  console.log('All users processed');

  return results;
}
// Pausing and resuming a queue
const controllableQueue = new AsyncQueue({ concurrency: 3 });

// Five jobs that each wait one second before logging their number
for (let taskNumber = 0; taskNumber < 5; taskNumber += 1) {
  controllableQueue.add(async () => {
    await new Promise((done) => setTimeout(done, 1000));
    console.log(`Task ${taskNumber}`);
  });
}

// Stop starting new tasks two seconds in
setTimeout(function pauseQueue() {
  console.log('Pausing queue');
  controllableQueue.pause();
}, 2000);

// Start picking up tasks again at the five-second mark
setTimeout(function resumeQueue() {
  console.log('Resuming queue');
  controllableQueue.resume();
}, 5000);
// File upload queue with progress tracking
/**
 * A queue whose tasks POST files to `/api/upload` as multipart form data.
 */
class UploadQueue extends AsyncQueue {
  /**
   * Queues a file upload at priority 5.
   *
   * @param file File to send under the form field `file`.
   * @param onProgress Optional callback that receives the upload progress as a
   *   percentage (0-100) whenever the browser reports a computable length.
   * @returns The response body text once the server answers with a 2xx status.
   * @throws Error when the server answers with a non-2xx status, or when the
   *   request fails, is aborted, or times out.
   */
  async uploadFile(file: File, onProgress?: (percent: number) => void): Promise<string> {
    return this.add(
      () =>
        new Promise<string>((resolve, reject) => {
          // Built inside the task so that every attempt sends a fresh request.
          const formData = new FormData();
          formData.append('file', file);

          const xhr = new XMLHttpRequest();

          xhr.upload.onprogress = (event) => {
            // Guard against total === 0, which would produce NaN.
            if (onProgress && event.lengthComputable && event.total > 0) {
              onProgress((event.loaded / event.total) * 100);
            }
          };

          xhr.onload = () => {
            // Any 2xx status is a success (e.g. 201 Created), not only 200.
            if (xhr.status >= 200 && xhr.status < 300) {
              resolve(xhr.responseText);
            } else {
              reject(new Error(`Upload failed: ${xhr.status}`));
            }
          };
          xhr.onerror = () => reject(new Error('Upload failed'));
          // Without these handlers an aborted or timed-out request would never
          // settle and would hold its queue slot forever.
          xhr.onabort = () => reject(new Error('Upload aborted'));
          xhr.ontimeout = () => reject(new Error('Upload timed out'));

          xhr.open('POST', '/api/upload');
          xhr.send(formData);
        }),
      5 // High priority
    );
  }
}
const uploadQueue = new UploadQueue({ concurrency: 2, maxRetries: 3 });

/**
 * Uploads all files through the shared upload queue, logging progress per file
 * and a success or failure line for each one after every upload has settled.
 */
async function handleFilesUpload(files: File[]) {
  // Builds the progress logger for one file
  const reportProgress = (file: File) => (percent: number) => {
    console.log(`${file.name}: ${percent.toFixed(1)}% uploaded`);
  };

  const outcomes = await Promise.allSettled(
    files.map((file) => uploadQueue.uploadFile(file, reportProgress(file)))
  );

  for (const [index, outcome] of outcomes.entries()) {
    const { name } = files[index];
    if (outcome.status === 'rejected') {
      console.error(`${name} failed:`, outcome.reason);
      continue;
    }
    console.log(`${name} uploaded successfully`);
  }
}
Use Cases
- API Request Throttling: Prevent overwhelming external APIs by capping the number of in-flight requests
- Batch Processing: Process large datasets with controlled concurrency
- File Uploads: Manage multiple file uploads without flooding the server
- Database Operations: Limit concurrent database queries
- Web Scraping: Respect rate limits and avoid being blocked
Features
- Configurable concurrency limits
- Priority-based execution
- Automatic retries with exponential backoff
- Pause/resume functionality
- Idle detection
- TypeScript support with full type inference