seanmonstar/warp

Form error in 0.3.4 but not in 0.3.3

CypherNaught-0x opened this issue · 13 comments

Version
warp: 0.3.4
rustc: 1.68.2

Platform
Linux fedora 6.2.8-200.fsync.fc37.x86_64

Also on Docker -> Debian

Description
When running the latest version with the multipart update, I am getting an Error when making a request at an endpoint like such:

warp::path!("some_path")
        .and(warp::query())
        .and(warp::multipart::form().max_length(1_000_000))
        .and_then(never_reached);

this results in:

form error: Tried to poll data from the not last Part
request body already taken in previous filter

When I simply downgrade to 0.3.3 this bug disappears.

There's a proposal to move to multer, which seems to have much wider usage. Does the bug go away if you try with #1033?

@ChristophHess running the other PR on this code:

use futures_util::StreamExt;
use warp::Filter;
use warp::multipart::FormData;

#[tokio::main]
async fn main() {
    let route = warp::path!("some_path")
        .and(warp::multipart::form().max_length(1_000_000))
        .and_then(|mut b: FormData| async move {
            let r = b.next().await.unwrap().unwrap();
            Ok::<_, warp::Rejection>(format!("{:?}", r))
        });
    warp::serve(route).run(([127, 0, 0, 1], 3030)).await;

}

I get this:

Part { name: Some("file"), filename: ".gitignore", content_type: "application/octet-stream" }

Does that cover your usecase ?

azecm commented

I got the same error as in the first post
and to get the data i started like this

form.try_collect().await

and got an error

form error: Tried to poll data from the not last Part

this way of reading solved the issue

let r = form.next().await.unwrap().unwrap();

now I read the form like this

while let Some(Ok(mut part)) = form.next().await {}

Thank you

@azecm when running the same on multer's branch i see that it fails with failed to lock multipart state but, that is because one must not hold more than one instance of Field.
The fix for that, would be running a map before try_collect to transform that Field into something useful for you.

Instead of:

use futures_util::{StreamExt, TryStreamExt};
use warp::Filter;
use warp::multipart::FormData;

#[tokio::main]
async fn main() {
    let route = warp::multipart::form()
        .and_then(|mut b: FormData| async move {
            let field_names: Vec<_> = b.try_collect().await.unwrap();
            Ok::<_, warp::Rejection>(format!("{:?}", field_names))
        });
    warp::serve(route).run(([0,0,0,0], 3030)).await;
}

one would do

use futures_util::{StreamExt, TryStreamExt};
use warp::Filter;
use warp::multipart::FormData;

#[tokio::main]
async fn main() {
    let route = warp::multipart::form()
        .and_then(|mut b: FormData| async move {
            let field_names: Vec<_> = b.and_then(|field| async move {
                Ok(field.name().to_string())
            }).try_collect().await.unwrap();
            Ok::<_, warp::Rejection>(format!("{:?}", field_names))
        });
    warp::serve(route).run(([0,0,0,0], 3030)).await;
}

But, only if that gets merged into warp.
@seanmonstar would that be something needed to be documented or is it a no go to introduce mutler into warp?

azecm commented

text fields i was able to read
but there is no image data yet
i did this before

part.data().await

now I get only the initial part

azecm commented

figured out

it's even better now
you can immediately write data to the file

great for large files

while let Some(Ok(e)) = part.data().await { stream.write(e.chunk()).await.ok(); }

@seanmonstar @jaysonsantos Terribly sorry for the delayed reply.

I just tested the "use-multer-for-multipart" branch with my build system and it worked perfectly.
I am happy to do more testing if you need me to. Otherwise, I'll wait for #1033 to be merged.

Thank you both very much.

So the and_then requires the Result type to be a warp Error... how do we handle our own errors inside the and_then()

UPDATE:

I ended up wrapping the result with Ok() and nesting a result inside the returned Vec, then iterating through them later after the try_collect()

let results: Vec<Result<(), anyhow::Error>> = form.and_then(|part| async move {
    Ok(process_form_parts(part).await)
})
    .try_collect()
    .await
    .map_err(|e| {
        error!("failed to collect multipart form data: {}", e);
        warp::reject::reject()
    })?;

for result in results {
    if let Err(e) = result {
        return Ok(Box::new(warp::reply::with_status(e.to_string(), StatusCode::BAD_REQUEST)))
    }
}

Ok(Box::new(warp::reply::with_status("files uploaded".to_string(), StatusCode::OK)))

I'm in the same situation and am stuck, if someone could assist with some guidance that would be much appreciated.

I was running warp 0.3.3 and everything was working. I have updated to warp 0.3.5 and now get the error failed to lock multipart state when my code calls try_collect().

This was the original code:

let result: Result<Vec<Part>, ()> = data.try_collect().await.map_err(|e| {
    eprintln!("form error: {}", e);
});

I have tried changing to the following code but I am still getting the failed to lock multipart state error:

let result: Result<Vec<Part>, ()> = data
    .and_then(|part| async move {
        println!("part: {:?}"part);
        Ok(part)
    })
    .try_collect()
    .await
    .map_err(|e| {
        eprintln!("form error: {}", e);
});

@nocker01 that is because multer does not allow multiple instances of multipart data to exist as it tries to avoid copying data over, as you want to do something with the data, it is better if you get the info you need from that and then collect (by discarding the original part), the example here shows that #1034 (comment)

Thank you @jaysonsantos , I did try your suggestion from above like this:

let result: Result<Vec<Part>, ()> = data
            .and_then(|part| async move {
                println!("part: {:?}", part);
                Ok(part.name().to_string())
            })
            .try_collect()
            .await
            .map_err(|e| {
                eprintln!("form error: {}", e);
            });

but that does not compile, I get the following compilation error:

error[E0277]: the trait bound `Vec<warp::multipart::Part>: Extend<std::string::String>` is not satisfied
   --> main\..\warp\example.rs:216:27
    |
216 |               .try_collect()
    |  ___________________________^
217 | |             .await
    | |__________________^ the trait `Extend<std::string::String>` is not implemented for `Vec<warp::multipart::Part>`
    |
    = help: the following implementations were found:
              <Vec<T, A> as Extend<&'a T>>
              <Vec<T, A> as Extend<T>>
    = note: required because of the requirements on the impl of `warp::Future` for `TryCollect<futures::stream::AndThen<FormData, impl warp::Future<Output = [async output]>, [closure@main\..\warp\example.rs:212:23: 215:14]>, Vec<warp::multipart::Part>>`

because I am trying to end up with a Result<Vec<Part>, ()> which I then later iterate over the vector of Parts.

I will keep trying to see what I can do.

@nocker01 I think that happens because you are still asking for a Result<Vec<Part>> where it should be let result: Result<Vec<String>, ()>...

Thank you. I've got it working again now.

This was my original implementation when using warp 0.3.3:

use futures::TryStreamExt;
use warp::multipart::{FormData, Part};

let result: Result<Vec<Part>, ()> = form_data.try_collect().await.map_err(|_| {});

if let Ok(parts) = result {
    for p in parts {
        let p_name = p.name().to_string();
        let p_filename = p.filename().unwrap_or_default().to_string();
        let value = p
            .stream()
            .try_fold(Vec::new(), |mut vec, data| {
                vec.put(data);
                async move { Ok(vec) }
            })
            .await;

        // Handle each of the Parts...

    }
}

And this is my implementation using warp 0.3.5:

use futures::{StreamExt, TryStreamExt};
use warp::multipart::FormData;

// Extract the parts from the stream of multipart/form-data
let mut data = form_data
    .fold(post_data, |mut post_data, part| {
        async move {
            if let Ok(p) = part {
                let p_name = p.name().to_string();
                let p_filename = p.filename().unwrap_or_default().to_string();
                let value = p
            .stream()
            .try_fold(Vec::new(), |mut vec, data| {
                vec.put(data);
                async move { Ok(vec) }
            })
            .await;
            
            // Handle each of the Parts...
                
            post_data
        }
    })
    .await;