I have the following code:
var async = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(function(task, callback) {
    console.log(`done with tasks...`);
    callback();
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    serialQueue.push(fx("1", 10000));
    serialQueue.push(fx("2", 2000));
    serialQueue.push(fx("3", 1000));
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();
When I run the code above I get this:
START FX: 1. waitTime: 10000
START FX: 2. waitTime: 2000
START FX: 3. waitTime: 1000
DRAINED!!!
done with tasks...
done with tasks...
done with tasks...
all items have been processed
DONE FX: 3
DONE FX: 2
DONE FX: 1
To do that I referenced this link:
https://caolan.github.io/async/v3/docs.html#queue
Could someone help me understand why the “serial” nature of the queue is not respected here? I set concurrency to 1 when I created serialQueue, but it looks like the queue isn’t really serializing the execution of the three functions passed in.
How can I get this queue to execute the three functions in order, so that 2 waits for 1 and 3 waits for 2? Also, why is the drain reported before all the functions have actually finished executing?
Answer
There are a few things wrong here.
First: serialQueue.push(fx(...)) invokes fx immediately, does not wait for it to complete (you have no await), and places the return value (a promise) in your queue. You need to pass something that actually captures your intent so that it can be invoked later. There are lots of ways of doing this. You could pass arrays, in the form [func, arg1, arg2, ...], or you could pass functions in the form async () => { await func(arg1, arg2, ...) }, or you could use currying to produce a new function with predefined arguments, etc. Regardless…
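To make the difference between calling fx and describing a call to fx concrete, here is a small sketch that does not use the async library at all. The names demo, wrapped, asArray, bound and demoResult are made up for illustration, and the wait times are shortened. It only shows when each form starts running.

```javascript
const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical demo function; it collects messages instead of printing them.
async function demo() {
  const log = [];

  async function fx(msg, waitTime) {
    log.push(`START ${msg}`);
    await timeout(waitTime);
    log.push(`DONE ${msg}`);
  }

  // Eager: fx starts right here and runs up to its first await.
  // `eager` holds a promise, not something that can be invoked later.
  const eager = fx("eager", 20);

  // Deferred: each of these only describes the work; nothing has started yet.
  const wrapped = () => fx("wrapped", 10);   // function wrapper
  const asArray = [fx, "array", 10];         // [func, arg1, arg2, ...]
  const bound = fx.bind(null, "bound", 10);  // pre-filled arguments

  log.push("tasks described");

  await eager;                     // already running; all we can do is wait
  await wrapped();                 // starts only now
  const [func, ...args] = asArray;
  await func(...args);             // starts only now
  await bound();                   // starts only now
  return log;
}

const demoResult = demo();
demoResult.then(log => console.log(log.join("\n")));
```

"START eager" is logged before "tasks described", which is the same effect as the three START FX lines appearing at once in the question's output. The deferred forms start only when something calls them.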
Second: You don’t do anything with task in your queue’s handler. It’s your job to do something with it; the queue just passes in the items you’ve given it. If you want the items to be functions, you have to invoke them, and await the promise each one returns, if that’s what you want the queue to do for you. Right now, you’re ignoring task entirely.
Something like this:
var async = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

// With a native async worker, async v3 passes no callback argument;
// the task counts as finished when the returned promise settles.
var serialQueue = async.queue(async function(task) {
    await task(); // Actually invoke the task
    console.log(`done with tasks...`);
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    // Don't invoke the function, instead pass a thing that can be invoked later,
    // in the queue's handler function
    serialQueue.push(async () => { await fx("1", 10000) });
    serialQueue.push(async () => { await fx("2", 2000) });
    serialQueue.push(async () => { await fx("3", 1000) });
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();
Output:
START FX: 1. waitTime: 10000
DONE FX: 1
done with tasks...
START FX: 2. waitTime: 2000
DONE FX: 2
done with tasks...
START FX: 3. waitTime: 1000
DONE FX: 3
done with tasks...
DRAINED!!!
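The data-shaped alternative mentioned earlier (a function plus its arguments) could look roughly like the sketch below. One caveat: the async docs say push also accepts an array of tasks, so a bare [func, arg1, arg2] array would presumably be split into separate tasks; the sketch uses a { func, args } object instead. To keep it runnable without dependencies, makeSerialQueue is a hypothetical stand-in for a concurrency-1 queue, not the async library's implementation, and it has no error handling. The names delay, runDemo and serialResult are also made up, and the wait times are shortened.

```javascript
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical stand-in for a concurrency-1 queue: each pushed task is
// handed to `worker` only after the previous one has finished.
function makeSerialQueue(worker) {
  let tail = Promise.resolve();
  return {
    push(task) {
      tail = tail.then(() => worker(task));
      return tail;
    },
    // Resolves once everything pushed so far has been processed.
    drain() {
      return tail;
    },
  };
}

async function runDemo() {
  const log = [];

  async function fx(msg, waitTime) {
    log.push(`START FX: ${msg}`);
    await delay(waitTime);
    log.push(`DONE FX: ${msg}`);
  }

  // The worker actually uses the task: it calls func with args and awaits it.
  const queue = makeSerialQueue(async ({ func, args }) => {
    await func(...args);
    log.push("done with task");
  });

  // Longest wait first, shortest last, to show that order is preserved.
  queue.push({ func: fx, args: ["1", 30] });
  queue.push({ func: fx, args: ["2", 20] });
  queue.push({ func: fx, args: ["3", 10] });

  await queue.drain();
  log.push("DRAINED");
  return log;
}

const serialResult = runDemo();
serialResult.then(log => console.log(log.join("\n")));
```

With async.queue, the same task objects and worker body should carry over; only the queue construction would change.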