BackendStack21/fast-gateway

onResponse Stream Transform

Opened this issue · 2 comments

Feature proposal

onResponse could support Node.js stream Transforms applied to the response pipeline, in order to allow reasonably performant, on-the-fly transformations.

A developer can already achieve this result in a reusable fashion with the following implementation:

index.js

import { parse, stringify } from 'JSONStream';

import onResponse from './lib/on-response';
import { API_HOST } from './config/env';
 
// [...]
routes: [
  {
      prefix: '/endpoint',
      target: API_HOST,
      hooks: {
        rewriteRequestHeaders(req, headers) {
          headers['accept-encoding'] = 'identity'; // request uncompressed (not gzipped) resources from upstream
          return headers;
        },
        onResponse: (req, res, stream) => onResponse(req, res, stream, [
          // emit every element of the "products" array and remap its id
          parse('products.*', (product) => {
            product.id = product._id;
            return product;
          }),
          stringify(), // re-assemble the transformed elements into a JSON array
        ]),
      },
  },
],
// [...]
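
With this configuration, assuming the upstream returns a payload shaped like the hypothetical example below, each element of products is re-emitted with an added id field and the result is re-serialized as a JSON array:

// hypothetical upstream response body
{ "products": [{ "_id": "42", "name": "widget" }] }

// body received by the client after the transforms
[{ "_id": "42", "name": "widget", "id": "42" }]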

lib/on-response.js

import { pipeline } from 'stream';
import streamToArray from 'stream-to-array';

const TRANSFER_ENCODING_HEADER_NAME = 'transfer-encoding';
const CONTENT_LENGTH_HEADER_NAME = 'content-length';

/**
 * onResponse
 * Hook executed when the response is received from the target (remote HTTP service)
 * @param {*} req HTTP request object
 * @param {*} res HTTP response object
 * @param {*} stream remote response stream
 * @param {array} transforms array of Transform streams to apply on the response pipeline
 */
export default async function onResponse(req, res, stream, transforms = []) {
  const chunked = stream.headers[TRANSFER_ENCODING_HEADER_NAME]
    ? stream.headers[TRANSFER_ENCODING_HEADER_NAME].endsWith('chunked')
    : false;

  // forward the upstream status code in both branches
  res.statusCode = stream.statusCode;

  if (req.headers.connection === 'close' && chunked) {
    try {
      // remove the "chunked" token from the transfer-encoding header
      const transferEncoding = stream.headers[TRANSFER_ENCODING_HEADER_NAME].replace(/(,( )?)?chunked/, '');
      if (transferEncoding) {
        // the header may list several encodings, example: gzip, chunked
        res.setHeader(TRANSFER_ENCODING_HEADER_NAME, transferEncoding);
      } else {
        res.removeHeader(TRANSFER_ENCODING_HEADER_NAME);
      }

      if (!stream.headers[CONTENT_LENGTH_HEADER_NAME]) {
        // pack all pieces into one buffer to calculate the content length;
        // note: transforms are not applied on this buffered branch
        const resBuffer = Buffer.concat(await streamToArray(stream));

        // add the content-length header and send the merged response buffer
        res.setHeader(CONTENT_LENGTH_HEADER_NAME, String(Buffer.byteLength(resBuffer)));
        res.end(resBuffer);
      } else {
        // the upstream already provided a content length, forward the body as-is
        stream.pipe(res);
      }
    } catch (err) {
      res.statusCode = 500;
      res.end(err.message);
    }
  } else {
    // the length is unknown once transforms run, so drop the content-length header
    res.removeHeader(CONTENT_LENGTH_HEADER_NAME);

    pipeline(
      stream,
      ...transforms,
      res,
      // req.log is assumed to be attached by a logger middleware (e.g. pino-http)
      (err) => err && req.log && req.log.error(err),
    );
  }
}
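
Since the hook accepts any array of Node Transform streams, JSONStream is not a requirement. As a minimal sketch (the upperCase transform below is a hypothetical example, not part of the proposal), a hand-rolled Transform works just as well:

import { Transform } from 'stream';

// hypothetical transform: upper-case every chunk flowing through the pipeline
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// usable exactly like the JSONStream transforms above:
// onResponse(req, res, stream, [upperCase])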

Why not generalize this approach and make it a feature of fast-gateway?

P.S. I'm using the built-in stream.pipeline here, which can supersede the third-party pump.

Hi @enricodeleo, please excuse my delay in getting back to this. It sounds like a great extension to our API. Would you like to open a PR for this? I would be more than happy to merge it and give you ownership of pushing this improvement. Please let me know.

Many thanks in advance.

No worries @jkyberneees, I understand this was not a priority :)
Still, I think it's super useful when you need to manipulate data safely without buffering entire responses in the application's memory.

Unfortunately I haven't been working on the project for months now, and I have no possibility of getting back on track with the repo in the near future. I'll try as soon as I can.