cetra3/tmq

tmq::request_reply::RequestSender does not implement Copy?

spiralw opened this issue · 3 comments

I'm a Rust beginner so I may have missed something basic, but:

I'm using a different crate that gives me events in this format:

while let Some((_, event)) = events.next().await {
    // do something with event
}

I would like to send the event data to the message queue, however if I try to use the send_sock inside the loop I get this error:

use of moved value: `send_sock`

value moved here, in previous iteration of looprustc(E0382)
main.rs(12, 9): move occurs because `send_sock` has type `tmq::request_reply::RequestSender`, which does not implement the `Copy` trait

I have tried wrapping it in an Arc but I get a similar error.

What am I missing here?

This struct definitely cannot implement Copy. You have to use the sender in a different way.

Please send the code that produces the error (or ideally the whole project), otherwise we won't know where the problem is.

Sure. Here is the full code:

Expand
use std::{env, error::Error, sync::Arc};
use futures::stream::StreamExt;
use twilight_cache_inmemory::{InMemoryCache, ResourceType};
use twilight_gateway::{cluster::{Cluster, ShardScheme}, Event, EventTypeFlags};
use twilight_http::Client as HttpClient;
use twilight_model::gateway::Intents;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let token = env::var("DISCORD_TOKEN")?;

    let mut send_sock = tmq::request(&tmq::Context::new()).connect("tcp://127.0.0.1:7897")?;

    // This is the default scheme. It will automatically create as many
    // shards as is suggested by Discord.
    let scheme = ShardScheme::Auto;

    // Use intents to only receive guild message events.
    let (cluster, mut events) = Cluster::builder(token.to_owned(), Intents::GUILD_MESSAGES)
        .event_types(
              EventTypeFlags::SHARD_PAYLOAD
            | EventTypeFlags::MESSAGE_CREATE
        )
        .shard_scheme(scheme)
        .build()
        .await?;
    let cluster = Arc::new(cluster);

    // Start up the cluster.
    let cluster_spawn = Arc::clone(&cluster);

    // Start all shards in the cluster in the background.
    tokio::spawn(async move {
        cluster_spawn.up().await;
    });

    // HTTP is separate from the gateway, so create a new client.
    let http = Arc::new(HttpClient::new(token));

    // Since we only care about new messages, make the cache only
    // cache new messages.
    let cache = InMemoryCache::builder()
        .resource_types(ResourceType::MESSAGE)
        .build();

    // Process each event as they come in.

    let mq = Arc::new(send_sock);
    while let Some((shard_id, event)) = events.next().await {
        // Update the cache with the event.
        cache.update(&event);

        match event {
            Event::MessageCreate(msg) if msg.content == "rs;test" => {
                http.create_message(msg.channel_id)
                    .content("meow")?
                    .exec()
                    .await?;
            }
            Event::ShardPayload(mut payload) => {
                match serde_json::from_slice::<PayloadInfo>(payload.bytes.as_mut_slice()) {
                    Ok(data) => {
                        match data.t {
                            Some(t) => {
                                let message: tmq::Message = "hi".as_bytes().into();
                                Arc::clone(&mq).send(message.into()).await?;

                                // println!("t {} c {}", t, data.d["channel_id"]);
                                match t.as_str() {
                                    "MESSAGE_CREATE" => {
                                        if data.d["channel_id"] == "471388251102380042" {
                                            println!("content {} author {}", data.d["content"], data.d["author"]["id"]);
                                        }
                                    }
                                    _ => {
                                        println!("unknown op {}", t);
                                    }
                                }
                            }
                            None => {}
                        }
                    }
                    Err(err) => {
                        println!("Could not deserialize payload: {}", err);
                    }
                }
            }
            Event::ShardConnected(_) => {
                println!("Connected on shard {}", shard_id);
            }
            // Other events here...
            _ => {}
        }
    
        // Ok(())

        // tokio::spawn(handle_event(shard_id, event, Arc::clone(&http), Arc::clone(&mq)));
    }

    Ok(())
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct PayloadInfo {
    pub op: i32,
    pub t: Option<String>,
    pub d: serde_json::Value,
}

Oops, I completely missed the part of the example where it assigns back to send_sock. Doing that fixed the issue.