brianc/node-pg-copy-streams

Something seems wrong with binary transfer for COPY ... FROM STDIN BINARY

falk-miracode opened this issue · 8 comments

Yes, I know that in general it is not a good idea to save large files to the database. This is a historically grown thing that I have to support for now.

I'm trying to transfer files to the Database.

If the files contain at least one byte bigger than \x7F (i.e. using more than 7 bits), I get an exception with the message:

error - uncaughtException: error: row field count is <<random two bytes from the file interpreted as Int16BE>>, expected 6

The parser assumes that a new row has started when it shouldn't, and interprets the first two bytes after that point as the field count.

Documentation of the binary file format: https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4
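
For reference, here is a rough sketch (my own, based on the linked docs) of how one tuple is laid out; my helper functions below build exactly this structure:

function encodeTuple(fields: Buffer[]): Buffer {
    // Int16BE field count; must match the column list of the COPY statement (here 6)
    const count = Buffer.alloc(2)
    count.writeInt16BE(fields.length)
    const parts: Buffer[] = [count]
    for (const field of fields) {
        // per field: Int32BE byte length (-1 would mean NULL), then the raw bytes
        const len = Buffer.alloc(4)
        len.writeInt32BE(field.length)
        parts.push(len, field)
    }
    // note: the stream as a whole ends with a single Int16BE -1 trailer, not each tuple
    return Buffer.concat(parts)
}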

Here is my code:

// helper functions

function toInt16BEBinary(value: number): Buffer {
    const buf = Buffer.alloc(2)
    buf.writeInt16BE(value)
    return buf
}

function toUInt32BEBinary(value: number): Buffer {
    const buf = Buffer.alloc(4)
    buf.writeUInt32BE(value)
    return buf
}

function toInt32BEBinary(value: number): Buffer {
    const buf = Buffer.alloc(4)
    buf.writeInt32BE(value)
    return buf
}

function toIntBinaryField(value: number): Buffer {
    const intBuf = toInt32BEBinary(value)
    const lenBuf = toInt32BEBinary(intBuf.length)
    const buf = Buffer.concat([lenBuf, intBuf])
    return buf
}

function toUtf8BinaryField(value: string): Buffer {
    const strBuf = Buffer.from(value, "utf-8")
    const strLenBuf = toInt32BEBinary(strBuf.length)
    const buf = Buffer.concat([strLenBuf, strBuf])
    return buf
}

// handling a multer-request; file was temporarily uploaded to the server file system
// then the file is streamed from the fs to the DB

const pool = new Pool()
const client = await pool.connect()

// Copy stream to DB
const dbInputStream = client.query(
    from(
        'COPY "File" (id, name, "mimeType", size, state, "binaryContent") FROM STDIN BINARY',
    ),
)

const fileStream = fs.createReadStream(multerRequest.file!.path, {
    encoding: "binary",
})

// Intermediate stream to hook in for logging of the final stream to be sent to DB
class IntermediateStream extends Transform {
    buffer: Buffer[]

    constructor(options?: TransformOptions) {
        super(options)
        this.buffer = []
    }

    _transform(
        chunk: any,
        encoding: BufferEncoding,
        callback: TransformCallback,
    ) {
        this.buffer.push(chunk)
        callback()
    }

    _flush(callback: TransformCallback) {
        this.buffer.forEach((data) => this.push(data))
        this.buffer = []
        callback()
    }
}

const intermediateStream = new IntermediateStream()

// PG binary format: header, metadata columns, size of the data column
intermediateStream.write(
    Buffer.concat([
        Buffer.from("PGCOPY\n", "binary"),
        Buffer.from("\xFF", "binary"),
        Buffer.from("\r\n\x00", "binary"),
        toUInt32BEBinary(0),
        toUInt32BEBinary(0),
        toInt16BEBinary(6), // number of columns
        toUtf8BinaryField(fileId), // id
        toUtf8BinaryField(req.body.name as string), // name
        toUtf8BinaryField(req.body.mimeType as string), //mimeType
        toIntBinaryField(length), // size
        toUtf8BinaryField(FileState.BINARY), //state
        toInt32BEBinary(length), //binaryContent size
    ]),
)

fileStream!.pipe(intermediateStream, { end: false })

let progress = 0

fileStream
    ?.on("data", (chunk) => { // progress logging
        progress +=
            chunk instanceof Buffer
                ? chunk.byteLength
                : chunk.length
        console.log(
            `Progress: ${progress}/${length} = ${Math.round(
                (100 * progress) / length,
            )}%`,
        )
    })
    ?.on("close", () => { // terminate PG binary data and end stream
        intermediateStream.write(toInt16BEBinary(-1))
        intermediateStream.end()
    })

// pipe into DB
intermediateStream.pipe(dbInputStream)

// log the resulting pg copy stream
let logBuffer = Buffer.alloc(0)
intermediateStream
    .on("data", (chunk) => {
        logBuffer = Buffer.concat([logBuffer, chunk])
    })
    .on("end", () => {
        console.log(jsesc(logBuffer.toString()))
    })

await new Promise((success, error) =>
    dbInputStream
        .on("close", () => {
            success("success")
        }),
)

// release client, end pool etc.

I tried with several files. Text files always passed. Binary files usually failed with this exception:

error - uncaughtException: error: row field count is 32767, expected 6
    at Parser.parseErrorMessage (/workspaces/bhard-nextjs/node_modules/.pnpm/pg-protocol@1.6.0/node_modules/pg-protocol/dist/parser.js:287:98)
    at Parser.handlePacket (/workspaces/bhard-nextjs/node_modules/.pnpm/pg-protocol@1.6.0/node_modules/pg-protocol/dist/parser.js:126:29)
    at Parser.parse (/workspaces/bhard-nextjs/node_modules/.pnpm/pg-protocol@1.6.0/node_modules/pg-protocol/dist/parser.js:39:38)
    at TLSSocket.<anonymous> (/workspaces/bhard-nextjs/node_modules/.pnpm/pg-protocol@1.6.0/node_modules/pg-protocol/dist/index.js:11:42)
    at TLSSocket.emit (node:events:513:28)
    at addChunk (node:internal/streams/readable:315:12)
    at readableAddChunk (node:internal/streams/readable:289:9)
    at TLSSocket.Readable.push (node:internal/streams/readable:228:10)
    at TLSWrap.onStreamRead (node:internal/stream_base_commons:190:23) {
  length: 119,
  severity: 'ERROR',
  code: '22P04',
  detail: undefined,
  hint: undefined,
  position: undefined,
  internalPosition: undefined,
  internalQuery: undefined,
  where: 'COPY File, line 2',
  schema: undefined,
  table: undefined,
  column: undefined,
  dataType: undefined,
  constraint: undefined,
  file: 'copyfromparse.c',
  line: '941',
  routine: 'NextCopyFrom'
}

I fiddled around and found out the following.

This file will pass:

Content:

foo

(Plain ASCII text)

copy stream (escaped with jsesc):

PGCOPY\n\uFFFD\r\n\0\0\0\0\0\0\0\0\0\0\x06\0\0\0$d9fcabfc-8433-4ee5-8fd7-af26011fb23c\0\0\0\tsmall.txt\0\0\0\ntext/plain\0\0\0\x04\0\0\0\x03\0\0\0\x06BINARY\0\0\0\x03foo\uFFFD\uFFFD

This file will fail:

Content:

\x80\x7F\x7F

(0x80===0b10000000===128 uses the most significant bit)

copy stream (escaped with jsesc):

PGCOPY\n\uFFFD\r\n\0\0\0\0\0\0\0\0\0\0\x06\0\0\0$1f2436a5-5034-4743-841e-652fc1570ef7\0\0\0\rupload-16.bin\0\0\0\x18application/octet-stream\0\0\0\x04\0\0\0\x03\0\0\0\x06BINARY\0\0\0\x03\x80\x7F\x7F\uFFFD\uFFFD

This file will pass:

Content:

\x7F\x7F\x7F

(only 7 bits used per byte)

copy stream (escaped with jsesc):

PGCOPY\n\uFFFD\r\n\0\0\0\0\0\0\0\0\0\0\x06\0\0\0$dbe60e9b-47cc-4645-b13d-d0faa781fb5c\0\0\0\rupload-16.bin\0\0\0\x18application/octet-stream\0\0\0\x04\0\0\0\x03\0\0\0\x06BINARY\0\0\0\x03\x7F\x7F\x7F\uFFFD\uFFFD

The resulting copy streams were correct in each case. It seems the data is not transferred in binary mode; it looks like some weird 7-bit stuff is going on somewhere in the chain.

Hello,
I am trying to understand your bug report.

First, I am a bit surprised, because as far as I remember I have already managed to fill bytea fields with binary data coming from files without a problem. So I suspect the issue is not in the library itself, but I cannot be 100% sure, so it is worth digging into the problem.

My first question: why is there "\uFFFD\uFFFD" at the end of the data you send?

As far as I understand, there is no tuple trailer in the protocol, but only a file trailer. So there should only be one instance of uint16 -1, \xFFFF

I am not familiar with jsesc escaping, which may be part of the explanation. Could you try using hexy (https://www.npmjs.com/package/hexy) instead of jsesc? It may then be easier to read the real binary data that is sent.

Another question: how is length calculated/obtained in toInt32BEBinary(length)?

Could you also show the result of od -x on the files at multerRequest.file!.path, to see how multer saved them? I want to make sure that it is not multer that saves them as 7-bit binary (multipart MIME sometimes encodes binaries with 7-bit encodings).

Hello @jeromew!

Thanks for your quick reply.

tl;dr

At some point between reading the file from the fs and streaming the data into dbInputStream, the file data seems to get UTF-8 encoded. So, at the moment, it does not look like an issue in the library to me.

Update

I had to remove the { encoding: "binary" } option from the file read stream creation call; after that, the binary data was no longer modified.
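
In code, the corrected call is simply this (a sketch of the fix, not the snippet from the original post):

// without the encoding option the read stream emits raw Buffer chunks,
// so nothing downstream re-encodes the bytes as UTF-8
const fileStream = fs.createReadStream(multerRequest.file!.path)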

Your questions and suggestions

\uFFFD\uFFFD - U+FFFD is the replacement character: https://www.charset.org/utf-8/66 (Ctrl+F "U+FFFD"). It's an artifact of the debugging output, because I called logBuffer.toString(), which ran into Unicode issues with the binary data and inserted these characters. These characters were not in the stream sent to the DB. Sorry for the confusion!
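
A tiny demonstration of that artifact (my illustration; these bytes were not part of the actual stream):

// toString() defaults to UTF-8; byte sequences that are invalid UTF-8
// are rendered as the replacement character U+FFFD
Buffer.from([0xff, 0xff]).toString() // "\uFFFD\uFFFD"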

Thank you for suggesting hexy. That's much better!

The length is determined by multer via multerRequest.file.size.

how multer is used

// Next.js API route using next-connect and the multer express middleware
import type { NextApiRequest, NextApiResponse } from "next"
import { createRouter } from "next-connect"
import multer from "multer"

const upload = multer({
    storage: multer.diskStorage({}),
    limits: {
        fileSize: 1024 * 1024 * 1024,
    },
})

const router = createRouter<NextApiRequest, NextApiResponse>()

router //
    .use(upload.single("file") as any) //
    .post(async (req, res) => {
        const multerRequest = req as Express.Request
        const length = multerRequest.file!.size

        if (length > 0) {
            return uploadFileStreamCopy(req, length, res, multerRequest)
        }

        res.status(400).json({ error: `Empty or no file. Length: ${length}` })
    })

The outputs you asked for

I changed the line
console.log(jsesc(logBuffer.toString()))
to
console.log(hexy.hexy(logBuffer, { format: "twos" }))

The File with data \x80\x7f\x7f (which fails to parse):

00000000: 50 47 43 4f 50 59 0a ff 0d 0a 00 00 00 00 00 00    PGCOPY..........
00000010: 00 00 00 00 06 00 00 00 24 37 34 65 31 65 64 62    ........$74e1edb
00000020: 34 2d 39 35 61 32 2d 34 38 33 65 2d 38 37 35 39    4-95a2-483e-8759
00000030: 2d 62 34 31 65 31 66 64 62 64 36 64 64 00 00 00    -b41e1fdbd6dd...
00000040: 0d 75 70 6c 6f 61 64 2d 31 36 2e 62 69 6e 00 00    .upload-16.bin..
00000050: 00 18 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6f 63    ..application/oc
00000060: 74 65 74 2d 73 74 72 65 61 6d 00 00 00 04 00 00    tet-stream......
00000070: 00 03 00 00 00 06 42 49 4e 41 52 59 00 00 00 03    ......BINARY....
00000080: c2 80 7f 7f ff ff                                  ....

Remarkable:

  • Stream data is correctly terminated by Int16BE(-1) (ff ff)
  • Between the field length of the column binaryContent (0003) and the expected first byte of the content (80), another byte c2 was added.

Due to the extra byte (if this isn't a debugging artifact too), the pg copy parser assumes the current row ends after the first 7f and therefore interprets 7fff as the 16-bit field count (32767) of the next row.
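
That also matches the exact number in the error message (quick sanity check, my own illustration):

// the two payload bytes 0x7f 0xff read as a big-endian signed 16-bit integer
Buffer.from([0x7f, 0xff]).readInt16BE(0) // 32767, the "row field count" from the error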

Temp file saved by multer

$ od -x /tmp/fc3d8be58522f2626f5abfacde8006ac
0000000 7f80 007f
0000003
$ cat /tmp/fc3d8be58522f2626f5abfacde8006ac | xxd
00000000: 807f 7f                                  ...

Looks correct to me.

The File with data \x7f\x7f\x7f (which gets successfully copied to DB):

00000000: 50 47 43 4f 50 59 0a ff 0d 0a 00 00 00 00 00 00    PGCOPY..........
00000010: 00 00 00 00 06 00 00 00 24 39 30 35 32 30 30 34    ........$9052004
00000020: 62 2d 37 38 62 64 2d 34 66 34 32 2d 38 62 63 34    b-78bd-4f42-8bc4
00000030: 2d 64 30 31 35 38 34 30 35 39 61 64 36 00 00 00    -d01584059ad6...
00000040: 0d 75 70 6c 6f 61 64 2d 31 36 2e 62 69 6e 00 00    .upload-16.bin..
00000050: 00 18 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6f 63    ..application/oc
00000060: 74 65 74 2d 73 74 72 65 61 6d 00 00 00 04 00 00    tet-stream......
00000070: 00 03 00 00 00 06 42 49 4e 41 52 59 00 00 00 03    ......BINARY....
00000080: 7f 7f 7f ff ff                                     ..

Remarkable

  • No extra bytes added

I crafted another file consisting of 7F 80 7F 81. It got an extra C2 before 80 and 81. That looks like UTF-8 encoding to me. 🤔

So the error is due to an extra byte being added before each byte > 0x7f, probably because of UTF-8 encoding somewhere in the chain.
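
Here is a minimal reproduction of that effect outside the upload path (my own illustration, not the production code):

// reading bytes as a "binary" (latin1) string and then re-encoding that string
// with the default UTF-8 encoding inserts a C2 lead byte before every byte >= 0x80
const raw = Buffer.from([0x7f, 0x80, 0x7f, 0x81])
const asBinaryString = raw.toString("binary") // what { encoding: "binary" } produces
console.log(Buffer.from(asBinaryString))      // <Buffer 7f c2 80 7f c2 81>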

I'll go through my code. Maybe I have to specify binary encoding at some point.

Thank you very much so far.

@jeromew

Got it. I had to remove the { encoding: "binary" } option from the file read stream creation call, and then the binary data was no longer modified. I probably misunderstood what that encoding option does.

Thank you very much for the support!

@falk-miracode, that makes sense. { encoding: "binary" } is meant for something else (decoding chunks into strings) and could totally generate the c2 you observe - https://nodejs.org/api/buffer.html#buffers-and-character-encodings

In case it is of interest to you, you could look at pg-copy-streams-binary - https://github.com/jeromew/node-pg-copy-streams-binary - which is a library dedicated to binary handling of the COPY mechanism.

Also note that when injecting a bytea into the database, there are 2 choices:

  • binary, but you need to know the length of the binary beforehand because the protocol needs it
  • text, where you can inject a binary with an unknown length. For this you can look at https://github.com/bendrucker/postgres-bytea which shows how the escaping is done (a rough sketch follows below).
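
Roughly, the hex variant of the text representation looks like this (just a sketch to give the idea; postgres-bytea handles the details):

// bytea "hex" input representation: "\x" followed by two hex digits per byte
function toByteaHex(buf: Buffer): string {
    return "\\x" + buf.toString("hex")
}

toByteaHex(Buffer.from([0x80, 0x7f, 0x7f])) // "\x807f7f"
// note: in a text-format COPY the backslash itself must be escaped again ("\\x..."),
// since \ is the escape character of the COPY text format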

Thanks @jeromew!

I found the pg-copy-streams-binary lib myself right after I had figured out how to implement the pg binary file format on my own. It will probably help to simplify my code.

These pg escape formats are interesting; I didn't know about them. I think I will stay with binary data transfer, because as I understand it the escape formats make the volume of data to send 3 to 4 times larger.

I think those pg escape formats could explain why the memory consumption exploded with the former Prisma implementation of the upload feature when we tried to upload bigger files. (Which is the reason why I switched to copy streams in the first place.)

Have a nice day!

I would have said 2x bigger, as 1 byte leads to 2 chars in hex format, but I did not check. Sometimes the length is not available beforehand or cannot be trusted from the browser, and there can be a need to stream the binary into the db without hitting the filesystem.

Regarding the memory explosion, it depends whether it is due to node.js or postgres. On the node.js side, if the bin-to-hex conversion required loading the whole file into memory (no streaming), that can be a problem. On the postgres side, the internals are not totally clear to me. One thing is for sure: when you do a select, each tuple has to be fetched by postgres into memory before being sent to the reader (either via a simple select or via a COPY TO), so for huge files it can be a big penalty on the postgres server to read a bytea out of the database, because postgres needs to allocate memory to store the whole bytea. I don't know (did not check) whether this constraint also exists during ingestion.

As you said in your preamble, storing files in postgres/bytea is not so easy when files are too big.

You are right, the bytea hex format is only 2x bigger, because the whole bytea hex representation is prefixed by a single \x. I thought at first that every 4 hex digits needed to be prefixed by \x; then it would have been 3x bigger.

But the escape bytea format using octal seems to be up to 4x bigger, because every byte is written as \ plus three octal digits. Actually this is the worst case, because printable characters at least don't need to be escaped.

https://www.postgresql.org/docs/12/datatype-binary.html#id-1.5.7.12.9
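
For the 3-byte test file from above, the difference looks like this (my quick back-of-the-envelope illustration):

const raw = Buffer.from([0x80, 0x7f, 0x7f])  // 3 raw bytes
const hexForm = "\\x" + raw.toString("hex")  // "\x807f7f": 8 chars, asymptotically ~2x
const escapeForm = "\\200\\177\\177"         // octal escapes: 12 chars, ~4x in the worst case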

I was thinking about implementing a multer storage engine myself, which is afaik the only way to get a file stream from the request and stream it directly into the DB. In that case, it would probably be better not to trust the browser with the file size and not to rely on a precomputed file size at all. I totally agree on that.

But I think I will stay with the temporary file. Determining the size from the file system should be good enough then. (I'm currently pushing the limits of an MVP here, while a re-implementation of the whole thing is planned, which won't store file content in the DB. So I think I should limit the effort.)

The memory explosion clearly happened on the node side; PostgreSQL runs on another VM. I can imagine that Prisma uses the escape format, which would fit the observed memory consumption. I didn't investigate further, because I realized Prisma is great as an ORM but not the right tool to push large amounts of data into the DB.