scylladb/scylla-rust-driver

Enhancement Proposal: Simplifying Token Calculation for High-Frequency Append-Log Style Operations

Opened this issue · 8 comments

Enhancement Proposal: Simplifying Token Calculation for High-Frequency Append-Log Style Operations

Overview:

In our current project utilizing ScyllaDB, we're implementing a high-frequency, append-log style architecture to handle concurrent append requests. To optimize performance and minimize network traffic, we're batching these requests similar to how Kafka API operates, sending batches to ScyllaDB every 10 milliseconds.

To ensure efficient batching and minimize network overhead, it's crucial to group insert requests that will ultimately end up on the same node within ScyllaDB. This necessitates the computation of tokens for each insert statement, enabling us to determine their placement within the token ring.

Current Challenge:

Presently, the existing API poses challenges in efficiently computing the token of a PreparedStatement without incurring significant performance overhead. The process involves invoking Session::calculate_token, which necessitates serializing a row (resulting in memory allocation), extracting the partition key, and then computing the token. Subsequently, when batching these statements using Session::batch, each row undergoes serialization again, effectively doubling memory allocation and serialization overhead.

Immediate Solution

To streamline this process and enhance performance, we propose making Session::calculate_token_untyped public instead of keeping it pub(crate). By exposing this method publicly, we can pre-serialize every row, thereby reusing the serialization results to compute tokens and seamlessly integrate them into our batching process.

Additionnal Note

In addition to the proposed enhancement of making Session::calculate_token_untyped public, we suggest making the PartitionHasher publicly accessible as well. This would empower users to compute results in advance without having to go through the serialization process of SerializeRow and PreparedStatement.

Considering that many ScyllaDB use cases involve key-value stores where the partition key is often known early on, exposing PartitionHasher would facilitate more efficient pre-computation of tokens, enhancing overall performance and developer experience.

I'm reluctant to expose such functions publicly - I'd rather think about the solution that avoids the allocation while preserving type safety.
BoundStatement could possibly be such a solution ( #941 ) - it would allow the values to be serialized only once and you could compute token of such statement without serializing again.

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query.

By the way, did you benchmark the performance impact of batches? Does it actually improve performance compared to executing the queries in parallel?
If so, did you benchmark grouping queries on the client side vs skipping grouping and letting Scylla deal with it?

Enhancement Proposal: Simplifying Token Calculation for High-Frequency Append-Log Style Operations

Overview:

In our current project utilizing ScyllaDB, we're implementing a high-frequency, append-log style architecture to handle concurrent append requests. To optimize performance and minimize network traffic, we're batching these requests similar to how Kafka API operates, sending batches to ScyllaDB every 10 milliseconds.

To ensure efficient batching and minimize network overhead, it's crucial to group insert requests that will ultimately end up on the same node within ScyllaDB. This necessitates the computation of tokens for each insert statement, enabling us to determine their placement within the token ring.

Current Challenge:

Presently, the existing API poses challenges in efficiently computing the token of a PreparedStatement without incurring significant performance overhead. The process involves invoking Session::calculate_token, which necessitates serializing a row (resulting in memory allocation), extracting the partition key, and then computing the token. Subsequently, when batching these statements using Session::batch, each row undergoes serialization again, effectively doubling memory allocation and serialization overhead.

You are talking about PreparedStatement::calculate_token, not Session::calculate_token (there is no such thing), right?

Immediate Solution

To streamline this process and enhance performance, we propose making Session::calculate_token_untyped public instead of keeping it pub(crate). By exposing this method publicly, we can pre-serialize every row, thereby reusing the serialization results to compute tokens and seamlessly integrate them into our batching process.

Additionnal Note

In addition to the proposed enhancement of making Session::calculate_token_untyped public, we suggest making the PartitionHasher publicly accessible as well. This would empower users to compute results in advance without having to go through the serialization process of SerializeRow and PreparedStatement.

Considering that many ScyllaDB use cases involve key-value stores where the partition key is often known early on, exposing PartitionHasher would facilitate more efficient pre-computation of tokens, enhancing overall performance and developer experience.

One more thing: does your proposed change actually let you do what you want? In order to use calculate_token_untyped you need SerializedValues which don't implement SerializeRow and so can't be passed as values to a query.

One more thing: does your proposed change actually let you do what you want? In order to use calculate_token_untyped you need SerializedValues which don't implement SerializeRow and so can't be passed as values to a query.

@Lorak-mmk yes, what I do is a serialize first my rows and want to send it over calculate_token_untyped.

By the way, did you benchmark the performance impact of batches? Does it actually improve performance compared to executing the queries in parallel?
If so, did you benchmark grouping queries on the client side vs skipping grouping and letting Scylla deal with it?

In our system, we are tasked with achieving a throughput requirement of 125MB/s. It is imperative for us to maintain low latency throughout our operations. We have identified that any delays introduced by ScyllaDB in rerouting have a significant impact on our overall latency performance.

To address this concern and optimize our system for efficiency, we are exploring the implementation of parallel batch processing. Our strategy involves assigning dedicated threads to handle batch inserts for specific nodes. By segmenting the workload in this manner, we aim to minimize the impact of rerouting on latency and ensure consistent performance across the system.

In scylladb protocol, each batch frame is capable of accommodating up to 256MB of data and supports Lz4 compression. Batching data reduces entropy since we batch related data together thus enhancing compression ratios when utilizing algorithms such as LZ4.

@Lorak-mmk

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query

I tried to use this function, however, it requires a serialized partition key which is only accessible through a PreparedStatement. The overall driver API makes partition key handling quite convoluting :

I have to go through a PreparedStatement because I can not forge my own RowSerializationContext (since everything is pub(crate)) which is required for serialization.

@Lorak-mmk

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query

I tried to use this function, however, it requires a serialized partition key which is only accessible through a PreparedStatement. The overall driver API makes partition key handling quite convoluting :

I have to go through a PreparedStatement because I can not forge my own RowSerializationContext (since everything is pub(crate)) which is required for serialization.

ClusterData::compute_token needs SerializedValues. SerializedValues can be created manually, without PreparedStatement / RowSerializationContext: first you create it with new method and then add partition key values using add_value method.

In scylladb protocol, each batch frame is capable of accommodating up to 256MB of data and supports Lz4 compression. Batching data reduces entropy since we batch related data together thus enhancing compression ratios when utilizing algorithms such as LZ4.

nit: LZ4 does not have entropy encoding.

In scylladb protocol, each batch frame is capable of accommodating up to 256MB of data and supports Lz4 compression. Batching data reduces entropy since we batch related data together thus enhancing compression ratios when utilizing algorithms such as LZ4.

nit: LZ4 does not have entropy encoding.

I meant that as data have less entropy, the compression ratio will be higher. I was not refering the LZ4 entropyless encoding.
In my use case, data in same partition will be extremely similar (low entropy) therefore I will benefits from compression.

Ten0 commented

To ensure efficient batching and minimize network overhead, it's crucial to group insert requests that will ultimately end up on the same node within ScyllaDB. This necessitates the computation of tokens for each insert statement, enabling us to determine their placement within the token ring.

It looks like #738 may be a more direct API than #975 for what you're trying to achieve.
Could you please confirm that is indeed the case?
(See the tests/integration/shard_aware_batching.rs file for a complete usage example.)