scylladb/scylladb

The gossiped view update backlog is updated on the wrong shard

wmitros opened this issue · 1 comment

We have 3 sources of the view update backlog:

  1. The view update semaphore on each shard:

    scylladb/replica/database.hh, lines 1847 to 1849 in 2d91422:

    db::view::update_backlog get_view_update_backlog() const {
        return {max_memory_pending_view_updates() - _view_update_concurrency_sem.current(), max_memory_pending_view_updates()};
    }
  2. The cached view update backlogs of all shards, kept so they can be read atomically from any shard; used for gossiping and for propagating the backlog in responses:

    scylladb/db/view/view.cc, lines 2612 to 2627 in 2d91422:

    update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog backlog) {
        _backlogs[shard].backlog.store(backlog, std::memory_order_relaxed);
        auto now = clock::now();
        if (now >= _last_update.load(std::memory_order_relaxed) + _interval) {
            _last_update.store(now, std::memory_order_relaxed);
            auto new_max = boost::accumulate(
                    _backlogs,
                    update_backlog::no_backlog(),
                    [] (const update_backlog& lhs, const per_shard_backlog& rhs) {
                        return std::max(lhs, rhs.load());
                    });
            _max.store(new_max, std::memory_order_relaxed);
            return new_max;
        }
        return std::max(backlog, _max.load(std::memory_order_relaxed));
    }
  3. The max view update backlogs stored for all replicas, which hold the maximum backlog across all shards of a node and are what the MV flow control actually consults when deciding how much requests should be delayed (a minimal model of how the three sources relate is sketched after this list):
    std::unordered_map<gms::inet_address, view_update_backlog_timestamped> _view_update_backlogs;
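
To keep the three sources straight, here is a minimal, standalone model of how they relate. This is plain standard C++ rather than Seastar, the backlog is collapsed to a single counter instead of the (current, max) pair, and the rate limiting done with _interval in add_fetch is omitted; names such as shard_backlog, node_backlog_cache and replica_backlogs are hypothetical stand-ins, not ScyllaDB code.

#include <algorithm>
#include <array>
#include <atomic>
#include <cstdint>
#include <map>
#include <string>

// Role 1): each shard derives its backlog from how much of the view update
// memory budget is currently taken from the concurrency semaphore.
struct shard_backlog {
    uint64_t max_memory = 1 << 20;          // plays the role of max_memory_pending_view_updates()
    uint64_t semaphore_available = 1 << 20; // plays the role of _view_update_concurrency_sem.current()
    uint64_t current() const { return max_memory - semaphore_available; }
};

// Role 2): a per-node cache of every shard's backlog, readable from any shard,
// plus a cached node-wide maximum (loosely mirrors node_update_backlog::add_fetch()).
template <size_t NrShards>
struct node_backlog_cache {
    std::array<std::atomic<uint64_t>, NrShards> per_shard{};
    std::atomic<uint64_t> cached_max{0};

    uint64_t add_fetch(unsigned shard, uint64_t backlog) {
        per_shard[shard].store(backlog, std::memory_order_relaxed);
        uint64_t new_max = 0;
        for (const auto& b : per_shard) {
            new_max = std::max(new_max, b.load(std::memory_order_relaxed));
        }
        cached_max.store(new_max, std::memory_order_relaxed);
        return new_max;
    }
};

// Role 3): on the coordinator, one maximum backlog per replica node; this is
// what the MV flow control consults when deciding how long to delay requests.
using replica_backlogs = std::map<std::string, uint64_t>;

int main() {
    node_backlog_cache<2> cache;       // one replica node with two shards
    replica_backlogs coordinator_view; // kept by the coordinator, per replica

    shard_backlog shard0{.max_memory = 100, .semaphore_available = 90}; // backlog 10
    shard_backlog shard1{.max_memory = 100, .semaphore_available = 30}; // backlog 70

    // Each shard publishes its own value 1) into the cache 2); the node-wide
    // maximum is what gets gossiped or attached to a mutation response...
    cache.add_fetch(0, shard0.current());
    uint64_t node_max = cache.add_fetch(1, shard1.current());

    // ...and the coordinator stores that maximum per replica as 3).
    coordinator_view["replica-A"] = node_max; // 70, dominated by shard 1
}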

The values from backlog 1) eventually end up stored as 3) in the following ways:

  1. In gossip: every 1s, each shard checks its current backlog 1) and saves it as 2); if the maximum of backlogs 2) across all shards differs from the previously gossiped value, the new value is gossiped and eventually stored on all replicas as backlog 3)
  2. In mutation responses: when the coordinator sends a mutation to another node and the mutation apply finishes, only the coordinator shard on the replica updates its backlog 2) and sends the maximum of backlogs 2) along with the response; the coordinator (the node handling the request) then updates the corresponding backlog of type 3)
  3. When the coordinator sends a mutation to the node it is itself running on (a local mutation), some shard performs the update, and then the coordinator shard stores its own backlog 1) into 2), takes the maximum of backlogs 2), and updates backlog 3) for the local node with the resulting value (see the simulation after this list)
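
To make the effect of pathways 2) and 3) concrete before looking at the code, here is a small hypothetical simulation (plain standard C++; handling_shard, applying_shard, backlog_1, cache_2 and add_fetch are simplified stand-ins for the structures above, not ScyllaDB code): the shard that applied the mutation has accumulated a large backlog, but the shard that answers the request publishes and reports its own value, so the number attached to the response misses the new work until the next 1s gossip refresh.

#include <algorithm>
#include <array>
#include <atomic>
#include <cstdint>
#include <iostream>

// Per-shard backlog 1) and the shared per-node cache 2), collapsed to a single
// counter per shard for brevity.
std::array<std::atomic<uint64_t>, 2> backlog_1{};
std::array<std::atomic<uint64_t>, 2> cache_2{};

// Publish a shard's backlog 1) into the cache 2) and return the node-wide
// maximum (the value that gets gossiped or attached to a response).
uint64_t add_fetch(unsigned shard) {
    cache_2[shard].store(backlog_1[shard].load(), std::memory_order_relaxed);
    return std::max(cache_2[0].load(std::memory_order_relaxed),
                    cache_2[1].load(std::memory_order_relaxed));
}

int main() {
    const unsigned handling_shard = 0; // shard that received the write and runs got_response()/send_mutation_done()
    const unsigned applying_shard = 1; // shard owning the token, where the apply queued the view updates

    // The apply on shard 1 grows its backlog 1)...
    backlog_1[applying_shard] = 70;

    // ...but pathways 2) and 3) publish the handling shard's backlog instead,
    // so the reported maximum does not reflect shard 1 until its next 1s
    // gossip tick stores the fresh value into the cache.
    uint64_t reported = add_fetch(handling_shard);
    std::cout << "reported=" << reported
              << " backlog on applying shard=" << backlog_1[applying_shard].load() << "\n";
}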

Both pathways 2) and 3) are incorrect: the updated backlog should come from the shard that performed the mutation, not from the coordinator's shard (a sketch of reading the backlog from the applying shard follows the two excerpts below):

auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
    return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state())
            .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
        // make mutation alive until it is processed locally, otherwise it
        // may disappear if write timeouts before this future is ready
        got_response(response_id, my_address, get_view_update_backlog());
    });
};

future<rpc::no_wait_type> handle_write(
        netw::messaging_service::msg_addr src_addr, rpc::opt_time_point t,
        auto schema_version, auto in, const inet_address_vector_replica_set& forward, gms::inet_address reply_to,
        unsigned shard, storage_proxy::response_id_type response_id, const std::optional<tracing::trace_info>& trace_info,
        fencing_token fence, auto&& apply_fn1, auto&& forward_fn1) {
    auto apply_fn = std::move(apply_fn1);
    auto forward_fn = std::move(forward_fn1);
    tracing::trace_state_ptr trace_state_ptr;
    if (trace_info) {
        const tracing::trace_info& tr_info = *trace_info;
        trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(tr_info);
        tracing::begin(trace_state_ptr);
        tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr);
    }
    auto trace_done = defer([&] {
        tracing::trace(trace_state_ptr, "Mutation handling is done");
    });
    storage_proxy::clock_type::time_point timeout;
    if (!t) {
        auto timeout_in_ms = _sp._db.local().get_config().write_request_timeout_in_ms();
        timeout = clock_type::now() + std::chrono::milliseconds(timeout_in_ms);
    } else {
        timeout = *t;
    }
    struct errors_info {
        size_t count = 0;
        replica::exception_variant local;
    };
    const auto& m = in;
    shared_ptr<storage_proxy> p = _sp.shared_from_this();
    errors_info errors;
    ++p->get_stats().received_mutations;
    p->get_stats().forwarded_mutations += forward.size();
    if (auto stale = _sp.apply_fence(fence, src_addr.addr)) {
        errors.count += (forward.size() + 1);
        errors.local = std::move(*stale);
    } else {
        co_await coroutine::all(
            [&] () -> future<> {
                try {
                    auto op = _sp.start_write();
                    // FIXME: get_schema_for_write() doesn't timeout
                    schema_ptr s = co_await get_schema_for_write(schema_version, netw::messaging_service::msg_addr{reply_to, shard}, timeout);
                    // Note: blocks due to execution_stage in replica::database::apply()
                    co_await apply_fn(p, trace_state_ptr, std::move(s), m, timeout, fence);
                    // We wait for send_mutation_done to complete, otherwise, if reply_to is busy, we will accumulate
                    // lots of unsent responses, which can OOM our shard.
                    //
                    // Usually we will return immediately, since this work only involves appending data to the connection
                    // send buffer.
                    auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr,
                            shard, response_id, p->get_view_update_backlog()));
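
For reference, this is roughly the direction a fix could take: obtain the backlog from the shard that actually applied the mutation and report that value (or fold it into the cache) instead of the handling shard's own reading. The sketch below is only an illustration built on Seastar primitives, not a patch against storage_proxy: view_update_backlog and backlog_of() are hypothetical, and in the real code path the applying shard is chosen by token ownership and the apply goes through an execution stage, so the actual plumbing (for example returning the backlog from apply_fn) would differ.

// A minimal Seastar sketch of a cross-shard backlog fetch (assumptions noted above).
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>
#include <iostream>

// Hypothetical per-shard value standing in for replica::database::get_view_update_backlog().
static thread_local unsigned view_update_backlog = 0;

// Read the backlog on the shard that performed the mutation, rather than the
// handling shard's own (possibly unrelated) value.
seastar::future<unsigned> backlog_of(unsigned applying_shard) {
    return seastar::smp::submit_to(applying_shard, [] {
        return view_update_backlog;
    });
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] () -> seastar::future<> {
        const unsigned applying_shard = 1 % seastar::smp::count;
        // Pretend the apply on that shard queued view updates.
        co_await seastar::smp::submit_to(applying_shard, [] {
            view_update_backlog = 70;
        });
        // The handling shard would attach this value to the response / pass it
        // to got_response() instead of its own shard's value.
        unsigned b = co_await backlog_of(applying_shard);
        std::cout << "backlog to report: " << b << "\n";
    });
}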

I updated the description - this issue also concerns remote writes in the same capacity