fb55/htmlparser2

How to combine stream with async await?

Opened this issue · 1 comment

I'm looking for a way to combine htmlparser2's WritableStream with async/await. I saw some old issues in this repository related to this problem, but didn't find a relevant solution.

What I'm trying to do:

  1. Get file stream from S3
  2. Parse file with streams using htmlparser2
  3. During parsing, insert chunks of data into Kafka

An example of the code follows:

// htmlparser2's streaming parser (WritableStream), aliased to match the snippet below
const { WritableStream: WStream } = require('htmlparser2/lib/WritableStream')

const readable = getFileStreamFromS3()

let buffer = []

const parser = new WStream(
    {
        onopentag(name, attributes) {
            // ... collect data on open tag
        },

        ontext(text) {
            // ... collect text between tags
        },

        async onclosetag(tagname) {
            buffer.push(data) // "data" is whatever was collected in onopentag/ontext above
            if (buffer.length >= 100) {
                await pushToKafka(buffer)
                buffer = []
            }
        },
    },
    { xmlMode: true },
)

return new Promise(resolve => readable.pipe(parser).on('finish', resolve))

However, this doesn't seem to work. Any ideas on how to implement it?

To combine a writable stream with async/await, the asynchronous work has to happen inside the stream's write method, with the callback invoked only after that work finishes. In your snippet the await sits inside an htmlparser2 event handler (onclosetag); the parser calls its handlers synchronously and ignores the returned promise, so nothing waits for pushToKafka and the stream's 'finish' event can fire before your Kafka pushes complete. One way around this is to move the asynchronous work into a custom Writable's write method. Here's how you could refactor the code:

const { Writable } = require('stream');
const { getFileStreamFromS3, pushToKafka } = require('./your-utils');

const readable = getFileStreamFromS3();

const parser = new Writable({
    write(chunk, encoding, callback) {
        // Parse the chunk with htmlparser2; parseChunk is a placeholder for
        // a function that parses one chunk and resolves with the parsed data
        parseChunk(chunk).then(async (parsedData) => {
            // Insert parsed data into Kafka
            await pushToKafka(parsedData);
            callback(); // Call the callback to indicate that the chunk has been processed
        }).catch(err => {
            callback(err); // If an error occurs, pass it to the callback
        });
    }
});

// Pipe the readable stream to the parser
readable.pipe(parser);

// Handle finish event to resolve the promise when parsing is done
return new Promise((resolve, reject) => {
    parser.on('finish', resolve);
    parser.on('error', reject); // Handle any errors that occur during parsing
});
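
One gap in the snippet above: parseChunk is only a placeholder. A minimal sketch of what it could look like using htmlparser2's Parser (the collected array and the choice to collect closing tag names are purely illustrative, and it assumes no tag ever spans two chunks; for real input you would keep a single long-lived parser instead of one per chunk):

const { Parser } = require('htmlparser2');

// Hypothetical parseChunk: feeds one chunk to a Parser instance and
// resolves with whatever the handlers collected.
function parseChunk(chunk) {
    return new Promise((resolve, reject) => {
        const collected = [];
        const chunkParser = new Parser(
            {
                onclosetag(tagname) {
                    collected.push(tagname); // replace with the data you actually need
                },
                onerror: reject,
                onend: () => resolve(collected),
            },
            { xmlMode: true },
        );
        chunkParser.write(chunk.toString());
        chunkParser.end();
    });
}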

Let me know if it works.
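
P.S. If you're on Node.js 15+ (where stream/promises is available), you can also drop the hand-rolled Promise and await the pipe directly; a minimal sketch using the same readable and parser as above:

const { pipeline } = require('stream/promises');

async function run() {
    // pipeline resolves once parser has finished and rejects if either
    // stream errors, so no manual 'finish'/'error' listeners are needed.
    await pipeline(readable, parser);
}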