pinojs/pino

Error: overwritten\n at writeSync

Opened this issue · 7 comments

Randomly facing an issue

Log:
Error: overwritten\n at writeSync (/workspace/development/app/node_modules/thread-stream/index.js:465:13)\n at ThreadStream.write (/workspace/development/app/node_modules/thread-stream/index.js:240:9)\n at Pino.write (/workspace/development/app/node_modules/pino/lib/proto.js:215:10)\n at Pino.LOG [as info] (/workspace/development/app/node_modules/pino/lib/tools.js:62:21)\n

Not really sure why Pino is throwing this error.

Has anyone faced similar issue?

Ouch, this is pretty bad. Can you give us a way to reproduce?

Not at the moment. These are production logs and coming randomly. Could they be because of a large size of log being flushed? We checked thread-stream/index.js:465 Saw it has got something to do with an (unexpectedly) negative offset within the buffer at the time of flushSync() call.
Our process was getting killed from the uncaught exception arising from this. As a workaround, for now we changed the default pino functions like error, info etc, by creating a wrapper function which adds a try catch block around calling the original pino functions. We did this for pino.{method} and pino.child.{method} both.

Could they be because of a large size of log being flushed?

It's unlikely on its own. We have battle tested this in the last few years, and this never got reported despite massive logs.


What's your complete setup?

setup


import Pino from 'pino';
import pinoDebug from 'pino-debug';
import config from 'config';

const configRedact = (config as any).redact || [];
let redactAttrs: Array<string> = [];
for (const redactAttr of configRedact) {
    if (redactAttr.match(/^\*\*/)) {
      const fieldName = redactAttr.replace(/^\*\*\./, '');
      redactAttrs.push(`${fieldName}`, `*.${fieldName}`, `*.*.${fieldName}`, `*.*.*.${fieldName}`, `*.*.*.*.${fieldName}`, `*.*.*.*.*.${fieldName}`,
        `*.*.*.*.*.*.${fieldName}`, `*.*.*.*.*.*.*.${fieldName}`, `*.*.*.*.*.*.*.*.${fieldName}`, `*.*.*.*.*.*.*.*.*.${fieldName}`
      );
  } else {
      redactAttrs.push(redactAttr);
    }
}

const logger: Pino.Logger = Pino({
  level: (config as any).log_level || 'debug',
  transport: {
    target: '../pino/pino-opentelemetry-transport',
    options:  { 
                destination: 1, 
                Resource: { 'service.name': process.env.OTEL_SERVICE_NAME || 'unknown_service:node',
                env: process.env.NODE_ENV } 
              }
  },
  redact: {
    paths: redactAttrs,
    censor: '*****'
  }
});

pinoDebug(logger, {
  auto: true, // default
  map: {
    'express:router': 'debug',
    '*': 'trace' // everything else - trace
  }
});

export { logger };

This is how we export logger and use it throughout in our service

eg:

logger.info('hello world')

What’s pino-opentelemetry-transport?

Here is the pino-open-telemetry code as asked by you . The process is running with NODE_ENV = prod
ccing @xCodeMuse

'use strict';

const build = require('pino-abstract-transport');
const { SonicBoom } = require('sonic-boom');
const { once } = require('events');

const DEFAULT_MESSAGE_KEY = 'msg';

const ZEROS_FROM_MILLI_TO_NANO = '0'.repeat(6);

/**
 * @typedef {Object} CommonBindings
 * @property {string=} msg
 * @property {number=} level
 * @property {number=} time
 * @property {string=} hostname
 * @property {number=} pid
 *
 * @typedef {Record<string, string | number | Object> & CommonBindings} Bindings
 *
 */

/**
 * Pino OpenTelemetry transport
 *
 * Maps Pino log entries to OpenTelemetry Data model
 *
 * @typedef {Object} Options
 * @property {string | number} destination
 * @property {string} [messageKey="msg"]
 *
 * @param {Options} opts
 */
module.exports = async function (opts) {
  const destination = new SonicBoom({ dest: opts.destination, sync: false });
  const mapperOptions = {
    messageKey: opts.messageKey || DEFAULT_MESSAGE_KEY
  };
  const resourceOptions = {
    ...opts.Resource
  };

  return build(async function (/** @type { AsyncIterable<Bindings> } */ source) {
    for await (const obj of source) {
      const line = toOpenTelemetry(obj, mapperOptions, resourceOptions);
      let updatedLine;

      if (process.env.NODE_ENV != 'dev') {
        updatedLine = JSON.stringify(line) + '\n';
      } else {
        let timestamp = parseInt(line.Timestamp.replace(ZEROS_FROM_MILLI_TO_NANO, ''));
        let date = new Date(timestamp);

        let dateString = new Intl.DateTimeFormat('en-IN', { dateStyle: 'short', timeStyle: 'medium',timeZone: 'Asia/Kolkata' }).format(date);
        updatedLine = `${dateString} [${line.SeverityText}] ${line.TraceId ?? ''} ${line.SpanId ?? ''} ${JSON.stringify(line.Attributes)} ${line.Body}\n`;
      }

      const writeResult = destination.write(updatedLine);
      const toDrain = !writeResult;
      // This block will handle backpressure
      if (toDrain) {
        await once(destination, 'drain');
      }
    }
  }, {
    async close () {
      destination.end();
      await once(destination, 'close');
    }
  });
};

const FATAL_SEVERITY_NUMBER = 21;
/**
 * If the source format has only a single severity that matches the meaning of the range
 * then it is recommended to assign that severity the smallest value of the range.
 * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#mapping-of-severitynumber
 */
const SEVERITY_NUMBER_MAP = {
  10: 1,
  20: 5,
  30: 9,
  40: 13,
  50: 17,
  60: FATAL_SEVERITY_NUMBER
};

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity
const SEVERITY_NAME_MAP = {
  1: 'TRACE',
  2: 'TRACE2',
  3: 'TRACE3',
  4: 'TRACE4',
  5: 'DEBUG',
  6: 'DEBUG2',
  7: 'DEBUG3',
  8: 'DEBUG4',
  9: 'INFO',
  10: 'INFO2',
  11: 'INFO3',
  12: 'INFO4',
  13: 'WARN',
  14: 'WARN2',
  15: 'WARN3',
  16: 'WARN4',
  17: 'ERROR',
  18: 'ERROR2',
  19: 'ERROR3',
  20: 'ERROR4',
  21: 'FATAL',
  22: 'FATAL2',
  23: 'FATAL3',
  24: 'FATAL4'
};

/**
 * Converts a pino log object to an OpenTelemetry log object.
 *
 * @typedef {Object} OpenTelemetryLogData
 * @property {string=} SeverityText
 * @property {string=} SeverityNumber
 * @property {string} Timestamp
 * @property {string} Body
 * @property {{ 'host.hostname': string, 'process.pid': number }} Resource
 * @property {Record<string, any>} Attributes
 *
 * @typedef {Object} MapperOptions
 * @property {string} messageKey
 *
 * @param {Bindings} sourceObject
 * @param {MapperOptions} mapperOptions
 * @returns {OpenTelemetryLogData}
 */
 function toOpenTelemetry (sourceObject, { messageKey }, resourceOptions) {
  const { time, level, hostname, pid, trace_id, span_id, trace_flags, [messageKey]: msg, ...attributes } = sourceObject;

  const severityNumber = SEVERITY_NUMBER_MAP[sourceObject.level];
  const severityText = SEVERITY_NAME_MAP[severityNumber];

  return {
    Body: msg,
    Timestamp: time + ZEROS_FROM_MILLI_TO_NANO,
    SeverityNumber: severityNumber,
    SeverityText: severityText,
    TraceId: trace_id,
    SpanId: span_id,
    TraceFlags: trace_flags,
    Resource: {
      ...resourceOptions,
      'host.hostname': hostname,
      'process.pid': pid
    },
    Attributes: attributes
  };
}

Let us know if this happen again, and if you can reproduce reliably.