
Why Does This Readline Async Iterator Not Work Properly?

This is part of a larger process that I've distilled down to a minimal, reproducible example in Node v14.4.0. The code outputs nothing from inside the for loop. I see on

Solution 1:

It turns out the underlying issue is that readline.createInterface(), as soon as it is called, adds a data event listener to the input stream and resumes the stream so that it starts flowing:

input.on('data', ondata);

and

input.resume();

Then, in the ondata listener, it parses the data for lines and, each time it finds a line, fires a line event:

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

But, in my examples, there were other asynchronous things happening between the time that readline.createInterface() was called and the async iterator was created (that would listen for the line events). So, line events were being emitted and nothing was yet listening for them.

So, to work properly, readline.createInterface() REQUIRES that whatever is going to listen for the line events be attached synchronously after the call to readline.createInterface(). Otherwise there is a race condition and line events may get lost.


In my original code example, a reliable way to work around it is to not call readline.createInterface() until after I've done the await once(...). Then the asynchronous iterator is created synchronously, right after readline.createInterface() is called.

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

One way to fix this general issue would be to change readline.createInterface() so that it does not add the data event and resume the stream UNTIL somebody adds a line event listener. This would prevent data loss. It would allow the readline interface object to sit there quietly without losing data until the receiver of its output was actually ready. This would work for the async iterator and it would also prevent other uses of the interface that had other asynchronous code mixed in from possibly losing line events.

I added a note about this to a related open readline bug issue.

Solution 2:

The readline module could also be replaced with a simple Transform stream using the more modern stream API. The modern stream API supports async iterators out of the box, as well as backpressure: the write side of the stream (file reading) pauses until the read side of the stream (line reading) is being consumed.

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (remaining + chunk).split(/\r?\n/g);
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

This example doesn't support the crlfDelay option of the readline module, but the algorithm could be modified to do something similar. It also (as far as I can tell) has better error handling than is supported by the readline module.

Solution 3:

You can make this work as expected if you create the async iterator immediately after constructing the readline interface. The readline interface does not buffer line events, so if you wait to create the async iterator you may lose some lines. Once the async iterator exists, it buffers the lines for you.

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

Based on discussion in the comments, this still may not be a desirable solution, as the readline module has various other issues, but I figured I would add an answer that resolves the problem as stated in the original question.
