How to get the sender of a message?
In the scenario I'm working on, whenever a Worker joins the cluster, it sends a message with some information to the Manager. The Manager needs to store the address of the Worker that sent the message so it can send messages back to it later. However, it wasn't clear from the documentation how to do this.
Hey @lucasmsoares96!
Currently, there's no way to access the sender of the message, but you could attach the actor ref as part of the message itself.
Example code:

```rust
pub struct Worker {
    // Ref to the Manager, so the Worker can message it directly.
    manager: LocalActorRef<Manager>,
}

// Message sent to the Manager; the sender attaches its own ref.
pub struct Hello {
    worker_ref: LocalActorRef<Worker>,
}

#[async_trait]
impl Actor for Worker {
    async fn on_actor_started(&mut self, ctx: &mut ActorContext) {
        // Attach this actor's ref to the message so the Manager can reach it later.
        let worker_ref = self.actor_ref();
        let _ = self.manager.notify(Hello { worker_ref });
    }
}
```
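On the Manager side, a handler along these lines could keep hold of the ref. This is just a sketch: the `workers` field is an illustrative assumption rather than part of the example above, and `Hello` would also need a `Message` impl with a `()` result.

```rust
pub struct Manager {
    // Illustrative storage for worker refs received via Hello messages;
    // not part of the original example.
    workers: Vec<LocalActorRef<Worker>>,
}

#[async_trait]
impl Handler<Hello> for Manager {
    async fn handle(&mut self, message: Hello, _ctx: &mut ActorContext) {
        // Keep the sender's ref so the Manager can message the Worker later.
        self.workers.push(message.worker_ref);
    }
}
```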
Thanks,
Leon
Thank you very much, @LeonHartley. In this case, both actors are in the same process. Will this also work for actors in different processes? Wouldn't a RemoteActorRef be necessary?
You can do something similar with RemoteActorRef too. In that case, you can pass the actor's ID as a string, along with the ID of the node it was created on, and then convert those into a RemoteActorRef inside the Manager itself.
Something very similar is done inside the actor sharding module:
- Coerce-rs/coerce/src/sharding/host/mod.rs, line 105 (commit 74b13e8)
- Coerce-rs/coerce/src/sharding/coordinator/mod.rs, lines 113 to 125 (commit 74b13e8)
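Roughly, the idea looks like this. It's only a minimal sketch: the `Register` message and the `workers` field on the Manager are illustrative names, not something taken from the sharding module itself.

```rust
// Illustrative registration message: the worker sends its own id plus the id
// of the node it was created on, which is enough to rebuild a remote ref.
#[derive(Debug, Clone, JsonMessage, Serialize, Deserialize)]
#[result("()")]
pub struct Register {
    pub id: ActorId,
    pub node_id: NodeId,
}

#[async_trait]
impl Handler<Register> for Manager {
    async fn handle(&mut self, message: Register, ctx: &mut ActorContext) {
        let remote = ctx.system().remote();

        // Rebuild a reference to the worker from the id + node id it sent.
        let worker: RemoteActorRef<Worker> =
            RemoteActorRef::new(message.id, message.node_id, remote.clone());

        // Store it somewhere on the Manager (e.g. a Vec or HashMap) so it can
        // be messaged later.
        self.workers.push(worker);
    }
}
```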
Amazing. Thank you very much.
No problem at all, let me know if you have any more questions! :)
I think I'm missing something. Here's my example:
Worker main

```rust
// Look up the Manager actor on the remote system by its id.
let remote_actor = remote
    .actor_ref::<Manager>("echo-actor".to_actor_id())
    .await
    .expect("unable to get echo actor");

// Start the local Worker actor.
let local_actor = Worker
    .into_actor(Some("worker".to_string()), remote.actor_system())
    .await
    .expect("Unable to start Worker Actor");

// Register the Worker with the Manager, sending its id and node id.
let _result = remote_actor
    .send(Register {
        id: local_actor.id.to_actor_id(),
        node_id: remote.node_id(),
    })
    .await;
```
Worker Actor

```rust
#[async_trait]
impl Handler<Register> for Worker {
    async fn handle(&mut self, message: Register, _ctx: &mut ActorContext) {
        // The Worker should receive the Manager's Register reply here.
        println!("{:?}", message);
    }
}
```
Manager

```rust
#[derive(Debug, Clone, JsonMessage, Serialize, Deserialize)]
#[result("()")]
pub struct Register {
    pub id: ActorId,
    pub node_id: NodeId,
}

#[async_trait]
impl Handler<Register> for Manager {
    async fn handle(&mut self, message: Register, ctx: &mut ActorContext) {
        println!("{:?}", message);

        // Resolve a ref to the Worker that sent this message, then reply with
        // the Manager's own id and node id.
        let actor = self.get_actor(ctx, message.clone()).await;
        let _ = actor
            .send(Register {
                id: ctx.id().into_actor_id(),
                node_id: ctx.system().remote().node_id(),
            })
            .await;
    }
}

impl Manager {
    pub async fn get_actor(&self, ctx: &ActorContext, message: Register) -> ActorRef<Worker> {
        let actor_id = format!("{}", &message.id).into_actor_id();
        let remote = ctx.system().remote();
        let leader = remote.current_leader();
        if leader == Some(remote.node_id()) {
            // Same node: look the Worker up locally.
            ctx.system()
                .get_tracked_actor::<Worker>(actor_id)
                .await
                .expect("get local coordinator")
                .into()
        } else {
            // Different node: build a remote ref from the id + node id sent
            // in the Register message.
            RemoteActorRef::<Worker>::new(actor_id, message.node_id, remote.clone()).into()
        }
    }
}
```
In the code, `leader` was `None`, so `leader.unwrap()` failed; that's why I changed it to use `message.node_id` instead. With this setup, the Manager receives the first Register message, but the Worker never receives the second one.
That depends on how you are designing your cluster. Do you only start the Manager actor on the leader node? Or are you doing something different?
What exactly is the leader node? I'm following coerce-cluster-example and I didn't see anything related to it in the examples. My complete code is at the link. The Manager runs on one node and the Worker on another, I think.