krojew/cdrs-tokio

Issue when using parallel tokio tasks with a shared `Arc`

Closed this issue · 4 comments

The test below fails, but I think it should work fine. The original CDRS has exactly the same problem.

mod test {
    use cdrs_tokio::authenticators::NoneAuthenticator;
    use cdrs_tokio::cluster::{session, ClusterTcpConfig, NodeTcpConfigBuilder};
    use cdrs_tokio::load_balancing::RoundRobin;
    use cdrs_tokio::query::QueryExecutor;
    use std::sync::Arc;

    /// Reproduction: selects a keyspace with `USE`, runs one query directly on
    /// the session, then runs the same unqualified query from eleven spawned
    /// tokio tasks that share the session through an `Arc`.
    ///
    /// Connects to a node at `127.0.0.1:9042` and expects a keyspace named
    /// `benjamin` containing a `conversation` table.
    #[tokio::test]
    async fn goo() {
        let node_config = NodeTcpConfigBuilder::new("127.0.0.1:9042", NoneAuthenticator {}).build();
        let cluster = ClusterTcpConfig(vec![node_config]);
        let session = session::new(&cluster, RoundRobin::new())
            .await
            .unwrap();

        // Select the keyspace for the unqualified table names used below.
        session.query("use benjamin").await.unwrap();

        // Issued directly on the session, the query succeeds.
        session
            .query("select * from conversation")
            .await
            .unwrap()
            .get_body()
            .unwrap()
            .into_rows()
            .unwrap();

        let shared = Arc::new(session);

        // Spawn every task up front so the queries can run concurrently.
        let mut tasks = Vec::new();
        for _ in 0..=10 {
            let session = Arc::clone(&shared);

            tasks.push(tokio::spawn(async move {
                // Reported failure point: the server answers that no keyspace
                // has been specified, and the unwrap panics inside the task.
                session
                    .query("select * from conversation")
                    .await
                    .unwrap()
                    .get_body()
                    .unwrap()
                    .into_rows()
                    .unwrap();
            }));
        }

        // A panic inside a task surfaces here as an `Err` join result.
        for outcome in futures::future::join_all(tasks).await {
            assert!(outcome.is_ok());
        }
    }
}

Error:

assertion failed: result.is_ok()
thread 'test::goo' panicked at 'called `Result::unwrap()` on an `Err` value: Server(CDRSError { error_code: 8704, message: CString { string: "No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename" }, additional_info: Invalid(SimpleError) })', src\main.rs:27:71
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\std\src\panicking.rs:493
   1: core::panicking::panic_fmt
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\panicking.rs:92
   2: core::option::expect_none_failed
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\option.rs:1266
   3: core::result::Result<cdrs_tokio::frame::Frame, cdrs_tokio::error::Error>::unwrap<cdrs_tokio::frame::Frame,cdrs_tokio::error::Error>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\result.rs:969
   4: fortesting::test::goo::{{closure}}::{{closure}}::{{closure}}
             at .\src\main.rs:27
   5: core::future::from_generator::{{impl}}::poll<generator-0>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\future\mod.rs:80
   6: tokio::runtime::task::core::{{impl}}::poll::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:235
   7: tokio::loom::std::unsafe_cell::UnsafeCell<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>>::with_mut<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>,core::task::poll::Poll<tup
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\loom\std\unsafe_cell.rs:14
   8: tokio::runtime::task::core::CoreStage<core::future::from_generator::GenFuture<generator-0>>::poll<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:225
   9: tokio::runtime::task::harness::poll_future::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:422
  10: std::panic::{{impl}}::call_once<core::task::poll::Poll<tuple<>>,closure-0>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:322
  11: std::panicking::try::do_call<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:379
  12: std::panicking::try::do_catch<std::panic::AssertUnwindSafe<closure-0>,tuple<>>
  13: std::panicking::try<core::task::poll::Poll<tuple<>>,std::panic::AssertUnwindSafe<closure-0>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:343
  14: std::panic::catch_unwind<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:396
  15: tokio::runtime::task::harness::poll_future<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:409
  16: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll_inner<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:89
  17: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_sched
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:59
  18: tokio::runtime::task::raw::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:104
  19: tokio::runtime::task::raw::RawTask::poll
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:66
  20: tokio::runtime::task::Notified<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::run<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\mod.rs:171
  21: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
  22: tokio::coop::with_budget::{{closure}}<tuple<>,closure-3>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:121
  23: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::try_with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:272
  24: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:248
  25: tokio::coop::with_budget
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:114
  26: tokio::coop::budget
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:98
  27: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
  28: tokio::runtime::basic_scheduler::enter::{{closure}}<closure-0,tuple<>,tokio::runtime::driver::Driver>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
  29: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,closure-0,tuple<>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\macros\scoped_tls.rs:61
  30: tokio::runtime::basic_scheduler::enter<closure-0,tuple<>,tokio::runtime::driver::Driver>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
  31: tokio::runtime::basic_scheduler::Inner<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:176
  32: tokio::runtime::basic_scheduler::InnerGuard<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:405
  33: tokio::runtime::basic_scheduler::BasicScheduler<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:136
  34: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\mod.rs:450
  35: fortesting::test::goo
             at .\src\main.rs:8
  36: fortesting::test::goo::{{closure}}
             at .\src\main.rs:8
  37: core::ops::function::FnOnce::call_once<closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\ops\function.rs:227
  38: core::ops::function::FnOnce::call_once
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\library\core\src\ops\function.rs:227

Will take a look, but for now you can work around it by prepending the keyspace to table names.

@krojew This didn't fix the problem. The test below fails:

#[cfg(feature = "e2e-tests")]
use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig};
#[cfg(feature = "e2e-tests")]
use std::sync::Arc;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::load_balancing::RoundRobin;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::authenticators::NoneAuthenticator;
use cdrs_tokio::query::QueryExecutor;

/// End-to-end reproduction: creates the `test_ks` keyspace and a `user` table,
/// switches the session to that keyspace with `USE`, then runs 100 concurrent
/// `select * from user` queries from spawned tokio tasks sharing the session
/// through an `Arc`.
///
/// Connects to a node at `127.0.0.1:9042`; only compiled with the `e2e-tests`
/// feature.
///
/// # Panics
///
/// Panics if any setup statement fails, if a spawned task cannot be joined, or
/// if any of the concurrent queries returns an error.
#[tokio::test]
#[cfg(feature = "e2e-tests")]
async fn multithread() {
    let node = NodeTcpConfigBuilder::new("127.0.0.1:9042", Arc::new(NoneAuthenticator {})).build();
    let cluster_config = ClusterTcpConfig(vec![node]);
    let no_compression = cdrs_tokio::cluster::session::new(&cluster_config, RoundRobin::new())
        .await
        .expect("session should be created");

    no_compression
        .query("CREATE KEYSPACE IF NOT EXISTS test_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
        .await
        .expect("Could not create ks");
    // This statement selects the keyspace rather than creating it, so the
    // failure message names the step that actually failed.
    no_compression
        .query("use test_ks;")
        .await
        .expect("Could not switch to keyspace test_ks");
    no_compression
        .query("create table if not exists user (user_id int primary key) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };")
        .await
        .expect("Could not create table");

    let arc = Arc::new(no_compression);

    // Spawn all tasks before awaiting any of them so the queries overlap.
    let handles: Vec<_> = (0..100)
        .map(|_| {
            let c = Arc::clone(&arc);

            // The table name is deliberately unqualified: the query relies on
            // the keyspace selected by the earlier `USE` statement.
            tokio::spawn(async move { c.query("select * from user").await })
        })
        .collect();

    for task in handles {
        let result = task
            .await
            .expect("spawned query task should complete without panicking");

        match result {
            Ok(_) => println!("Query went OK"),
            Err(e) => panic!("Query error: {:#?}", e),
        }
    }
}

@krojew Can you please re-open this issue? I don't have the option. Do you know why this issue occurs?

Thanks for the fix @krojew