Thomasdezeeuw/stored

DRY handle_request in peer::dispatcher

Opened this issue · 0 comments

Location:

fn handle_request(
ctx: &mut actor::Context<Response, ThreadSafe>,
remote: &SocketAddr,
db_ref: &ActorRef<db::Message>,
peers: &Peers,
running: &mut HashMap<ConsensusId, ActorRef<VoteResult>, FxBuildHasher>,
request: Request,
) {
debug!("received a request: {:?}", request);
if request.consensus_id == PARTICIPANT_CONSENSUS_ID {
let msg = consensus::Message::Peer {
key: request.key,
op: request.op,
};
peers.send_participant_consensus(msg);
return;
}
// TODO: DRY this.
match request.op {
Operation::AddBlob => {
let consensus_id = request.consensus_id;
if let Some(actor_ref) = running.remove(&consensus_id) {
warn!(
"received conflicting consensus ids, stopping both: consensus_id={}",
consensus_id
);
let msg = VoteResult {
request_id: request.id,
key: request.key, // NOTE: this is the wrong key.
result: ConsensusVote::Abort,
};
// If we fail to send the actor already stopped, so that's
// fine.
let _ = actor_ref.send(msg);
return;
}
let responder = RpcResponder {
id: request.id,
actor_ref: ctx.actor_ref(),
phase: ConsensusPhase::init(),
};
debug!(
"participant dispatcher starting store blob consensus actor: request={:?}, response={:?}",
request, responder
);
let consensus = consensus::store_blob_actor as fn(_, _, _, _, _, _) -> _;
let actor_ref = ctx.spawn(
|err| {
warn!("store blob consensus actor failed: {}", err);
SupervisorStrategy::Stop
},
consensus,
(
db_ref.clone(),
peers.clone(),
*remote,
request.key,
responder,
),
ActorOptions::default().mark_ready(),
);
// Checked above that we don't have duplicates.
let _ = running.insert(consensus_id, actor_ref);
}
Operation::CommitStoreBlob(timestamp) => {
if let Some(actor_ref) = running.get(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::AbortStoreBlob => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Abort,
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::StoreCommitted(timestamp) => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
} else {
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
}
Operation::RemoveBlob => {
let consensus_id = request.consensus_id;
if let Some(actor_ref) = running.get(&consensus_id) {
warn!(
"received conflicting consensus ids, stopping both: consensus_id={}",
consensus_id
);
let msg = VoteResult {
request_id: request.id,
key: request.key, // NOTE: this is the wrong key.
result: ConsensusVote::Abort,
};
// If we fail to send the actor already stopped, so that's
// fine.
let _ = actor_ref.send(msg);
return;
}
let responder = RpcResponder {
id: request.id,
actor_ref: ctx.actor_ref(),
phase: ConsensusPhase::init(),
};
debug!(
"participant dispatcher starting remove blob consensus actor: request={:?}, response={:?}",
request, responder
);
let consensus = consensus::remove_blob_actor as fn(_, _, _, _, _, _) -> _;
let actor_ref = ctx.spawn(
|err| {
warn!("remove blob consensus actor failed: {}", err);
SupervisorStrategy::Stop
},
consensus,
(
db_ref.clone(),
peers.clone(),
*remote,
request.key,
responder,
),
ActorOptions::default().mark_ready(),
);
// Checked above that we don't have duplicates.
let _ = running.insert(consensus_id, actor_ref);
}
Operation::CommitRemoveBlob(timestamp) => {
if let Some(actor_ref) = running.get(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::AbortRemoveBlob => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Abort,
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::RemoveCommitted(timestamp) => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
} else {
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
}
}
}

Its basically a large match statement turned in to a 300 line monster function.