sfackler/rust-postgres

Example using FromSql with tokio_postgres query_raw stream?

rodya-mirov opened this issue · 15 comments

Hello, I'm currently a beginner with tokio_postgres so I may have missed something, but the documentation on what to do with the FromSql trait (as opposed to implementing it, which has a derive macro and is fine) is a little unclear.

I am reading a very large amount of data from a Postgres database with a smallish number of columns, some of which are JSON blobs containing a large number of fields I don't need. If I were receiving this as a POST body in a web server, I would just use serde, which would skip those fields during deserialization and keep only the ones I explicitly asked for, which would be great from a performance perspective.

The analogue here seems to be the FromSql trait, but I'm having trouble using it. Instead, the query_raw function (which seems to be the only way to stream results in) returns Row objects, which have already split the incoming data into Jsonb objects, which I then have to manually re-parse, clone from, and pay the drop fee.

This just doesn't feel right, since the FromSql trait is already "out there" and it seems like I should be able to pass it in as a generic parameter, but I'm at a loss as to where to actually use it. I was not able to find any explicit mention of the trait in the documentation, except for how to implement it (which is straightforward) and one note on one function indicating it could not be used there (implying it could be used elsewhere, with this being the exception).

Am I missing something? Is there a missing feature, or am I holding it wrong?

For the record, this is how I'm reading data:

    use futures::{pin_mut, StreamExt};
    use tokio_postgres::RowStream;

    let params: Vec<String> = vec![];
    let results_stream: RowStream = pg_conn.query_raw(READ_MCN_REPORTS, params.as_slice()).await?;

    pin_mut!(results_stream);

    let mut results: Vec<MyType> = Vec::new();

    while let Some(result) = results_stream.next().await {
        let row = result?;
        let report: MyType = manually_convert(row);
        results.push(report);

        // etc.
    }

Appreciate any assistance. Thank you!

You can deserialize directly through Serde with the Json type:

let value = row.get::<_, Json<MyType>>(0).0;
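
For context, a minimal sketch of that in use (MyType, its fields, and the column index are just placeholders) — the struct only needs serde's Deserialize, and Json comes from tokio_postgres::types with the with-serde_json-1 feature enabled:

    use serde::Deserialize;
    use tokio_postgres::types::Json;

    // Only the fields you care about need to appear here; serde skips the
    // rest of the JSON blob during deserialization.
    #[derive(Deserialize)]
    struct MyType {
        id: String,
        score: f64,
    }

    // `row` is a tokio_postgres::Row; column 0 is assumed to be the JSONB column.
    let report: MyType = row.get::<_, Json<MyType>>(0).0;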

query_raw is not the best way to pull very large row counts though. You could use a portal which would let you early-exit the query more cleanly: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L771-L812. If you know you do want all results, a COPY query is another option: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs#L142-L169
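
A rough sketch of the portal approach, for reference (query text, batch size, and the target struct are placeholders, and pg_conn needs to be a mutable Client to open the transaction):

    let tx = pg_conn.transaction().await?;
    // bind creates a portal without pulling the whole result set up front.
    let portal = tx.bind("SELECT payload FROM reports", &[]).await?;
    loop {
        // Fetch the next batch of up to 500 rows from the server.
        let rows = tx.query_portal(&portal, 500).await?;
        if rows.is_empty() {
            break;
        }
        for row in rows {
            let report: Json<MyType> = row.try_get(0)?;
            // ...process report.0, and break out early whenever you've seen enough
        }
    }
    tx.commit().await?;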

The serde solution is causing some issues, I think due to double deserialization. I've got a few fields that are strings (I have verified this in the database directly; it really is a string!) but are occasionally parseable as numbers, so the JSON parser is helpfully turning them into floats, which then causes a process-killing panic (not a Result::Err, for some reason) when the intended struct wants a string. I can convert back to a string with a custom serde deserializer, but if the value has a decimal point it sometimes causes issues ("25.1" turning into "25.09999" or whatever).

Edit -- the aggressive string-to-float issue is a data issue, not a tokio_postgres issue, so ignore that for now. Some older data incorrectly codes it as a float, not a string, which is my problem to deal with.

I'll give the copy query a shot. Thanks.

use try_get instead of get and you'll get an Err back on parse errors.
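
Something like this, for instance (same column index and target type as the earlier example):

    // try_get returns a Result, so a malformed value becomes an Err instead
    // of a panic inside get.
    match row.try_get::<_, Json<MyType>>(0) {
        Ok(Json(report)) => { /* use report */ }
        Err(e) => eprintln!("row failed to deserialize: {e}"),
    }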

I think that's just down to how Postgres normalizes JSON values in its backend.

Is the copy query useful simply because you can supply types? Or does it perform better than a RowStream? I don't actually want all rows, but I do want all rows that match the search query (which is a substantial number of individually large rows).

COPY queries use a different mode of the communication protocol optimized for very large bulk transfers: https://www.postgresql.org/docs/current/sql-copy.html.

I'm not sure where the perf cutoff between normal queries and copy queries would be - you'd probably need to do some testing.
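
The statement you hand to copy_out can wrap an arbitrary SELECT, so the filtering still happens server-side; something along these lines (table and filter are made up):

    // copy_out accepts the statement text directly; the inner SELECT does the
    // filtering before the binary COPY transfer begins.
    let query = "COPY (SELECT key, payload FROM reports WHERE kind = 'mcn') TO STDOUT (FORMAT binary)";
    let stream = pg_conn.copy_out(query).await?;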

Appreciate it. I'll give it a shot. For the type arguments, is there a way to supply custom types? Or do they need to be PostgreSQL primitives?

The types are the Postgres types.

So there's no way to avoid the double deserialization then, is there? If it's going to deserialize these things into Postgres's JSONB type and then pass that to serde, I've still got to clone and clean up all those strings?

That doesn't work if I've already deserialized the incoming cell data as a Jsonb object though, right? Or is Jsonb somehow delaying deserialization for this purpose?

What do you mean by that? There isn't a type called Jsonb in the repo, as far as I'm aware.

No, there's no "type", but there is a Type. I'm trying to follow the sample code in the test and I need to supply an array of Type instances. I don't see how to pass in my own custom type here; if I put in Type::JSONB then I think I'm doing eager json deserialization, which I then have to pay twice for, as opposed to deserializing directly from the incoming stream. My code attempt looks like this:

    use anyhow::Context;
    use futures::{pin_mut, StreamExt};
    use tokio_postgres::binary_copy::BinaryCopyOutStream;
    use tokio_postgres::types::Type;

    let stream = pg_conn.copy_out(MY_QUERY_HERE).await?;

    // TODO: how to pass custom types here? Or is it even important to do that?
    let row_stream = BinaryCopyOutStream::new(stream, &[Type::VARCHAR, Type::JSONB]);

    pin_mut!(row_stream);

    while let Some(row) = row_stream.next().await {
        let row = row.context("Handle error here")?;
        // do processing here
    }

(Aside: the test code is for some reason copying to STDIN, but the Postgres docs say to copy to STDOUT for reads; I don't see how that could be a typo, but I find it confusing.)

if I put in Type::JSONB then I think I'm doing eager json deserialization

No, you aren't. You're telling the server the type of the value you want it to ship over the wire.
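
Roughly, continuing the snippet from your earlier comment, the JSONB bytes come off the wire and go through serde exactly once when you pull them out of the row (column order follows the &[Type::VARCHAR, Type::JSONB] slice):

    while let Some(row) = row_stream.next().await {
        let row = row?;
        // Column 0 was declared VARCHAR, column 1 JSONB.
        let key: String = row.try_get(0)?;
        let Json(report): Json<MyType> = row.try_get(1)?;
        // ...process key and report here
    }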

@sfackler Okay, great! It sounds like it's all working as intended. I appreciate the detailed attention you've given this issue as well as your sample code.

With love I say: is there a way this could have been a bit more discoverable? I was completely lost throughout the process, but clearly people are really using this crate (almost ten million downloads). I wonder if some of this information could find its way into the official documentation or a curated examples folder; I know these things take time.

Anyway, for anyone who wanders into this issue later, here is what I think I've learned:

  • If your type implements serde's Deserialize (which can be derived), you can ask for the column as JSONB and read it through the Json wrapper in tokio_postgres::types, which deserializes straight into your desired type through the usual serde machinery without an intermediate re-parse
  • For larger result sets, a COPY query (rather than a plain SELECT) works better, including allowing you to specify your types; the sample code here shows how to do this: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs#L142-L169
  • You can trigger the deserialization with something like let c: Json<MyType> = row.try_get(col_idx)?, then grab the inner value with c.0 (a compact end-to-end sketch follows this list)
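
Putting it together, a compact sketch of the COPY path might look like this (query text, struct fields, and column layout are illustrative):

    use futures::{pin_mut, StreamExt};
    use serde::Deserialize;
    use tokio_postgres::binary_copy::BinaryCopyOutStream;
    use tokio_postgres::types::{Json, Type};
    use tokio_postgres::{Client, Error};

    #[derive(Deserialize)]
    struct MyType {
        // only the JSON fields you actually need; serde ignores the rest
        id: String,
    }

    async fn read_reports(pg_conn: &Client) -> Result<Vec<MyType>, Error> {
        let stream = pg_conn
            .copy_out("COPY (SELECT key, payload FROM reports) TO STDOUT (FORMAT binary)")
            .await?;
        let rows = BinaryCopyOutStream::new(stream, &[Type::VARCHAR, Type::JSONB]);
        pin_mut!(rows);

        let mut results = Vec::new();
        while let Some(row) = rows.next().await {
            let row = row?;
            // Column 1 is the JSONB payload; deserialize it straight into MyType.
            let Json(report): Json<MyType> = row.try_get(1)?;
            results.push(report);
        }
        Ok(results)
    }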