Python Driver fails to update existing `PreparedStatement`s after table alter
Opened this issue · 2 comments
The following test written for Scylla's test.py framework fails:
import logging
import pytest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
s1 = await manager.server_add()
cql, _ = await manager.get_ready_cql([s1])
logger.info("ks")
await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await cql.run_async("create table ks.t (pk int primary key, x int)")
stmt = cql.prepare("update ks.t set x = ? where pk = ?")
await cql.run_async(stmt, [0, 0])
await cql.run_async("alter table ks.t alter x type blob")
#stmt = cql.prepare("update ks.t set x = ? where pk = ?")
#await cql.run_async(stmt, [0, 0])
await cql.run_async(stmt, [b'', 0])
with:
E TypeError: Received an argument of invalid type for column "x". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'bytes'>; (required argument is not an integer)
In this case (I investigated), the Python driver does not attempt a statement reprepare at all.
This is problem number 1.
If we uncomment:
await cql.run_async(stmt, [0, 0])
(second-to-last statement),
this triggers a reprepare (which I checked by adding some logs to the driver), because driver sends a request and gets a "query not prepared" response from Scylla. This statement passes. However, the following statement:
await cql.run_async(stmt, [b'', 0])
still fails, even after repreparation.
This is problem number 2.
If we uncomment the explicit
stmt = cql.prepare("update ks.t set x = ? where pk = ?")
(after the alter
),
then the last statement passes (the one which sends [b'', 0]
), whether or not we uncomment the the previous statement (the one which sends [0, 0]
).
I found an easy way to solve problem number 2, by adjusting the reprepare code to update column_metadata
inside the PreparedStatement
object (following the logic used when PreparedStatement
is first created by prepare
: column_metadata
is set to the response bind_metadata
):
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 5f2669c0..aaff3432 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -4963,6 +4963,7 @@ class ResponseFuture(object):
)
))
self.prepared_statement.result_metadata = response.column_metadata
+ self.prepared_statement.column_metadata = response.bind_metadata
new_metadata_id = response.result_metadata_id
if new_metadata_id is not None:
self.prepared_statement.result_metadata_id = new_metadata_id
(this is in _execute_after_prepare
function in cassandra/cluster.py).
(BTW. the names are weird, yes)
However I don't know how to solve problem 1. In problem 1, the driver does not even attempt a reprepare, because it fails on serialization stage before even sending the request, due to outdated column_metadata
which it apparently uses to do the serialization.
The problems have different symptoms (although the root causes are the same), if instead changing the type of a column using alter ks.t alter x type ...
, we introduce a user-defined type, and then add a column to this type. Consider this test:
import logging
import pytest
from dataclasses import dataclass
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@dataclass
class Typ1:
a: int
@dataclass
class Typ2:
a: int
b: int
@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
s1 = await manager.server_add()
cql, _ = await manager.get_ready_cql([s1])
logger.info("ks")
await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await cql.run_async("create type ks.typ (a int)")
await cql.run_async("create table ks.t (pk int primary key, x frozen<typ>)")
stmt = cql.prepare("update ks.t set x = ? where pk = ?")
await cql.run_async(stmt, [Typ1(0), 0])
await cql.run_async("alter type ks.typ add b int")
#stmt = cql.prepare("update ks.t set x = ? where pk = ?")
await cql.run_async(stmt, [Typ2(0, 0), 1])
await cql.run_async(stmt, [Typ2(0, 0), 2])
rs = await cql.run_async("select pk, x from ks.t")
logger.info(list(rs))
There are no failures. However, the result is:
[Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=None))]
So b
is None
for pk=1
and pk=2
, even though we bound Typ2(0, 0)
when updating those keys.
After my column_metadata
fix, the result changes to:
[Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=0))]
So for pk=1
the result is still None
. Even though this statement causes a repreparation; the driver should in theory resend the query with updated column_metadata
so b
should get updated for pk=1
too.
But for pk=2
the result is correct -- apparently this time the updated column_metadata
helped.
Uncommenting the explicit prepare
also helps.
So is this a bug?
Well, at least Rust driver does provide a much better user experience. This (executed against pre-existing single-node cluster):
use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new() .known_node(uri) .build() .await?;
let session = Arc::new(session);
session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
session.query("CREATE TABLE ks.t ( a int primary key, b int)", &[],).await?;
let insert = Arc::new(session.prepare("UPDATE ks.t SET b = ? WHERE a = ? ").await?);
if let Err(e) = session.execute(&insert, (0 as i32, 0 as i32)).await {
println!("{}", e);
std::process::exit(1);
}
session.query("ALTER TABLE ks.t alter b type blob", &[],).await?;
if let Err(e) = session.execute(&insert, (Vec::<u8>::new(), 1 as i32)).await {
println!("{}", e);
std::process::exit(1);
}
std::process::exit(0);
}
works out of the box, and gives
cqlsh> select * from ks.t;
a | b
---+------------
1 | 0x
0 | 0x00000000
(2 rows)
looking at the code, apparently Rust driver does not have a corresponding structure to Python driver's column_metadata
/ bind_metadata
to be used for serialization; instead, it seems to use the types coming with the tuple. And the first statement after the ALTER reprepares, but does not seem to update the insert
object, which apparently is not necessary for this driver.
Similarly the experience is better with UDTs:
use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;
use std::sync::Arc;
use scylla::macros::{FromUserType, IntoUserType};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new() .known_node(uri) .build() .await?;
let session = Arc::new(session);
session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
session.query("CREATE TYPE ks.typ (a int,); ", &[],).await?;
session.query("CREATE TABLE ks.t ( a int primary key, x frozen<typ>)", &[],).await?;
#[derive(Debug, IntoUserType, FromUserType, Clone)]
struct Typ1 {
a: i32,
}
#[derive(Debug, IntoUserType, FromUserType, Clone)]
struct Typ2 {
a: i32,
b: i32,
}
let insert = Arc::new(session.prepare("UPDATE ks.t SET x = ? WHERE a = ? ").await?);
if let Err(e) = session.execute(&insert, (&Typ1{a: 0}, 0 as i32)).await {
println!("{}", e);
std::process::exit(1);
}
session.query("ALTER type ks.typ add b int", &[],).await?;
if let Err(e) = session.execute(&insert, (&Typ2{a: 0, b: 0}, 1 as i32)).await {
println!("{}", e);
std::process::exit(1);
}
std::process::exit(0);
}
result:
cqlsh> select * from ks.t;
a | x
---+-----------------
1 | {a: 0, b: 0}
0 | {a: 0, b: null}
(2 rows)
cc @Lorak-mmk
First, I think you are using quite an old Rust Driver version. Current is 0.13 and 0.11 released in December introduced new serialization traits that perform type checking on the client side.
Currently your first Rust example fails with:
Serializing values failed: SerializationError: Failed to serialize query arguments (alloc::vec::Vec<u8>, i32): failed to serialize column b: SerializationError: Failed to type check Rust type alloc::vec::Vec<u8> against CQL type Int: expected one of the CQL types: [Blob]
and second (after slight modifications to make it work with current version) fails with:
Serializing values failed: SerializationError: Failed to serialize query arguments (&testrow::main::{{closure}}::Typ2, i32): failed to serialize column x: SerializationError: Failed to type check Rust type testrow::main::{{closure}}::Typ2 against CQL type UserDefinedType { type_name: "typ", keyspace: "ks", field_types: [("a", Int)] }: the field b is missing in the Rust data but is required by the CQL UDT type
which btw seems like a problem with error message: it says that field b is missing in Rust data, but it's missing in CQL UDT - so it's the other way around than message says.
Now to address the issue: to the best of my knowledge what you described is how all the drivers work - metadata in prepared statements is constant and user needs to take care of updating this metadata by creating the prepared statement again.
I see some reasons for that:
- Serialization happens once, and driver would need to re-serialize before sending again. In Rust Driver we had multiple questions and issues regarding serialization and having more control over it. We also plan to introduce BoundStatement that allows serialization before calling execute.
- Schema change is imo always connected to application code change. If you remove column then queries using this column will stop working - you need to change them. If you add a column then you need to modify you queries to make use of it. If you change type of column then you need to change type of what you use in queries.
- Schema change is not an atomic event iiuc and I don't see how exactly should the driver deal with different nodes having temporarily different schema versions.
We could discuss doing it in different way if you want, but I suspect it would require significant, possibly backwards incompatible, changes in the driver.
Regarding your proposed fix: prepared statement is a shared object that can be used concurrently in multiple queries. It's not hard to imagine scenario where driver sends a query to node A, updates metadata, concurrently sends to B which has older schema and updates metadata back - not allowing you to send statement using new data type.
If you add a column then you need to modify you queries to make use of it. If you change type of column then you need to change type of what you use in queries.
That would mean disruption in availability.
Adding column: obviously you don't have to change the application, you can keep omitting this column from INSERT or UPDATE, until you are ready to deploy new version of your app. But this takes time. Schema change can be done ahead of time.
Changing column type to a compatible one, in particular, adding new field to a user-defined-type: same argument.
Schema change is not an atomic event iiuc and I don't see how exactly should the driver deal with different nodes having temporarily different schema versions.
Driver has a pool of connections. A given shard has a single schema version at a given time. Given connections acts according to that version.