krojew/cdrs-tokio

increase in size for a byte array

Closed this issue · 23 comments

I am trying to insert a compressed string, ~5 MB after compression, as a byte array (blob). When inserted, the actual size of the row is 23 MB, which exceeds the database hard limit (16 MB).

I checked the size of the array right before sending it to the driver. Could the size increase while the driver processes the data?

Thank you.

Do you happen to have a simple test case? Which compression are you using?

I have a service in Rust which receives the compressed data from Python code. I use LZ4. I was able to insert the exact same data using the Python driver.

Can you share the code you use to insert the data (with the types used)?

I cannot share my code since it belongs to my company.
I use the blob type in the Cassandra table.
In the code I insert it as Vec<u8>.

It is something like this:

struct CassandraItem {
    id_number: String,
    blob: BinaryData,
}

impl CassandraItem {
    fn into_query_values(self) -> QueryValues {
        query_values!(
            self.id_number,
            self.blob.inner().to_vec(), // convert the Bytes obj to Vec<u8>
        )
    }
}


async fn insert_row(
    session: Arc<CurrentSession>,
    row: CassandraItem,
) -> Result<Envelope, String> {
    match session
        .query_with_values("INSERT INTO key_space.table1 (id_number,blob) VALUES (?,?)", row.into_query_values())
        .await
    {
        Ok(res) => Ok(res),
        Err(err) => {
            ...
        }
    }
}

Thanks for the code snippet - now it's clear why it's failing. Cassandra has a list type, which is represented by Vec in Rust. When you pass a Vec<u8> as a query value, you are creating a list of u8 values, not a blob of binary data. You should either use the Blob type from the crate, or implement Into<Bytes> for the type you wish to pass as a query value.
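
For illustration, a minimal sketch of the Blob option (BinaryData is simplified to a plain Vec<u8> here; the import paths are taken from the crate docs and may vary by version):

use cdrs_tokio::query::QueryValues;
use cdrs_tokio::query_values;
use cdrs_tokio::types::blob::Blob;

struct CassandraItem {
    id_number: String,
    blob: Vec<u8>, // simplified stand-in for the BinaryData wrapper above
}

impl CassandraItem {
    fn into_query_values(self) -> QueryValues {
        query_values!(
            self.id_number,
            // Blob serializes as a single CQL blob value; a bare Vec<u8>
            // serializes as a list of u8 values, inflating the row size.
            Blob::from(self.blob)
        )
    }
}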

Can you confirm it works by doing that?

Also, please update to the latest patch release due to a bug in compressing large envelopes.

Thank you for the clarification.

When I do:

impl CassandraItem {
    fn into_query_values(self) -> QueryValues {
        query_values!(
            self.id_number,
            Blob::from(self.blob.inner())
        )
    }
}

it gets stuck - it doesn't panic, but it also doesn't return; it looks like an infinite loop.

For the other option, how can I implement Into<Bytes> if Bytes is private and not in the same crate as my type?

Are you sure you're using the patched version from today? It fixed some problems with compressed messages. As for Bytes - it's public: https://docs.rs/cdrs-tokio/latest/cdrs_tokio/types/value/struct.Bytes.html
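
For reference, a minimal sketch of the Into<Bytes> route, with a hypothetical BinaryData wrapper standing in for the real type:

use cdrs_tokio::types::value::Bytes;

// Hypothetical stand-in for the BinaryData wrapper from the snippets above.
struct BinaryData(Vec<u8>);

// Implementing From on the foreign Bytes type is allowed here because the
// local BinaryData type appears as the trait's type parameter, and it
// provides Into<Bytes> automatically via the blanket impl.
impl From<BinaryData> for Bytes {
    fn from(data: BinaryData) -> Self {
        Bytes::new(data.0)
    }
}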

Sorry, I didn't realize it is pub.
So I tried Bytes::new(self.blob.inner().to_vec()) - still stuck.
Since I updated to 7.0.1 it doesn't work - it appears to get into an infinite loop and never returns.
The problem is not in the db, since I can contact it with the Python driver.
Any thoughts?

The tests at https://github.com/krojew/cdrs-tokio/blob/master/cdrs-tokio/tests/compression.rs send a 5MB blob to the db and read it back, with compression. They generally work, so something is specific to your case, but it's hard to tell what without details. Let's try to dig deeper (a rough sketch of what those tests exercise follows the questions below).

  1. How do you know it's an infinite loop?
  2. Can you run these tests with your db configuration and see if they work?
  3. Are there any logs present?
  4. Can you use a debugger and see where it's stuck?
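
For context, a rough, self-contained sketch of what those tests exercise - inserting a ~5 MB blob over a plain TCP session. The contact point, keyspace and table are placeholders, the builder calls mirror the snippets in this thread, and compression setup is omitted:

use cdrs_tokio::cluster::session::TcpSessionBuilder;
use cdrs_tokio::cluster::NodeTcpConfigBuilder;
use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
use cdrs_tokio::query_values;
use cdrs_tokio::types::blob::Blob;

#[tokio::main]
async fn main() {
    // Placeholder contact point - replace with the problematic cluster.
    let config = NodeTcpConfigBuilder::new()
        .with_contact_point("127.0.0.1:9042".into())
        .build()
        .await
        .unwrap();

    let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config)
        .build()
        .unwrap();

    // ~5 MB payload, comparable to what the compression tests send.
    let payload = Blob::from(vec![0u8; 5 * 1024 * 1024]);

    session
        .query_with_values(
            "INSERT INTO ks.blobs (id, data) VALUES (?, ?)", // placeholder table
            query_values!("test-row".to_string(), payload),
        )
        .await
        .unwrap();

    // If the insert hangs here instead of returning, the issue is reproduced.
    println!("insert completed");
}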

Apparently 7.0.1 broke my code; when I used 7.0.0 it worked. Any thoughts?
Using Blob::from(self.blob.inner()) worked. Thank you so much!

Can you run those linked tests with your db setup on 7.0.1?

Please let me know how I should run the library's built-in tests.

  1. Run your cluster in the configuration you're having problems with.
  2. Edit compression.rs and change cluster settings to the ones you're having problems with.
  3. Run the two compression tests:
     cargo test --test compression encode_decode_test_v4 --no-fail-fast --all-features -- --test-threads=1
     cargo test --test compression encode_decode_test_v5 --no-fail-fast --all-features -- --test-threads=1

I tried it, and I don't get a response, just like before.

That's good - it confirms the problem is specific to your cluster configuration. Can you paste your changes to compression.rs and how you run your cluster?

I just changed the session creation. Unfortunately, I cannot paste it here.

It is something like this:

let cluster_config = NodeTcpConfigBuilder::new()
    .with_contact_points(seeds)
    .with_authenticator_provider(Arc::new(authenticator))
    .build()
    .await
    .unwrap();

let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
    .build()
    .unwrap();

What kind of authenticator are you using? Maybe the cluster is stuck on authentication.

let authenticator = StaticPasswordAuthenticatorProvider::new("username", "password");

Can you disable authentication and try again without an authenticator?

I have to provide one.

Just tested with static password authentication and everything works. I ran the cluster with:

authenticator: PasswordAuthenticator
role_manager: CassandraRoleManager

The only changes to compression.rs tests are:

.with_authenticator_provider(Arc::new(StaticPasswordAuthenticatorProvider::new(
    "cassandra",
    "cassandra",
)))

Therefore something is probably wrong with your cluster. Unless you can provide more details, e.g. how you know it's stuck in an infinite loop, or the cluster configuration, I cannot provide a solution.

Works now. Don't know why it didn't work before. Thank you!