SIGN IN SIGN UP
caolan / async UNCLAIMED

Async utilities for node and the browser

28184 0 0 JavaScript
var async = require('../lib');
2018-07-08 19:28:55 -07:00
var {expect} = require('chai');
describe('priorityQueue', () => {
2016-03-22 15:26:46 -07:00
it('priorityQueue', (done) => {
2016-03-22 15:26:46 -07:00
var call_order = [];
// order of completion: 2,1,4,3
var q = async.priorityQueue((task, callback) => {
2016-03-22 15:26:46 -07:00
call_order.push('process ' + task);
callback('error', 'arg');
}, 1);
q.push(1, 1.4, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(3);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(4);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
call_order.push('callback ' + 3);
});
q.push(['arr', 'arr'], 2.9, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
call_order.push('callback arr');
2016-03-22 15:26:46 -07:00
});
expect(q.length()).to.equal(5);
2016-03-22 15:26:46 -07:00
expect(q.concurrency).to.equal(1);
q.drain(() => {
2016-03-22 15:26:46 -07:00
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
'process arr', 'callback arr',
'process arr', 'callback arr',
2016-03-22 15:26:46 -07:00
'process 3', 'callback 3'
]);
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
expect(q.idle()).to.be.equal(true);
done()
});
try {
q.push(5, 5, 'NOT_A_FUNCTION')
} catch(e) {
expect(e.message).to.equal('task callback must be a function')
}
2016-03-22 15:26:46 -07:00
});
it('concurrency', (done) => {
2016-03-22 15:26:46 -07:00
var call_order = [],
chore: Add Azure Pipelines for CI (Windows, Linux, Mac) (#1630), Fix async function serialization in Safari. * Add azure pipelines CI. * Publish test results. * Enable coveralls * Use Safari for OSX browser tests. * Adding missing lib files to karma (only Safari complained). * Rename with dot so it gets better placed in order. * Use ci instead of install, so we stick with package-lock.json * Use npm test instead of mocha directly. It just needed a double -- escape. * Move DISPLAY to Linux only, with value set just once. * Use variables for DISPLAY, browser tests and coveralls. * Simplify steps with a browser name variable. * Run junit reporter on CI only. * DISPLAY should be already an environment variable. * Use vmImage for consistency with other OS. * Don't use verbose npm@1 task. * No need to setTimeout, let's `done` on the callback. * Don't resume twice. Check queue length on callback for safety, setTimeout may be delayed. * Avoid time dependency, act on events instead. * Avoid dependency on timing to final assertion. * Rewrite test to avoid setTimeout * Fix formatting. * Make diff more significant (125+50 ~= 200) * Make diff more significant to avoid race conditions. * Wait a bit more so default 5 retries did happen. * Make delays shorter to avoid "Timeout of 250ms exceeded" * Shorten delay to avoid 200ms test timeout in browser tests. * Shorten timeout to mitigate 200ms test timeout. * Add more diff on delays to enforce test behavior. * Make delays more significant to avoid race conditions. * Stop using setTimeout and rely on events to push new items, so order is enforced. * Fix async function serialization in Safari. * Add more delay so 4 never completes before 3. * Ensure 3 never completes before 4. * Use drain instead of task count. Trying to ensure done is called from Windows browser tests that fail frequently. * Ensure 2 starts before 3 arrives.Avoid expected 'process 2 3' to equal 'process 2' * Add retries to browser tests. * Don't fail all on Mac & Windows browser tests. * Ensure 3 happens before 2, it fails sparely on OSX. AssertionError: expected [ 1, 2, 3 ] to deeply equal [ 1, 3, 2 ] * Avoid unnecessary multilines. * Use Edge for Windows browser tests.
2019-04-08 00:25:07 +02:00
delays = [80,20,180,20];
2016-03-22 15:26:46 -07:00
// worker1: --2-3
// worker2: -1---4
// order of completion: 1,2,3,4
var q = async.priorityQueue((task, callback) => {
setTimeout(() => {
2016-03-22 15:26:46 -07:00
call_order.push('process ' + task);
callback('error', 'arg');
chore: Add Azure Pipelines for CI (Windows, Linux, Mac) (#1630), Fix async function serialization in Safari. * Add azure pipelines CI. * Publish test results. * Enable coveralls * Use Safari for OSX browser tests. * Adding missing lib files to karma (only Safari complained). * Rename with dot so it gets better placed in order. * Use ci instead of install, so we stick with package-lock.json * Use npm test instead of mocha directly. It just needed a double -- escape. * Move DISPLAY to Linux only, with value set just once. * Use variables for DISPLAY, browser tests and coveralls. * Simplify steps with a browser name variable. * Run junit reporter on CI only. * DISPLAY should be already an environment variable. * Use vmImage for consistency with other OS. * Don't use verbose npm@1 task. * No need to setTimeout, let's `done` on the callback. * Don't resume twice. Check queue length on callback for safety, setTimeout may be delayed. * Avoid time dependency, act on events instead. * Avoid dependency on timing to final assertion. * Rewrite test to avoid setTimeout * Fix formatting. * Make diff more significant (125+50 ~= 200) * Make diff more significant to avoid race conditions. * Wait a bit more so default 5 retries did happen. * Make delays shorter to avoid "Timeout of 250ms exceeded" * Shorten delay to avoid 200ms test timeout in browser tests. * Shorten timeout to mitigate 200ms test timeout. * Add more diff on delays to enforce test behavior. * Make delays more significant to avoid race conditions. * Stop using setTimeout and rely on events to push new items, so order is enforced. * Fix async function serialization in Safari. * Add more delay so 4 never completes before 3. * Ensure 3 never completes before 4. * Use drain instead of task count. Trying to ensure done is called from Windows browser tests that fail frequently. * Ensure 2 starts before 3 arrives.Avoid expected 'process 2 3' to equal 'process 2' * Add retries to browser tests. * Don't fail all on Mac & Windows browser tests. * Ensure 3 happens before 2, it fails sparely on OSX. AssertionError: expected [ 1, 2, 3 ] to deeply equal [ 1, 3, 2 ] * Avoid unnecessary multilines. * Use Edge for Windows browser tests.
2019-04-08 00:25:07 +02:00
}, delays.shift());
2016-03-22 15:26:46 -07:00
}, 2);
q.push(1, 1.4, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(2);
expect(q.running()).to.equal(1);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(1);
expect(q.running()).to.equal(1);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(1);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 3);
});
q.push(4, 2.9, (err, arg) => {
2016-03-22 15:26:46 -07:00
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(0);
2016-03-22 15:26:46 -07:00
call_order.push('callback ' + 4);
});
expect(q.length()).to.equal(4);
expect(q.running()).to.equal(0);
2016-03-22 15:26:46 -07:00
expect(q.concurrency).to.equal(2);
q.drain(() => {
2016-03-22 15:26:46 -07:00
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2',
'process 3', 'callback 3',
'process 4', 'callback 4'
]);
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(0);
2016-03-22 15:26:46 -07:00
done();
});
2016-03-22 15:26:46 -07:00
});
it('pushAsync', done => {
const calls = [];
const q = async.priorityQueue((task, cb) => {
if (task === 2) return cb(new Error('fail'));
cb();
})
q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1));
q.pushAsync(2, 0).catch(err => {
expect(err.message).to.equal('fail');
calls.push(2);
});
q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr')));
q.drain(() => setTimeout(() => {
expect(calls).to.eql([2, 'arr', 'arr', 1]);
done();
}));
});
it('pause in worker with concurrency', (done) => {
2016-07-12 00:24:50 -07:00
var call_order = [];
var q = async.priorityQueue((task, callback) => {
2016-07-12 00:24:50 -07:00
if (task.isLongRunning) {
q.pause();
setTimeout(() => {
2016-07-12 00:24:50 -07:00
call_order.push(task.id);
q.resume();
callback();
}, 50);
}
else {
call_order.push(task.id);
setTimeout(callback, 10);
}
}, 10);
q.push({ id: 1, isLongRunning: true});
q.push({ id: 2 });
q.push({ id: 3 });
q.push({ id: 4 });
q.push({ id: 5 });
q.drain(() => {
2016-07-12 00:24:50 -07:00
expect(call_order).to.eql([1, 2, 3, 4, 5]);
done();
});
2016-07-12 00:24:50 -07:00
});
it('kill', (done) => {
var q = async.priorityQueue((/*task, callback*/) => {
setTimeout(() => {
throw new Error("Function should never be called");
}, 20);
}, 1);
q.drain(() => {
throw new Error("Function should never be called");
});
q.push(0);
q.kill();
setTimeout(() => {
expect(q.length()).to.equal(0);
done();
}, 40);
});
context('q.workersList():', () => {
it('should be the same length as running()', (done) => {
var q = async.priorityQueue((task, cb) => {
async.setImmediate(() => {
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);
q.drain(() => {
expect(q.workersList().length).to.equal(0);
expect(q.running()).to.equal(0);
done();
});
q.push('foo', 2);
q.push('bar', 1);
q.push('baz', 0);
});
it('should contain the items being processed', (done) => {
var itemsBeingProcessed = {
'foo': [
{data: 'bar', priority: 1},
{data: 'foo', priority: 2}
],
'foo_cb': [
{data: 'foo', priority: 2}
],
'bar': [
{data: 'baz', priority: 0},
{data: 'bar', priority: 1}
],
'bar_cb': [
{data: 'bar', priority: 1},
{data: 'foo', priority: 2}
],
'baz': [
{data: 'baz', priority: 0}
],
'baz_cb': [
{data: 'baz', priority: 0},
{data: 'bar', priority: 1}
]
};
function getWorkersListData(q) {
return q.workersList().map(({data, priority}) => {
return {data, priority};
});
}
var q = async.priorityQueue((task, cb) => {
expect(
getWorkersListData(q)
).to.eql(itemsBeingProcessed[task]);
expect(q.workersList().length).to.equal(q.running());
async.setImmediate(() => {
expect(
getWorkersListData(q)
).to.eql(itemsBeingProcessed[task+'_cb']);
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);
q.drain(() => {
expect(q.workersList()).to.eql([]);
expect(q.workersList().length).to.equal(q.running());
done();
});
q.push('foo', 2);
q.push('bar', 1);
q.push('baz', 0);
});
});
context('q.saturated(): ', () => {
it('should call the saturated callback if tasks length is concurrency', (done) => {
2016-04-26 20:56:45 -07:00
var calls = [];
var q = async.priorityQueue((task, cb) => {
2016-04-26 20:56:45 -07:00
calls.push('process ' + task);
async.setImmediate(cb);
}, 4);
q.saturated(() => {
2016-04-26 20:56:45 -07:00
calls.push('saturated');
});
q.empty(() => {
2016-04-26 20:56:45 -07:00
expect(calls.indexOf('saturated')).to.be.above(-1);
setTimeout(() => {
2016-04-26 20:56:45 -07:00
expect(calls).eql([
'process foo4',
'process foo3',
'process foo2',
"saturated",
'process foo1',
'foo4 cb',
"saturated",
'process foo0',
'foo3 cb',
'foo2 cb',
'foo1 cb',
'foo0 cb'
]);
done();
}, 50);
});
q.push('foo0', 5, () => {calls.push('foo0 cb');});
q.push('foo1', 4, () => {calls.push('foo1 cb');});
q.push('foo2', 3, () => {calls.push('foo2 cb');});
q.push('foo3', 2, () => {calls.push('foo3 cb');});
q.push('foo4', 1, () => {calls.push('foo4 cb');});
2016-04-26 20:56:45 -07:00
});
});
2016-03-22 15:26:46 -07:00
context('q.unsaturated(): ',() => {
it('should have a default buffer property that equals 25% of the concurrenct rate', (done) => {
var calls = [];
var q = async.priorityQueue((task, cb) => {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 10);
expect(q.buffer).to.equal(2.5);
done();
});
it('should allow a user to change the buffer property', (done) => {
var calls = [];
var q = async.priorityQueue((task, cb) => {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 10);
q.buffer = 4;
expect(q.buffer).to.not.equal(2.5);
expect(q.buffer).to.equal(4);
done();
});
it('should call the unsaturated callback if tasks length is less than concurrency minus buffer', (done) => {
var calls = [];
var q = async.priorityQueue((task, cb) => {
calls.push('process ' + task);
setTimeout(cb, 10);
}, 4);
q.unsaturated(() => {
calls.push('unsaturated');
});
q.empty(() => {
expect(calls.indexOf('unsaturated')).to.be.above(-1);
setTimeout(() => {
expect(calls).eql([
'process foo4',
'process foo3',
'process foo2',
'process foo1',
'foo4 cb',
'unsaturated',
'process foo0',
'foo3 cb',
'unsaturated',
'foo2 cb',
'unsaturated',
'foo1 cb',
'unsaturated',
'foo0 cb',
'unsaturated'
]);
done();
}, 50);
});
q.push('foo0', 5, () => {calls.push('foo0 cb');});
q.push('foo1', 4, () => {calls.push('foo1 cb');});
q.push('foo2', 3, () => {calls.push('foo2 cb');});
q.push('foo3', 2, () => {calls.push('foo3 cb');});
q.push('foo4', 1, () => {calls.push('foo4 cb');});
});
});
it('should call the drain callback if receives an empty push', (done) => {
var call_order = [];
var q = async.priorityQueue((task, callback) => {
call_order.push(task);
callback('error', 'arg');
}, 1);
q.drain(() => {
call_order.push('drain')
expect(call_order).to.eql([
'drain'
]);
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(0);
done();
});
q.push([], 1, () => { throw new Error('should not be called') });
});
it('should not call the drain callback if receives empty push and tasks are still pending', (done) => {
var call_order = [];
var q = async.priorityQueue((task, callback) => {
call_order.push('process ' + task);
callback('error', 'arg');
}, 1);
q.push(1, 1, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
call_order.push('callback ' + 1);
});
q.push(2, 1, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
call_order.push('callback ' + 2);
});
expect(q.length()).to.equal(2);
q.drain(() => {
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2'
]);
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(0);
done();
});
q.push([], 1, () => {});
});
it('should be iterable', (done) => {
var q = async.priorityQueue((data, cb) => {
if (data === 3) {
q.push(6)
expect([...q]).to.eql([4, 5, 6]);
}
async.setImmediate(cb);
});
q.push([1, 2, 3, 4, 5]);
expect([...q]).to.eql([1, 2, 3, 4, 5]);
q.drain(() => {
expect([...q]).to.eql([]);
done();
});
});
it('should error when calling unshift', () => {
var q = async.priorityQueue(() => {});
expect(() => {
q.unshift(1);
}).to.throw();
});
it('should error when calling unshiftAsync', () => {
var q = async.priorityQueue(() => {});
expect(() => {
q.unshiftAsync(1);
}).to.throw();
});
it('should error when the callback is called more than once', (done) => {
var q = async.priorityQueue((task, callback) => {
callback();
expect(() => {
callback();
}).to.throw();
done();
}, 2);
q.push(1);
});
});