Semaphore
A primitive used to control access to a common resource by multiple tasks.
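To make the idea concrete, here is a minimal sketch of a counting semaphore with a bounded wait queue. It is an illustration only and not ciorent's implementation: the names `SimpleSemaphore` and `measurePeak` are hypothetical, and a plain array stands in for whatever queue structure the library uses internally.

```typescript
// Illustrative only: NOT ciorent's implementation.
class SimpleSemaphore {
  private permits: number;
  private readonly maxWaiters: number;
  private readonly waiters: Array<() => void> = [];

  constructor(permits: number, maxWaiters: number) {
    this.permits = permits;
    this.maxWaiters = maxWaiters;
  }

  // Resolves to true once a permit is held, or to false if the wait queue is full.
  acquire(): Promise<boolean> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve(true);
    }
    if (this.waiters.length >= this.maxWaiters) return Promise.resolve(false);
    return new Promise<boolean>((resolve) => {
      this.waiters.push(() => resolve(true));
    });
  }

  // Hands the permit to the oldest waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift();
    if (next !== undefined) next();
    else this.permits++;
  }
}

// Runs `count` tasks through a semaphore and reports the peak concurrency observed.
const measurePeak = async (count: number, permits: number): Promise<number> => {
  const sem = new SimpleSemaphore(permits, 100);
  let running = 0;
  let peak = 0;

  const runTask = async (): Promise<void> => {
    if (!(await sem.acquire())) return; // wait queue is full
    running++;
    peak = Math.max(peak, running);
    try {
      await new Promise<void>((resolve) => setTimeout(resolve, 10));
    } finally {
      running--;
      sem.release();
    }
  };

  await Promise.all(Array.from({ length: count }, () => runTask()));
  return peak;
};
```

In this sketch, `measurePeak(5, 2)` resolves to 2: no more than two tasks ever hold a permit at once, and the remaining three wait in the queue until a permit is released.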
In the example below, 5 tasks are spawned, but only 2 can run concurrently.
import { semaphore, sleep } from 'ciorent';

const logTime = (...args: any[]) => {
  console.log('[' + performance.now().toFixed(1) + 'ms]', ...args);
};

// Creates a semaphore with 2 permits and internal queue size of 100, which
// allows 2 tasks to run concurrently and 100 others to wait.
const sem = semaphore.init(2, 100);

// Example task
const runTask = async (id: number) => {
  // Acquire a permit or wait until a permit is available
  if (!await semaphore.acquire(sem)) {
    // Internal queue is full
    // Handle error somehow
    return;
  }

  logTime(id, 'started');
  await sleep(1000);
  logTime(id, 'done');

  // Release the permit
  semaphore.release(sem);
};

// Try to run 5 tasks concurrently
for (let i = 1; i <= 5; i++) runTask(i);

With the semaphore, the output will be:

[19.2ms] 1 started
[19.4ms] 2 started
[1020.0ms] 1 done
[1020.5ms] 3 started
[1020.6ms] 2 done
[1020.6ms] 4 started
[2022.0ms] 3 done
[2022.2ms] 5 started
[2022.3ms] 4 done
[3023.2ms] 5 done

Without the semaphore, all 5 tasks start without waiting:

[18.2ms] 1 started
[18.3ms] 2 started
[18.3ms] 3 started
[18.4ms] 4 started
[18.4ms] 5 started
[1019.2ms] 1 done
[1019.4ms] 2 done
[1019.6ms] 3 done
[1019.6ms] 4 done
[1019.6ms] 5 done

It is recommended to wrap semaphore.release in a finally block so that the permit is released even when an error is thrown.

const runTask = async (...args) => {
  if (!await semaphore.acquire(sem)) {
    // Internal queue is full
    // Handle error somehow
    return;
  }

  try {
    return await task(...args);
  } finally {
    semaphore.release(sem);
  }
};

const result = await runTask(...args);

// Or use semaphore.run
// This throws if the semaphore internal queue is full
const result = await semaphore.run(sem, task, ...args);

ciorent also includes a mutex API, which is similar to using a semaphore with concurrency 1, with a few differences:
- mutex is faster, as it doesn’t use a ring buffer under the hood to manage waiting promises.
- mutex doesn’t limit how many waiting promises can be created.
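One common way to build a mutex without a fixed-size queue is to chain each waiter onto the previous holder's promise. The sketch below shows that technique under stated assumptions: `ChainMutex` and `runExclusive` are hypothetical names, and this is not necessarily how ciorent implements its mutex internally.

```typescript
// Illustrative only: NOT ciorent's implementation.
class ChainMutex {
  // Resolves when the most recent holder releases the lock.
  private tail: Promise<void> = Promise.resolve();

  // Waits for every earlier holder, then resolves to this holder's release function.
  acquire(): Promise<() => void> {
    let release: () => void = () => {};
    const released = new Promise<void>((resolve) => {
      release = resolve;
    });
    // Become ready once the previous holder has released.
    const ready = this.tail.then(() => release);
    // Later callers must wait for this holder to release.
    this.tail = released;
    return ready;
  }
}

// Runs `task` while holding the lock and always releases it afterwards.
const runExclusive = async <T>(mu: ChainMutex, task: () => Promise<T>): Promise<T> => {
  const release = await mu.acquire();
  try {
    return await task();
  } finally {
    release();
  }
};
```

Because each waiter is only a link in a promise chain, this design has no queue capacity to exceed, so `acquire` never needs to report a full queue.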
Example usage:
import { mutex } from 'ciorent';
const mu = mutex.init();
const runTask = async (...args) => {
  const release = await mutex.acquire(mu);

  try {
    return await task(...args);
  } finally {
    release();
  }
};

const result = await runTask(...args);

// Or mutex.run
const result = await mutex.run(mu, task, ...args);