Issue when using parallel tokio tasks with a shared `Arc`
Closed this issue · 4 comments
Jasperav commented
The test below fails, but I think it should pass. The original CDRS has exactly the same problem.
mod test {
    use cdrs_tokio::authenticators::NoneAuthenticator;
    use cdrs_tokio::cluster::{session, ClusterTcpConfig, NodeTcpConfigBuilder};
    use cdrs_tokio::load_balancing::RoundRobin;
    use cdrs_tokio::query::QueryExecutor;
    use std::sync::Arc;

    /// Reproduction case: a keyspace is selected with `use`, a direct query
    /// on the session succeeds, and then the same query is issued from
    /// several spawned tokio tasks that share the session through an `Arc`.
    /// Those spawned queries are the ones reported to fail.
    #[tokio::test]
    async fn goo() {
        let cluster_config = ClusterTcpConfig(vec![NodeTcpConfigBuilder::new(
            "127.0.0.1:9042",
            NoneAuthenticator {},
        )
        .build()]);
        let session = session::new(&cluster_config, RoundRobin::new())
            .await
            .unwrap();

        // Select the keyspace for the session.
        session.query("use benjamin").await.unwrap();

        // Issued directly on the session, this query succeeds.
        session
            .query("select * from conversation")
            .await
            .unwrap()
            .get_body()
            .unwrap()
            .into_rows()
            .unwrap();

        let shared = Arc::new(session);
        let mut tasks = Vec::new();
        for _ in 0..=10 {
            let session = Arc::clone(&shared);
            tasks.push(tokio::spawn(async move {
                // Inside a spawned task the server answers that no keyspace
                // has been specified, so the first `unwrap` panics.
                session
                    .query("select * from conversation")
                    .await
                    .unwrap()
                    .get_body()
                    .unwrap()
                    .into_rows()
                    .unwrap();
            }));
        }

        // A panicked task surfaces here as an `Err` join result.
        for result in futures::future::join_all(tasks).await {
            assert!(result.is_ok());
        }
    }
}
Error:
assertion failed: result.is_ok()
thread 'test::goo' panicked at 'called `Result::unwrap()` on an `Err` value: Server(CDRSError { error_code: 8704, message: CString { string: "No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename" }, additional_info: Invalid(SimpleError) })', src\main.rs:27:71
stack backtrace:
0: std::panicking::begin_panic_handler
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\std\src\panicking.rs:493
1: core::panicking::panic_fmt
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\panicking.rs:92
2: core::option::expect_none_failed
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\option.rs:1266
3: core::result::Result<cdrs_tokio::frame::Frame, cdrs_tokio::error::Error>::unwrap<cdrs_tokio::frame::Frame,cdrs_tokio::error::Error>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\result.rs:969
4: fortesting::test::goo::{{closure}}::{{closure}}::{{closure}}
at .\src\main.rs:27
5: core::future::from_generator::{{impl}}::poll<generator-0>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\future\mod.rs:80
6: tokio::runtime::task::core::{{impl}}::poll::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:235
7: tokio::loom::std::unsafe_cell::UnsafeCell<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>>::with_mut<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>,core::task::poll::Poll<tup
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\loom\std\unsafe_cell.rs:14
8: tokio::runtime::task::core::CoreStage<core::future::from_generator::GenFuture<generator-0>>::poll<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:225
9: tokio::runtime::task::harness::poll_future::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:422
10: std::panic::{{impl}}::call_once<core::task::poll::Poll<tuple<>>,closure-0>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:322
11: std::panicking::try::do_call<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:379
12: std::panicking::try::do_catch<std::panic::AssertUnwindSafe<closure-0>,tuple<>>
13: std::panicking::try<core::task::poll::Poll<tuple<>>,std::panic::AssertUnwindSafe<closure-0>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:343
14: std::panic::catch_unwind<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:396
15: tokio::runtime::task::harness::poll_future<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:409
16: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll_inner<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:89
17: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_sched
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:59
18: tokio::runtime::task::raw::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:104
19: tokio::runtime::task::raw::RawTask::poll
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:66
20: tokio::runtime::task::Notified<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::run<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\mod.rs:171
21: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
22: tokio::coop::with_budget::{{closure}}<tuple<>,closure-3>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:121
23: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::try_with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:272
24: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:248
25: tokio::coop::with_budget
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:114
26: tokio::coop::budget
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:98
27: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
28: tokio::runtime::basic_scheduler::enter::{{closure}}<closure-0,tuple<>,tokio::runtime::driver::Driver>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
29: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,closure-0,tuple<>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\macros\scoped_tls.rs:61
30: tokio::runtime::basic_scheduler::enter<closure-0,tuple<>,tokio::runtime::driver::Driver>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
31: tokio::runtime::basic_scheduler::Inner<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:176
32: tokio::runtime::basic_scheduler::InnerGuard<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:405
33: tokio::runtime::basic_scheduler::BasicScheduler<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:136
34: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\mod.rs:450
35: fortesting::test::goo
at .\src\main.rs:8
36: fortesting::test::goo::{{closure}}
at .\src\main.rs:8
37: core::ops::function::FnOnce::call_once<closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\ops\function.rs:227
38: core::ops::function::FnOnce::call_once
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\library\core\src\ops\function.rs:227
krojew commented
Will take a look, but for now you can work around it by prepending keyspace to table names.
Jasperav commented
@krojew This didn't fix the problem. The test below fails:
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig};
#[cfg(feature = "e2e-tests")]
use std::sync::Arc;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::load_balancing::RoundRobin;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::authenticators::NoneAuthenticator;
use cdrs_tokio::query::QueryExecutor;
/// End-to-end reproduction: creates a keyspace and a table, switches to the
/// keyspace with `use`, then runs 100 unqualified `select` queries from
/// spawned tokio tasks that share one session through an `Arc`. Panics with
/// the query error as soon as any task reports a failure.
#[tokio::test]
#[cfg(feature = "e2e-tests")]
async fn multithread() {
    let cluster_config = ClusterTcpConfig(vec![NodeTcpConfigBuilder::new(
        "127.0.0.1:9042",
        Arc::new(NoneAuthenticator {}),
    )
    .build()]);
    let session = cdrs_tokio::cluster::session::new(&cluster_config, RoundRobin::new())
        .await
        .expect("session should be created");

    // Schema setup, all issued on the session before any task is spawned.
    session
        .query("CREATE KEYSPACE IF NOT EXISTS test_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
        .await
        .expect("Could not create ks");
    session
        .query("use test_ks;")
        .await
        .expect("Keyspace create error");
    session
        .query("create table if not exists user (user_id int primary key) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };")
        .await
        .expect("Could not create table");

    let shared = Arc::new(session);

    // Spawn every query up front; each task hands its query result back
    // through its join handle instead of unwrapping inside the task.
    let handles: Vec<_> = (0..100)
        .map(|_| {
            let c = Arc::clone(&shared);
            tokio::spawn(async move { c.query("select * from user").await })
        })
        .collect();

    for handle in handles {
        // The outer `unwrap` covers the task join; the match covers the query.
        match handle.await.unwrap() {
            Err(e) => panic!("Query error: {:#?}", e),
            Ok(_) => println!("Query went OK"),
        }
    }
}