Skip to content
Advertisement

Javascript serial queue using async?

I have the following code:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(function(task, callback) { 
    console.log(`done with tasks...`);
    callback(); 
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    serialQueue.push(fx("1", 10000));
    serialQueue.push(fx("2", 2000));
    serialQueue.push(fx("3", 1000));
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

When I run the code above I get this:

START FX: 1. waitTime: 10000
START FX: 2. waitTime: 2000
START FX: 3. waitTime: 1000
DRAINED!!!
done with tasks...
done with tasks...
done with tasks...
all items have been processed
DONE FX: 3
DONE FX: 2
DONE FX: 1

To do that I referenced this link:

https://caolan.github.io/async/v3/docs.html#queue

Could someone help me understand why the “serial” nature of the queue is not respected here? I set concurrency to 1 when I created serialQueue but, it looks like the queue isn’t really serializing the execution of the three functions passed in.

How can I get this queue to execute the three functions in order, so have 2 wait for 1 and 3 wait for 2…? Also, why is drained called before all functions are actually done executing?

Advertisement

Answer

There are a few things wrong here.

First: serialQueue.push(fx(...)) invokes fk immediately, does not wait for it to complete (you have no await) and places the return value (a promise) to your queue. You need to pass something that actually captures your intent so that it can be invoked later. There are lots of ways of doing this. You could pass arrays, in the form [func, arg1, arg2, ...], or you could pass functions in the form async () => { await func(arg1, arg2, ...) }, or you could use currying to produce a new function with predefined arguments, etc. Regardless…

Second: You don’t do anything with task in your queue’s handler. It’s your job to do something, the queue just passes in the items you’ve given it. If you want the items to be functions, you have to invoke something, and await the promise it returns, if that’s what you want the queue to do for you. Right now, you’re ignoring task entirely.

Something like this:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(async function(task, callback) {
    await task(); // Actually invoke the task
    console.log(`done with tasks...`);
    callback();
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    // Don't invoke the function, instead pass a thing that can be invoked later, in the queue's handler function
    serialQueue.push(async () => { await fx("1", 10000) });
    serialQueue.push(async () => { await fx("2", 2000) });
    serialQueue.push(async () => { await fx("3", 1000) });
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

Output:

START FX: 1. waitTime: 10000
DONE FX: 1
done with tasks...
START FX: 2. waitTime: 2000
DONE FX: 2
done with tasks...
START FX: 3. waitTime: 1000
DONE FX: 3
done with tasks...
DRAINED!!!
User contributions licensed under: CC BY-SA
9 People found this is helpful
Advertisement