film42/sidekiq-rs

Sidekiq-rs in a Rails app

Closed this issue · 3 comments

Hello,

Great project. Thanks.

I'm currently looking for alternatives to Sidekiq (which is functionally great) that offer better performance.
Sidekiq-rs seems like a great fit for that.
However, I have a few points that aren't clear to me yet.

  1. Can I do something like this?
Rails app --> Active Job (ruby) Enqueuing --> Sidekiq-rs --> Active Job (ruby) Worker
  2. In the synergie example
    From the PR
    A job is created in Ruby to be processed in Rust, but you're still launching Sidekiq in the Procfile. Is that necessary?

  3. Rust main
    What would be the minimum main.rs to launch the server and the middleware, please?

Sorry for all the questions, I'm just discovering Rust.
Thanks for your help,

Hey @jbpfran ! Sorry for the slow reply.

You can have your rails app enqueue a job to be processed by this crate. You can also enqueue a job with this crate to be processed by a ruby worker.

So, if you want to only have sidekiq-rs process workers with no sidekiq.rb process running, this is for sure doable.
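For context on why the two sides can interoperate: a Sidekiq job is just a JSON document pushed onto a Redis list, and the consumer looks up the worker by its class name. Below is a rough sketch of what such a payload looks like, using only Ruby's standard library. `build_job_payload` is a hypothetical helper written for illustration (it is not part of Sidekiq or sidekiq-rs), and the timestamp unit is an assumption, since it can differ between Sidekiq versions.

```ruby
require "json"
require "securerandom"

# Build a hash shaped like the payload Sidekiq stores in Redis for one job.
# build_job_payload is a hypothetical helper for illustration only.
def build_job_payload(worker_class, args, queue: "default")
  now = Time.now.to_f # assumption: epoch seconds; the unit varies by Sidekiq version
  {
    "class" => worker_class, # the consumer finds the worker by this name
    "args" => args,          # must be plain JSON types
    "queue" => queue,
    "jid" => SecureRandom.hex(12), # 24 hex characters
    "retry" => true,
    "created_at" => now,
    "enqueued_at" => now
  }
end

payload = build_job_payload("Step1Worker", [{ "user_id" => 42 }])

# Each queue lives in a Redis list under "queue:<name>".
puts "queue:#{payload["queue"]}"
puts JSON.generate(payload)
```

Whichever process polls that queue and has a worker registered under the same class name can pick the job up, regardless of the language it is written in. The flip side is that a processor only sees jobs on the queues it polls, so the queue name in the payload has to match one the consuming side listens on.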

If you want to create a sequence that looks like:

  1. Ruby app creates a job for rust app to process
  2. Rust processes the job and enqueues a new job for ruby app to process
  3. Ruby processes the new job

It would probably look something like this (untested):

# Rails App
# Create stub class to make enqueueing easy to the rust worker
class Step1Worker
  include ::Sidekiq::Job 

  def perform(_state); end # handled by rust
end

class Step2Worker
  include ::Sidekiq::Job

  def perform(state)
    some_custom_logic_to_finish_the_process(state["step2_id"])
  end
end

def start
  # I'm starting the flow by enqueueing a job for rust to process
  Step1Worker.perform_async("user_id" => 42)
end

// Rust
use async_trait::async_trait;
use bb8::Pool;
use serde::{Deserialize, Serialize};
use sidekiq::{Processor, RedisConnectionManager, RedisPool, Worker};
use slog::{o, Drain};

#[derive(Deserialize, Debug, Serialize)]
struct StepperState {
    user_id: i32,
    step2_id: Option<i32>,
}

#[derive(Clone)]
struct Step1Worker {
    redis: RedisPool,
}

#[async_trait]
impl Worker<StepperState> for Step1Worker {
    async fn perform(&self, state: StepperState) -> Result<(), Box<dyn std::error::Error>> {
        self.do_something();

        // Queue the next job in the flow. The ruby app will process this.
        let mut redis = self.redis.clone();
        sidekiq::opts()
            .queue("some_flow".to_string())
            .perform_async(
                &mut redis,
                "Step2Worker".into(),
                StepperState {
                    user_id: state.user_id,
                    step2_id: Some(1337),
                },
            )
            .await?;

        Ok(())
    }
}

impl Step1Worker {
    fn do_something(&self) {
        println!("I did something!");
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Logger
    let decorator = slog_term::PlainSyncDecorator::new(std::io::stdout());
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let logger = slog::Logger::root(drain, o!());

    // Redis
    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
    let redis = Pool::builder().build(manager).await?;

    // Sidekiq server
    let mut p = Processor::new(redis.clone(), logger.clone(), vec!["some_flow".to_string()]);

    // Add known workers
    p.register(Step1Worker { redis });

    p.run().await;
    Ok(())
}
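As a small illustration of what the Ruby side would receive in step 3: the arguments arrive as parsed JSON, so the hash has string keys, which is why `Step2Worker` reads `state["step2_id"]`. The JSON string below is an assumption about how the `StepperState` struct would serialize (default serde field names, with the single argument wrapped in an array); it is not captured from a real run.

```ruby
require "json"

# Assumed serialized form of StepperState { user_id: 42, step2_id: Some(1337) },
# wrapped in an array as the job's argument list.
raw_args = '[{"user_id":42,"step2_id":1337}]'

args = JSON.parse(raw_args)
state = args.first

puts state["step2_id"]        # string keys work
puts state[:step2_id].inspect # symbol keys do not; this is nil
```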

Hope this helps!

Thanks a lot for the explanation and examples.
Got a lot to explore now.

Let me know if you have any other questions ❤️ !