Why not verify that a waiting process is alive before sending a pool member to it?
redink opened this issue · 3 comments
When a process (call it ProcessA) calls checkout and no member is available, poolboy puts it in the waiting queue and monitors ProcessA.
If ProcessA crashes, poolboy receives the 'DOWN' message and removes it from the waiting queue. When another process (call it ProcessB) calls checkin, poolboy checks the waiting queue and sends the member to the first waiting process. So far there is no problem.
But consider this scenario:
```
              message queue                     gen_server process
+------------------------------------------+   +-------------+
|   message1  --->  message2               |-->|   poolboy   |
+------------------------------------------+   +-------------+
```
Here `message1` is ProcessA's 'DOWN' message and `message2` is ProcessB's checkin. While handling `message2`, poolboy calls `queue:out(Waiting)` and gets `{{value, {{ProcessA, _}, CRef, MRef}}, Left}`.
This is the problem: if poolboy does not verify that ProcessA is alive before sending the member to it, the member will never be returned, because ProcessA is already down. The poolboy gen_server simply has not handled the 'DOWN' message (`message1`) yet, even though it is already in the process mailbox.
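The worry rests on a basic Erlang fact: sending to a pid that has already exited does not raise an error; the message is silently dropped. A minimal sketch that shows this in isolation (the module name `dead_send_demo` and the `{worker, ...}` message are made up for illustration, not poolboy code):

```erlang
-module(dead_send_demo).
-export([run/0]).

%% Spawns a short-lived process, waits for its 'DOWN' message, then sends
%% to the now-dead pid. Returns {DownReason, ValueOfSend, StillAlive}.
run() ->
    Pid = spawn(fun() -> receive stop -> ok end end),
    MRef = erlang:monitor(process, Pid),   % monitor before it can exit
    Pid ! stop,                             % let it finish normally
    Reason = receive
                 {'DOWN', MRef, process, Pid, R} -> R
             end,
    %% Sending to a pid that no longer exists does not raise: the message
    %% is dropped, and `!` still evaluates to the message itself.
    Sent = (Pid ! {worker, self()}),
    {Reason, Sent, erlang:is_process_alive(Pid)}.
```

In a shell, `dead_send_demo:run()` returns `{normal, {worker, Self}, false}`, where `Self` is the shell's pid: the send "succeeds" even though nobody will ever read it.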
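Yes, ProcessA will never return the member, because it is already down. But ProcessA's 'DOWN' message is already in the mailbox and will still be received by the pool manager. When poolboy hands the member to ProcessA, it stores ProcessA's monitor reference against the member, so the 'DOWN' handler finds the member and checks it back in. In other words, the member does come back; it just goes unused until then.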
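To see why the member is not lost, here is a hypothetical, heavily simplified model of poolboy's bookkeeping (module `handoff_model`, using maps instead of ETS and atoms in place of real pids and refs; it is not poolboy's actual code). The step that matters mirrors poolboy's `handle_checkin`: handing a worker to a waiter stores the waiter's monitor reference under the worker's pid, so a later 'DOWN' for that reference leads back to the worker.

```erlang
-module(handoff_model).
-export([run/0]).

%% Hypothetical, simplified model of poolboy's state (not its real code):
%%   waiting  - queue of {From, CRef, MRef} for blocked callers
%%   monitors - map WorkerPid => {CRef, MRef} for checked-out workers
%%   workers  - list of idle workers
%% Atoms stand in for pids and refs so the run is deterministic.

%% Like handle_checkin: give the worker to the first waiter, if any,
%% re-keying that waiter's monitor ref to the worker; else mark it idle.
checkin(Worker, #{waiting := W, monitors := M, workers := Ws} = S) ->
    case queue:out(W) of
        {{value, {_From, CRef, MRef}}, Left} ->
            %% The real server would gen_server:reply(From, Worker) here;
            %% if From is already dead, that reply is silently dropped.
            S#{waiting := Left, monitors := M#{Worker => {CRef, MRef}}};
        {empty, _} ->
            S#{workers := [Worker | Ws]}
    end.

%% A client checks a worker in: forget its monitor entry, then checkin/2.
client_checkin(Worker, #{monitors := M} = S) ->
    checkin(Worker, S#{monitors := maps:remove(Worker, M)}).

%% Like the 'DOWN' clause of handle_info/2.
down(MRef, #{waiting := W, monitors := M} = S) ->
    case [P || {P, {_, Ref}} <- maps:to_list(M), Ref =:= MRef] of
        [Worker] ->
            %% The dead process held (or was just handed) Worker: reclaim it.
            checkin(Worker, S#{monitors := maps:remove(Worker, M)});
        [] ->
            %% The dead process was still waiting: drop it from the queue.
            S#{waiting := queue:filter(fun({_, _, WRef}) -> WRef =/= MRef end, W)}
    end.

run() ->
    %% ProcessA is waiting; ProcessB holds worker w1.
    S0 = #{waiting => queue:from_list([{proc_a, cref_a, mref_a}]),
           monitors => #{w1 => {cref_b, mref_b}},
           workers => []},
    %% message2 is handled first: B checks w1 in and it goes to (dead) A.
    S1 = client_checkin(w1, S0),
    #{monitors := M1} = S1,
    %% message1, A's 'DOWN', is handled next and returns w1 to the pool.
    #{workers := Ws, monitors := M2, waiting := W2} = down(mref_a, S1),
    {maps:get(w1, M1), Ws, maps:size(M2), queue:len(W2)}.
```

`handoff_model:run()` returns `{{cref_a,mref_a},[w1],0,0}`: right after the hand-off, `w1` is tracked under ProcessA's monitor reference, and once the pending 'DOWN' is processed, `w1` is idle again and nothing is left waiting.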
I checked the code again; my worry is unnecessary.
If the `gen_server:call` in checkout fails in ProcessA (for example, it times out), checkout casts `cancel_waiting`:
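```erlang
checkout(Pool, Block, Timeout) ->
    CRef = make_ref(),
    try
        gen_server:call(Pool, {checkout, CRef, Block}, Timeout)
    catch
        %% OTP 21+ syntax; erlang:get_stacktrace/0 was deprecated in OTP 21
        %% and later removed.
        Class:Reason:Stacktrace ->
            %% Tell the pool to drop this request (or reclaim its worker).
            gen_server:cast(Pool, {cancel_waiting, CRef}),
            erlang:raise(Class, Reason, Stacktrace)
    end.
```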
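The `handle_cast` callback for `cancel_waiting` checks the worker back in with `handle_checkin` if one was already handed out for this `CRef`; otherwise it just removes the caller from the waiting queue: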
```erlang
handle_cast({cancel_waiting, CRef}, State) ->
    case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
        %% A worker was already assigned to this request: check it back in.
        [[Pid, MRef]] ->
            demonitor(MRef, [flush]),
            true = ets:delete(State#state.monitors, Pid),
            NewState = handle_checkin(Pid, State),
            {noreply, NewState};
        %% Still waiting: remove the request from the waiting queue.
        [] ->
            Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
                             demonitor(MRef, [flush]),
                             false;
                        (_) ->
                             true
                     end,
            Waiting = queue:filter(Cancel, State#state.waiting),
            {noreply, State#state{waiting = Waiting}}
    end;
```
So my problem is solved. Thanks.
ProcessA is monitored by the poolboy server once it is put into the waiting queue, so poolboy receives a 'DOWN' message when ProcessA crashes and handles it with this code:
```erlang
handle_info({'DOWN', MRef, _, _, _}, State) ->
    case ets:match(State#state.monitors, {'$1', '_', MRef}) of
        %% The dead process held a worker: check the worker back in.
        [[Pid]] ->
            true = ets:delete(State#state.monitors, Pid),
            NewState = handle_checkin(Pid, State),
            {noreply, NewState};
        %% The dead process was still waiting: drop it from the queue.
        [] ->
            Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
            {noreply, State#state{waiting = Waiting}}
    end;
```
So even if checkout had no `try`/`catch` (and therefore never cast `cancel_waiting`), the worker would still be checked in by this 'DOWN' handler.