180 lines
4.3 KiB
TypeScript
180 lines
4.3 KiB
TypeScript
/*
|
|
MIT License
|
|
|
|
Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this
|
|
software and associated documentation files (the "Software"), to deal in the Software
|
|
without restriction, including without limitation the rights to use, copy, modify,
|
|
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
|
|
permit persons to whom the Software is furnished to do so, subject to the following
|
|
conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all copies
|
|
or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
|
|
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
|
|
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
|
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
|
|
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
|
|
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
*/
|
|
|
|
/*
|
|
How it works:
|
|
`this._head` is an instance of `Node` which keeps track of its current value and nests
|
|
another instance of `Node` that keeps the value that comes after it. When a value is
|
|
provided to `.enqueue()`, the code needs to iterate through `this._head`, going deeper
|
|
and deeper to find the last value. However, iterating through every single item is slow.
|
|
This problem is solved by saving a reference to the last value as `this._tail` so that
|
|
it can reference it to add a new value.
|
|
*/
|
|
|
|
class Node {
|
|
value;
|
|
next;
|
|
|
|
constructor(value) {
|
|
this.value = value;
|
|
}
|
|
}
|
|
|
|
class Queue {
|
|
private _head;
|
|
private _tail;
|
|
private _size;
|
|
|
|
constructor() {
|
|
this.clear();
|
|
}
|
|
|
|
enqueue(value) {
|
|
const node = new Node(value);
|
|
|
|
if (this._head) {
|
|
this._tail.next = node;
|
|
this._tail = node;
|
|
} else {
|
|
this._head = node;
|
|
this._tail = node;
|
|
}
|
|
|
|
this._size++;
|
|
}
|
|
|
|
dequeue() {
|
|
const current = this._head;
|
|
if (!current) {
|
|
return;
|
|
}
|
|
|
|
this._head = this._head.next;
|
|
this._size--;
|
|
return current.value;
|
|
}
|
|
|
|
clear() {
|
|
this._head = undefined;
|
|
this._tail = undefined;
|
|
this._size = 0;
|
|
}
|
|
|
|
get size() {
|
|
return this._size;
|
|
}
|
|
|
|
*[Symbol.iterator]() {
|
|
let current = this._head;
|
|
|
|
while (current) {
|
|
yield current.value;
|
|
current = current.next;
|
|
}
|
|
}
|
|
}
|
|
|
|
interface LimitFunction {
|
|
readonly activeCount: number;
|
|
readonly pendingCount: number;
|
|
clearQueue: () => void;
|
|
<Arguments extends unknown[], ReturnType>(
|
|
fn: (...args: Arguments) => PromiseLike<ReturnType> | ReturnType,
|
|
...args: Arguments
|
|
): Promise<ReturnType>;
|
|
}
|
|
|
|
export default function pLimit(concurrency: number): LimitFunction {
|
|
if (
|
|
!(
|
|
(Number.isInteger(concurrency) ||
|
|
concurrency === Number.POSITIVE_INFINITY) &&
|
|
concurrency > 0
|
|
)
|
|
) {
|
|
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
|
|
}
|
|
|
|
const queue = new Queue();
|
|
let activeCount = 0;
|
|
|
|
const next = () => {
|
|
activeCount--;
|
|
|
|
if (queue.size > 0) {
|
|
queue.dequeue()();
|
|
}
|
|
};
|
|
|
|
const run = async (fn, resolve, args) => {
|
|
activeCount++;
|
|
|
|
const result = (async () => fn(...args))();
|
|
|
|
resolve(result);
|
|
|
|
try {
|
|
await result;
|
|
} catch {}
|
|
|
|
next();
|
|
};
|
|
|
|
const enqueue = (fn, resolve, args) => {
|
|
queue.enqueue(run.bind(undefined, fn, resolve, args));
|
|
|
|
(async () => {
|
|
// This function needs to wait until the next microtask before comparing
|
|
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
|
|
// when the run function is dequeued and called. The comparison in the if-statement
|
|
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
|
|
await Promise.resolve();
|
|
|
|
if (activeCount < concurrency && queue.size > 0) {
|
|
queue.dequeue()();
|
|
}
|
|
})();
|
|
};
|
|
|
|
const generator = (fn, ...args) =>
|
|
new Promise((resolve) => {
|
|
enqueue(fn, resolve, args);
|
|
});
|
|
|
|
Object.defineProperties(generator, {
|
|
activeCount: {
|
|
get: () => activeCount,
|
|
},
|
|
pendingCount: {
|
|
get: () => queue.size,
|
|
},
|
|
clearQueue: {
|
|
value: () => {
|
|
queue.clear();
|
|
},
|
|
},
|
|
});
|
|
|
|
return generator as any;
|
|
}
|