DRY handle_request in peer::dispatcher
Opened this issue · 0 comments
Thomasdezeeuw commented
Location:
stored/src/peer/participant.rs
Lines 335 to 606 in d3604d0
fn handle_request( | |
ctx: &mut actor::Context<Response, ThreadSafe>, | |
remote: &SocketAddr, | |
db_ref: &ActorRef<db::Message>, | |
peers: &Peers, | |
running: &mut HashMap<ConsensusId, ActorRef<VoteResult>, FxBuildHasher>, | |
request: Request, | |
) { | |
debug!("received a request: {:?}", request); | |
if request.consensus_id == PARTICIPANT_CONSENSUS_ID { | |
let msg = consensus::Message::Peer { | |
key: request.key, | |
op: request.op, | |
}; | |
peers.send_participant_consensus(msg); | |
return; | |
} | |
// TODO: DRY this. | |
match request.op { | |
Operation::AddBlob => { | |
let consensus_id = request.consensus_id; | |
if let Some(actor_ref) = running.remove(&consensus_id) { | |
warn!( | |
"received conflicting consensus ids, stopping both: consensus_id={}", | |
consensus_id | |
); | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, // NOTE: this is the wrong key. | |
result: ConsensusVote::Abort, | |
}; | |
// If we fail to send the actor already stopped, so that's | |
// fine. | |
let _ = actor_ref.send(msg); | |
return; | |
} | |
let responder = RpcResponder { | |
id: request.id, | |
actor_ref: ctx.actor_ref(), | |
phase: ConsensusPhase::init(), | |
}; | |
debug!( | |
"participant dispatcher starting store blob consensus actor: request={:?}, response={:?}", | |
request, responder | |
); | |
let consensus = consensus::store_blob_actor as fn(_, _, _, _, _, _) -> _; | |
let actor_ref = ctx.spawn( | |
|err| { | |
warn!("store blob consensus actor failed: {}", err); | |
SupervisorStrategy::Stop | |
}, | |
consensus, | |
( | |
db_ref.clone(), | |
peers.clone(), | |
*remote, | |
request.key, | |
responder, | |
), | |
ActorOptions::default().mark_ready(), | |
); | |
// Checked above that we don't have duplicates. | |
let _ = running.insert(consensus_id, actor_ref); | |
} | |
Operation::CommitStoreBlob(timestamp) => { | |
if let Some(actor_ref) = running.get(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Commit(timestamp), | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
// In case we fail we send ourself a message to relay to | |
// the coordinator that the actor failed. | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} else { | |
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} | |
Operation::AbortStoreBlob => { | |
if let Some(actor_ref) = running.remove(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Abort, | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
// In case we fail we send ourself a message to relay to | |
// the coordinator that the actor failed. | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} else { | |
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} | |
Operation::StoreCommitted(timestamp) => { | |
if let Some(actor_ref) = running.remove(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Commit(timestamp), | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
} | |
} else { | |
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
} | |
} | |
Operation::RemoveBlob => { | |
let consensus_id = request.consensus_id; | |
if let Some(actor_ref) = running.get(&consensus_id) { | |
warn!( | |
"received conflicting consensus ids, stopping both: consensus_id={}", | |
consensus_id | |
); | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, // NOTE: this is the wrong key. | |
result: ConsensusVote::Abort, | |
}; | |
// If we fail to send the actor already stopped, so that's | |
// fine. | |
let _ = actor_ref.send(msg); | |
return; | |
} | |
let responder = RpcResponder { | |
id: request.id, | |
actor_ref: ctx.actor_ref(), | |
phase: ConsensusPhase::init(), | |
}; | |
debug!( | |
"participant dispatcher starting remove blob consensus actor: request={:?}, response={:?}", | |
request, responder | |
); | |
let consensus = consensus::remove_blob_actor as fn(_, _, _, _, _, _) -> _; | |
let actor_ref = ctx.spawn( | |
|err| { | |
warn!("remove blob consensus actor failed: {}", err); | |
SupervisorStrategy::Stop | |
}, | |
consensus, | |
( | |
db_ref.clone(), | |
peers.clone(), | |
*remote, | |
request.key, | |
responder, | |
), | |
ActorOptions::default().mark_ready(), | |
); | |
// Checked above that we don't have duplicates. | |
let _ = running.insert(consensus_id, actor_ref); | |
} | |
Operation::CommitRemoveBlob(timestamp) => { | |
if let Some(actor_ref) = running.get(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Commit(timestamp), | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
// In case we fail we send ourself a message to relay to | |
// the coordinator that the actor failed. | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} else { | |
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} | |
Operation::AbortRemoveBlob => { | |
if let Some(actor_ref) = running.remove(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Abort, | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
// In case we fail we send ourself a message to relay to | |
// the coordinator that the actor failed. | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} else { | |
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
let response = Response { | |
request_id: request.id, | |
vote: ConsensusVote::Fail, | |
}; | |
// We can always send ourselves a message. | |
ctx.actor_ref().send(response).unwrap(); | |
} | |
} | |
Operation::RemoveCommitted(timestamp) => { | |
if let Some(actor_ref) = running.remove(&request.consensus_id) { | |
// Relay the message to the correct actor. | |
let msg = VoteResult { | |
request_id: request.id, | |
key: request.key, | |
result: ConsensusVote::Commit(timestamp), | |
}; | |
if let Err(..) = actor_ref.send(msg) { | |
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
} | |
} else { | |
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}", | |
request.id, request.consensus_id); | |
} | |
} | |
} | |
} |
Its basically a large match
statement turned in to a 300 line monster function.