Derecho-Project/derecho

Test `persistent_bw_test` non-deterministically throws std::future_error with many messages (>10,000).

Closed this issue · 45 comments

persistent_bw_test will crash with a std::future_error exception non-deterministically. The error is thrown at line 875 in rpc_utils.hpp.

How to reproduce:

  1. build master commit 503d097 in Release or RelWithDebInfo mode.
  2. use two nodes with the configurations in the appendix (please change the IP, ports, and RDMA device accordingly)
  3. run rm -rf .plog;persistent_bw_test all 2 20000 on both nodes multiple times. One of the nodes may crash with the following exception:
root@weijia-test-216:~/workspace/derecho/build-RelWithDebInfo/src/applications/tests/performance_tests# rm -rf .plog;./persistent_bw_test all 2 100000
[15:37:39.672996] [derecho_debug] [Thread 7805] [info] Derecho library running version 2.1.0 + 132 commits
[15:37:39.675244] [derecho_debug] [Thread 7805] [info] 593ffe2b6f2721743c5d70377c8357ad60762af8da15702910c4b7e5e26ed627-0-0-0-2a354779021552940aa9afc3fcb3f87c41c84bacd6dd01f1e0066bf8aa78356e-0:new header initialized.
Finished constructing/joining Group                                               
my rank is: 0, and I'm sending: true           
terminate called after throwing an instance of 'std::future_error'
  what():  std::future_error: Promise already satisfied
Aborted (core dumped)         

Many thanks to @ellerre Lorenzo for originally reporting this issue.

Appendix.
The configuration file I used to reproduce this issue.

[DERECHO]
# leader ip - the leader's ip address
leader_ip = 192.168.9.216
# leader gms port - the leader's gms port
leader_gms_port = 33580
# External client
leader_external_port = 42645
# list of leaders to contact during a restart in priority order
restart_leaders = 192.168.9.216
# list of GMS ports of the restart leaders, in the same order
restart_leader_ports = 33580
# my local id - each node should have a different id
local_id = 0
# my local ip address
local_ip = 192.168.9.216
# derecho gms port
gms_port = 33580
# derecho state transfer port
state_transfer_port = 38366
# sst tcp port
sst_port = 47683
# rdmc tcp port
rdmc_port = 41675
# external client port
external_port = 42645
# this is the frequency of the failure detector thread.
# It is best to leave this to 1 ms for RDMA. If it is too high,
# you run the risk of overflowing the queue of outstanding sends.
heartbeat_ms = 1
# sst poll completion queue timeout in millisecond
sst_poll_cq_timeout_ms = 100
# This is the maximum time a restart leader will wait for other nodes to restart
# before proceeding with the restart if it has a quorum; it's a "grace period"
# that allows more nodes to be included in the restart quorum at the cost of
# taking longer to restart.
restart_timeout_ms = 2000
# This setting controls the experimental "backup restart leaders" feature. If
# false, only the first leader in the restart_leaders list will be contacted
# during a restart (the rest are ignored), and the group will fail to restart
# if this leader crashes. If true (enabled), restarting nodes will try
# contacting the backup leaders in order once they detect that the first restart
# leader has failed. The default is false since failure detection during restart
# is unreliable and may cause a slow restart leader to be treated as failed.
enable_backup_restart_leaders = false
# disable partitioning safety
# By disabling this feature, Derecho is allowed to run when active
# members cannot form a majority. Please be aware of the 'split-brain'
# syndrome: https://en.wikipedia.org/wiki/Split-brain and make sure your
# application is fine with it.
# To help users experiment with Derecho at the beginning, we disabled
# partitioning safety. We suggest setting it to false for serious deployments.
disable_partitioning_safety = false
# maximum payload size for P2P requests
max_p2p_request_payload_size = 10240
# maximum payload size for P2P replies
max_p2p_reply_payload_size = 10240
# window size for P2P requests and replies
p2p_window_size = 300  # I tried to change window size, but the same errors occurred.

# Subgroup configurations
# - The default subgroup settings
[SUBGROUP/DEFAULT]
# maximum payload size
# Any message larger than this has to be broken
# down into multiple messages.
# Large messages consume memory because the buffers
# have to be pre-allocated.
max_payload_size = 10240  # I tried to change message size, but the same errors occurred.
# maximum smc (SST's small message multicast) payload size
# If the message size is smaller or equal to this size,
# it will be sent using SST multicast, otherwise it will
# try RDMC if the message size is smaller than max_payload_size.
max_smc_payload_size = 10240
# block size depends on your max_payload_size.
# It is only relevant if you are ever going to send a message using RDMC.
# In that case, it should be set to the same value as the max_payload_size
# if the max_payload_size is around 1 MB. For very large messages, the
# block size should be a few MBs (1 is fine).
block_size = 1048576
# message window size
# the length of the message pipeline
window_size = 100  # I tried to change window size, but the same errors occurred.
# receiver-side batch size
# the size of the batch for every receiver predicate
batch_size = 8
# the send algorithm for RDMC. Other options are
# chain_send, sequential_send, tree_send
rdmc_send_algorithm = binomial_send
# - SAMPLE for large message settings
[SUBGROUP/LARGE]
max_payload_size = 102400
max_smc_payload_size = 102400
block_size = 1048576
window_size = 3
rdmc_send_algorithm = binomial_send
# - SAMPLE for small message settings
[SUBGROUP/SMALL]
max_payload_size = 100
max_smc_payload_size = 100
block_size = 1048576
window_size = 50
rdmc_send_algorithm = binomial_send

# RDMA section contains configurations of the following
# - which RDMA device to use
# - device configurations
[RDMA]
# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|sockets|udp|usnic|verbs
# possible options(only 'sockets' and 'verbs' providers are tested so far):
# bgq   - The Blue Gene/Q Fabric Provider
# gni   - The GNI Fabric Provider (Cray XC (TM) systems)
# mlx   - The MLX Fabric Provider (UCX library)
# netdir  - The Network Direct Fabric Provider (Microsoft Network Direct SPI)
# psm   - The PSM Fabric Provider
# psm2  - The PSM2 Fabric Provider
# rxd   - The RxD (RDM over DGRAM) Utility Provider
# rxm   - The RxM (RDM over MSG) Utility Provider
# shm   - The SHM Fabric Provider
# sockets - The Sockets Fabric Provider (TCP)
# udp   - The UDP Fabric Provider
# usnic   - The usNIC Fabric Provider (Cisco VIC)
# verbs   - The Verbs Fabric Provider
provider = verbs

# 2. domain
# For sockets provider, domain is the NIC name (ifconfig | grep -v -e "^ ")
# For verbs provider, domain is the device name (ibv_devices)
domain = mlx5_33

# 3. tx_depth
# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object.
# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html
tx_depth = 600

# 4. rx_depth:
# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object.
# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html
rx_depth =  600

# Persistent configurations
[PERS]
# persistent directory for file system-based logfile.
file_path = .plog
ramdisk_path = /dev/shm/volatile_t
# Reset persistent data
# CAUTION: "reset = true" removes existing persisted data!!!
reset = true
# Max number of the log entries in each persistent<T>, default to 1048576
max_log_entry = 1048576
# Max data size in bytes for each persistent<T>, default to 512GB
max_data_size = 549755813888

# Logger configurations
[LOGGER]
# default log name
default_log_name = derecho_debug
# default log level
# Available options:
# trace,debug,info,warn,error,critical,off
default_log_level = info

I think I figured out what's going wrong here. The std::future_error means that set_local_persistence() was called more than once on the same PendingResults object by RPCManager::notify_persistence_finished. This should, in principle, never happen, because notify_persistence_finished removes each PendingResults object from the map results_awaiting_local_persistence as soon as it calls set_local_persistence(), and does this while holding a lock on pending_results_mutex, as you can see here:

void RPCManager::notify_persistence_finished(subgroup_id_t subgroup_id, persistent::version_t version) {
    dbg_default_trace("RPCManager: Got a local persistence callback for version {}", version);
    std::lock_guard<std::mutex> lock(pending_results_mutex);
    //PendingResults in each per-subgroup map are ordered by version number, so all entries before
    //the argument version number have been persisted and need to be notified
    for(auto pending_results_iter = results_awaiting_local_persistence[subgroup_id].begin();
        pending_results_iter != results_awaiting_local_persistence[subgroup_id].upper_bound(version);) {
        dbg_default_trace("RPCManager: Setting local persistence on version {}", pending_results_iter->first);
        pending_results_iter->second.get().set_local_persistence();
        //Move the PendingResults reference to results_awaiting_global_persistence, with the same key
        results_awaiting_global_persistence[subgroup_id].emplace(*pending_results_iter);
        pending_results_iter = results_awaiting_local_persistence[subgroup_id].erase(pending_results_iter);
    }
}

However, I tried inserting a line before the for loop (at line 300) that prints the current size of results_awaiting_local_persistence[subgroup_id], as well as its lowest and highest key (version number), before iterating over it. I found that the std::future_error is always thrown soon after a call to notify_persistence_finished in which the size of results_awaiting_local_persistence was greater than about 4000. Specifically, if the size of the map is greater than 4096, the next call to notify_persistence_finished will crash.
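For reference, the instrumentation looked roughly like this, inserted just before the for loop (a reconstruction for illustration, not the exact line):

//Debug print: how many PendingResults are waiting, and for which version range?
auto& pending_map = results_awaiting_local_persistence[subgroup_id];
if(!pending_map.empty()) {
    dbg_default_trace("RPCManager: {} PendingResults awaiting local persistence, versions {} to {}",
                      pending_map.size(), pending_map.begin()->first, pending_map.rbegin()->first);
}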

The problem is that RPCManager doesn't actually own the PendingResults objects that it's calling set_local_persistence() on; they are stored in results_awaiting_local_persistence by reference. The PendingResults objects are actually stored in RemoteInvoker, in an array of size 4096:

#define MAX_CONCURRENT_RPCS_PER_INVOKER (4096)
PendingResults<Ret> results_vector[MAX_CONCURRENT_RPCS_PER_INVOKER];
std::atomic<unsigned short> invocation_id_sequencer;

The send function hands out a reference to one of these PendingResults objects, and that reference is what RPCManager ends up storing. Once the invocation IDs wrap around 4096, however, the send function starts re-using older PendingResults objects after calling a reset() function.

results_vector[invocation_id].reset();
PendingResults<Ret>& pending_results = results_vector[invocation_id];
dbg_default_trace("Ready to send an RPC call message with invocation ID {}", invocation_id);
return send_return{size, serialized_args, pending_results.get_future(),
                   pending_results};

So when the number of PendingResults references stored in results_awaiting_local_persistence is greater than 4096, it means some of those references are actually duplicate references to the same PendingResults object, associated with two different invocation IDs.

Thanks, Edward! The MAX_CONCURRENT_RPCS_PER_INVOKER macro limits the maximum number of PendingResults objects. 4096 should be enough to hold the objects the user has not yet checked, for non-persistent subgroups (or for persistent subgroups before the persistence future was introduced into PendingResults). But when the number of pending versions waiting for persistence piles up, 4096 is not enough. A quick workaround is to make this number large enough for evaluation.
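Concretely, the quick workaround is a one-line change to the macro shown above (65536 is an arbitrary example value, not a tested recommendation):

// Enlarged so invocation IDs don't wrap around while older RPCs are still
// awaiting persistence; pick a value comfortably above the pending-version count.
#define MAX_CONCURRENT_RPCS_PER_INVOKER (65536)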

A thorough fix is to keep track of the pending result objects. Currently, we don't have a mechanism for a client to report that it has finished with the QueryResults object. I mentioned that a long time ago, but it was given lower priority. Since we've hit it again, should we design a mechanism to fix it? For example, introducing QueryResults::release() to return it to Derecho?

I think the right answer here is to take advantage of RAII on the user side. The user is returned a future; for as long as that future exists, the client may still be interested in that pending_results. We could tie the destruction of that future to our cleanup by returning a struct which extends future, overloading its destructor to, in addition to cleaning up its future state, also clear out its entry in the pending results table.

Actually, I'm pretty sure we can use the built-in properties of std::promise and std::future to decouple the lifetime of QueryResults (which is what the client gets) from PendingResults (which is what we store in results_vector) without overriding any destructors. According to the definition of std::promise, when it destructs it will release its reference to the state it shares with its corresponding std::future, but the state itself will not be released until the std::future also releases it. This means we can delete a PendingResults object at the point when Derecho is done with it -- namely, when all the replies have been stored in their futures and all of the persistence-event futures have been triggered -- and its corresponding QueryResults will still be able to retrieve results from those futures as long as it's still in scope in the client's code.
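For illustration, here is a minimal, self-contained demonstration of that standard-library behavior, using a plain std::promise/std::future pair in place of PendingResults/QueryResults:

#include <future>
#include <iostream>
#include <memory>

int main() {
    std::future<int> result;  // plays the role of QueryResults
    {
        // The promise plays the role of PendingResults.
        auto promise = std::make_unique<std::promise<int>>();
        result = promise->get_future();
        promise->set_value(42);
    }  // the promise is destroyed here, but the shared state survives
    // The future can still retrieve the value after the promise is gone.
    std::cout << result.get() << std::endl;  // prints 42
}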

We will still need to change the PendingResults array to a vector, in case the client starts more than 4096 RPCs before the earliest RPC has finished persisting (which could happen if persistence is slow). However, we will be able to delete PendingResults objects from that vector once RPCManager is finished with them (i.e. they have reached the completed_pending_results map in our current design). The easiest way to automatically clean up the PendingResults objects when Derecho is done with them is probably to store them on the heap and give both RemoteInvoker and RPCManager shared_ptrs to them; RemoteInvoker can delete its pointer from results_vector when it has delivered the reply for that RPC, and RPCManager can delete its pointer once it has sent all the persistence notifications.

Exactly, I was wondering if the destructor of QueryResults could do something. Since QueryResults is the non-copyable future end of a promise (a PendingResults object in our vector buffer), it can hold a reference to the promise. QueryResults::~QueryResults() should label the promise as discarded. Now the question is how to reclaim those discarded promises.

There are two cases:

  1. A promise is discarded after it is fulfilled;
  2. A promise is discarded but not yet fulfilled because the client loses interest early; the p2p message thread will fulfill it at some later point anyway.

Due to case 2, the QueryResults destructor should not be the one that reclaims the promise object.

Here is my proposed fix:
We keep the promises in a std::queue. The RPC caller thread does some housekeeping work on each RPC call: it repeatedly pops the front of the promise queue until the front entry is neither fulfilled nor discarded. This way, we avoid promise buffer overflow, and we also avoid the memory leak issue.
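To make the housekeeping concrete, here is a minimal sketch under one conservative reading of the two cases above (an entry is reclaimed only once it is both fulfilled and discarded, so the p2p thread can never touch a reclaimed promise); the TrackedPromise wrapper and its flags are assumptions for illustration, not existing code:

#include <atomic>
#include <future>
#include <memory>
#include <queue>

struct TrackedPromise {
    std::promise<int> promise;           // stand-in for the real reply promise
    std::atomic<bool> fulfilled{false};  // set by the p2p thread when the reply arrives
    std::atomic<bool> discarded{false};  // set by QueryResults::~QueryResults()
};

std::queue<std::unique_ptr<TrackedPromise>> promise_queue;

// Housekeeping done by the RPC caller thread on each call: pop finished
// entries off the front, stopping at the first one still in use.
void reclaim_finished_promises() {
    while(!promise_queue.empty()
          && promise_queue.front()->fulfilled
          && promise_queue.front()->discarded) {
        promise_queue.pop();  // the unique_ptr frees it: no buffer overflow, no leak
    }
}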

What do you think?

Your destructor idea should work, and it's a good idea for reducing overhead: If the client lets its QueryResults object go out of scope, it's not interested in receiving any more replies or persistence notifications, so we can discard the corresponding PendingResults early.

However, while looking at the code just now, I remembered the reason we store the PendingResults objects in a fixed-size array in RemoteInvoker: Multiple threads can concurrently access the same RemoteInvoker (namely, the predicates thread for sending and the P2P thread for receiving replies) and we didn't want to use a lock to guard the PendingResults data structure. A fixed-size array (or vector) is the only structure that can safely allow two threads to concurrently access two different entries, because standard-library containers are generally not thread safe. It would be safe to concurrently push and pop from a queue, but we can't use a queue to store PendingResults in RemoteInvoker because we need random access to them (we need to look them up by invocation ID).

If we want to allow a variable number of PendingResults objects to be owned by RemoteInvoker, rather than a hard-coded fixed number, we will have to come up with a thread-safe variable-sized container for them.

A similar issue crops up with Edward's shared_ptr suggestion --- while it's clean, shared_ptr is by default thread safe and so actually emits an interlocked expression on copy or access. We'd have to be careful that this does not significantly impact performance.

A point in favor of shared_ptr also exists, however --- do we have an opportunity to just use a weak_ptr directly to the associated PendingResults object, instead of an index into some vector? This would entirely bypass the requirement to have a concurrent container.

If this code hasn't changed much from when I wrote it, the reason this is not done is that the thread processing incoming cooked requests must effectively reconstruct the PendingResults reference from the response message it receives; hence receiving an index and looking up the result in some collection. But there's still an opportunity here to do better; in particular, it may be safe to simply have distinct collections for the distinct threads which need access to PendingResults. I don't remember quite enough about the architecture to be sure of this, but the idea here would be to just share the PendingResults themselves by reference, and effectively replicate the table which contains them.

Ah, this reminds me of an associated cleanup issue. Let's assume that the client has deleted their QueryResults object, indicating that it is no longer interested in a response. The thread which is processing incoming responses needs to be notified of this; its behavior is to simply receive a message, deserialize it, look up the PendingResults associated with the deserialized state, and mark it as fulfilled. We'd need to change that logic to include some way to mark the PendingResults as no longer interested in fulfillment, which is easy enough; probably what we'd do is just map the vector entry to nullptr or some other designated value. But we wouldn't be able to actually remove that entry until the response is really received; at that point, we would be free to mark the entry as reusable. There isn't a good answer to cleaning up the residual entry that results from a long-pending result, even if the client is no longer interested in it.

I like the idea of using a weak_ptr for performance. Or even just use a pointer as the invocation_id? The downside is that a buggy node could crash healthy peers. Is there any boundary-check mechanism possible?

So, a hybrid proposal. There are two shared concurrent queues: reusable-keys and newly-generated PendingResults.

The thread which is creating PendingResults objects consumes a key from the reusable-keys queue, associates it with the newly-generated PendingResults, and pushes the pair of (key, PendingResults) onto the PendingResults queue.

The thread which is waiting for responses maintains a private random-access collection associating keys with PendingResults, which is initially empty. Upon receipt of a response, it first checks the collection for the appropriate PendingResults, and if it is absent, empties the PendingResults queue into the collection and checks again.

Once the actual response has been received, the associated key may be placed on the reusable-keys queue.
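For concreteness, a compact sketch of this scheme, with a mutex-guarded std::queue standing in for whatever concurrent queue would actually be used; all names here are illustrative:

#include <map>
#include <mutex>
#include <queue>
#include <utility>

struct PendingResults {};  // opaque stand-in

std::mutex queues_mutex;
std::queue<int> reusable_keys;                                  // shared queue 1
std::queue<std::pair<int, PendingResults*>> announced_results;  // shared queue 2

// The response-waiting thread's private collection; only it reads or writes this.
std::map<int, PendingResults*> receiver_collection;

PendingResults* find_pending_results(int key) {
    auto it = receiver_collection.find(key);
    if(it == receiver_collection.end()) {
        // Miss: drain the shared queue into the private collection, then retry.
        std::lock_guard<std::mutex> lock(queues_mutex);
        while(!announced_results.empty()) {
            receiver_collection.insert(announced_results.front());
            announced_results.pop();
        }
        it = receiver_collection.find(key);
    }
    return (it == receiver_collection.end()) ? nullptr : it->second;
}
// Once the final response for a key is processed, the receiver erases the entry
// and pushes the key back onto reusable_keys.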

You're right about the reason we need to use indexes instead of pointers to retrieve PendingResults: When a node receives a response message (in the P2P thread), RemoteInvoker needs to find the PendingResults object that corresponds to that message using only the contents of the message. Thus, when RemoteInvoker sends a request it includes an invocation_id, which is an integer that can be used as an index into the PendingResults vector, and when the remote node responds to that request it copies the same invocation_id into the response message so that the sender can match the reply to the request.

> I like the idea of using a weak_ptr for performance. Or even just use a pointer as the invocation_id? The downside is that a buggy node could crash healthy peers. Is there any boundary-check mechanism possible?

Surprisingly enough, I think this would work about as reliably as our current setup because we're using a plain array to store the PendingResults. Just to clarify, your idea is that the send method could simply take the address of a (new, unused) PendingResults and stick it in the request message as the "invocation ID." The receive_response method could then take that invocation ID, cast it to PendingResults*, and expect to find a PendingResults at that memory address, because the send method must have previously put one there. This is certainly dangerous because a slightly garbled message would cause the response-receiving node to jump into a random memory location and crash. However, our current implementation has a similar risk, because receive_response uses the invocation ID to blindly index a plain C array: if the message is slightly garbled and the invocation ID is outside the array bounds, the node will jump into a random memory location and crash.

> So, a hybrid proposal. There are two shared concurrent queues: reusable-keys and newly-generated PendingResults.
>
> The thread which is creating PendingResults objects consumes a key from the reusable-keys queue, associates it with the newly-generated PendingResults, and pushes the pair of (key, PendingResults) onto the PendingResults queue.
>
> The thread which is waiting for responses maintains a private random-access collection associating keys with PendingResults, which is initially empty. Upon receipt of a response, it first checks the collection for the appropriate PendingResults, and if it is absent, empties the PendingResults queue into the collection and checks again.
>
> Once the actual response has been received, the associated key may be placed on the reusable-keys queue.

I'm pretty sure I understand this design, but it won't quite work because there are actually three different threads that need to access the PendingResults objects after they are created: the predicates thread, the P2P handler thread, and the persistence thread. Obviously the P2P handler thread needs to access PendingResults objects to deliver a response when it receives a reply message. The SST predicates thread calls RPCManager's rpc_message_handler when it delivers an ordered_send that contains an RPC request message, but if this message is actually a self-receive of a request sent by the local node, rpc_message_handler will need to access the PendingResults array to call fulfill_map on the PendingResults associated with this request. Finally, PendingResults objects are also used to fulfill the "persistence finished" futures, so the persistence thread needs to access them (through RPCManager's notify_persistence_finished).

Currently, only the P2P handler thread uses the invocation ID and the PendingResults array to access PendingResults objects; the predicates thread and the persistence thread use PendingResults& references stored in RPCManager to access them. This only works because the PendingResults objects stay in a fixed memory location and are never deleted. I don't think it's compatible with a design where PendingResults objects move into a "private collection" owned by the P2P handler thread once the first response is received.

That was comprehensive! I think I'm actually liking the design which uses a literal pointer instead of the vector index, since it is (as you point out) only about as risky as our current approach. Am I right in thinking this would eliminate the need for a collection in the first place?

Sorry for being a bit slow on the uptake here, but why do we need a collection at all if we're just going to use raw pointers? Who is left to index it? If it's just about the memory management, then whatever would have emptied that bit of the collection could just delete the PendingResults directly instead, right?

I understand the urge to never implement your own memory management, and am usually a big proponent of it! But right now our memory management is going via the collection, which winds up looking like "leak for a while, then optimistically reuse" and that's ... not a great approach.

If we want to go with both "automatic memory management" and "use a raw pointer value", then what we could do is put the PendingResults in a heap-allocated weak_ptr whose corresponding shared_ptr is owned by the associated QueryResults object. We use the literal address of that weak_ptr as our "index", and store another weak_ptr in the place persistence needs to look. Whenever message receipt or persistence fulfillment wants to use their weak_ptr, they call lock() to convert it into a shared_ptr, and if it's null can just proceed to skip fulfillment entirely (as this happens exactly when the client's QueryResults have gone out of scope). The receiver needs to remember that its weak_ptr is dynamically allocated and handle that accordingly, of course.

No collection, and only a tiny bit of manual memory management. The cost of screwing up? We leak a pointer; there are worse sins.

Since this was probably pretty hard to follow, consider this sample code:

struct QueryResults {
    shared_ptr<PendingResults> pr_lifetime_lock;
};

{ ///in a scope somewhere....
    QueryResults qr = ....; //create them as usual
    //....
    char outgoing_msg[/*something*/];
    //...
    *((weak_ptr<PendingResults>**)(outgoing_msg + i)) = new weak_ptr<PendingResults>{qr.pr_lifetime_lock};
    //...
}

{ // at the receiver
    //...
    std::unique_ptr<weak_ptr<PendingResults>> maybe_results{*((weak_ptr<PendingResults>**)(incoming_msg + i))};
    shared_ptr<PendingResults> sp_results = maybe_results->lock();
    if(sp_results) {
        PendingResults& results = *sp_results;
        //use the results as desired; all clean-up is automatic.
    }
}

Updated the last comment on github because I'm pretty sure it needed accompanying code in order to make sense. You might need to actually click the "view it on github" link to see the updates, I'm not clear on how github mail reacts to edits. Note also that the code is buggy as hell, but hopefully conveys the idea. (To the kids watching at home, don't cast literal characters to pointers! Pointers are not the same size as characters! Use our serialization framework instead of pretending messages are arrays! etc!)

@mpmilano, I think I get what you're suggesting, but note that the way we create QueryResults right now is to ask the corresponding PendingResults to construct one, with pending_results.get_future() -- the PendingResults has to exist first. So we might have to create the PendingResults with make_shared, then call get_future() on it to make a QueryResults with an empty pointer, then put the shared_ptr<PendingResults> into the QueryResults.

Regarding memory management, right now we actually never delete a PendingResults, we just allocate 4096 of them when the RemoteInvoker is created and then re-use them when the invocation_id gets to the end of the array. (That was the cause of the bug we were originally talking about: The invocation_id wrapped around before we were done with the oldest PendingResults, and we reused a PendingResults that was still in use). My idea to use a shared_ptr that is shared between RemoteInvoker and RPCManager also would have introduced memory management by automatically deleting the PendingResults when both RemoteInvoker and RPCManager were done with it (indicated by dropping their shared_ptr out of a collection). However, that setup would not have accounted for the situation where the user thread discards their QueryResults before RPCManager is done with its corresponding PendingResults; it would have kept the PendingResults alive until RPCManager was done with it regardless of what the user did.

Our current design also returns the QueryResults object to the user by std::moving it as a value, so if we want QueryResults to own the "controlling" shared_ptr to the PendingResults, we might want to return it as a unique_ptr instead. (Do weak_ptrs get invalidated if their corresponding shared_ptr moves around in memory?)

Here's a link to the send method that shows how PendingResults and QueryResults are normally created:

send_return send(const std::function<char*(int)>& out_alloc,
                 const std::decay_t<Args>&... remote_args) {
    // auto invocation_id = mutils::long_rand();
    std::size_t invocation_id = invocation_id_sequencer++;
    invocation_id %= MAX_CONCURRENT_RPCS_PER_INVOKER;
    std::size_t size = mutils::bytes_size(invocation_id);
    size += (mutils::bytes_size(remote_args) + ... + 0);
    char* serialized_args = out_alloc(size);
    {
        auto v = serialized_args + mutils::to_bytes(invocation_id, serialized_args);
        auto check_size = mutils::bytes_size(invocation_id) + serialize_all(v, remote_args...);
        assert_always(check_size == size);
    }
    // lock_t l{map_lock};
    results_vector[invocation_id].reset();
    PendingResults<Ret>& pending_results = results_vector[invocation_id];
    dbg_default_trace("Ready to send an RPC call message with invocation ID {}", invocation_id);
    return send_return{size, serialized_args, pending_results.get_future(),
                       pending_results};
}

Ok, that all makes sense to me! Also, std::move-ing a shared_ptr shouldn't break anything with the corresponding weak_ptrs; the address of the shared_ptr doesn't matter at all to the weak_ptrs, the only requirement is that the object to which it points does not move. We need to heap-allocate the weak_ptr that the receiver thread will retrieve from its response message because there we actually do care about the address of the weak pointer being stable.
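For illustration, a tiny self-contained check of that claim:

#include <cassert>
#include <memory>
#include <utility>

int main() {
    auto strong = std::make_shared<int>(7);
    std::weak_ptr<int> weak = strong;
    // Moving the shared_ptr relocates the handle, not the managed object or
    // its control block, so the weak_ptr remains valid.
    std::shared_ptr<int> moved_to = std::move(strong);
    assert(!weak.expired());
    assert(*weak.lock() == 7);
}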

Now the whole process looks very clear to me! With this, the PendingResults object is released at the right time: when the corresponding QueryResults object disappears.

A trivial question: in rare cases (node failure, for example), the receiver thread might never get a reply with a valid invocation ID. Will we lose track of that sizeof(std::weak_ptr) memory?

In the most simple version of this code we'd leak a weak_ptr on node failure -- which we can afford to do a lot of times before it's really going to be a problem. I think it'll be up to us whether leaking a couple words per node failure is within our memory overhead budget; I will say that it seems unlikely a node will outlive thousands of its peers' failures, and we had previously argued that keeping an extra few thousand of these around in a fixed-size vector was very affordable.

During a meeting with Weijia today, I realized that the dangerous operation of "blindly dereference a pointer from a message and hope that there's a std::weak_ptr<PendingResults> at that address" can be made safer with placement new. Each RemoteInvoker can allocate a large fixed memory region for PendingResults pointers (very similar to our current fixed array of PendingResults), and create the std::weak_ptr<PendingResults> with placement new in this memory region. Then, when we retrieve a std::weak_ptr<PendingResults>* from a response message, we can check that its address is within the correct memory region (with pointer arithmetic) before attempting to dereference it. Since pointers are even smaller than PendingResults objects, we can make this region large enough that we will never need to expand it even if we occasionally leak a std::weak_ptr<PendingResults>*.
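A sketch of that range check, assuming a hypothetical fixed region owned by each RemoteInvoker; none of these names exist in the codebase:

#include <cstddef>
#include <memory>

template <typename Ret>
struct PendingResults;  // stand-in forward declaration

template <typename Ret>
struct InvocationPtrRegion {
    using ResultsPtr = std::weak_ptr<PendingResults<Ret>>;
    static constexpr std::size_t region_size = 65536;
    ResultsPtr* region;  // fixed block where the weak_ptrs are placement-new'd

    // Validate a pointer pulled out of a response message before dereferencing it.
    bool is_valid(ResultsPtr* candidate) const {
        if(candidate < region || candidate >= region + region_size) return false;
        std::ptrdiff_t offset = reinterpret_cast<const char*>(candidate)
                                - reinterpret_cast<const char*>(region);
        return offset % sizeof(ResultsPtr) == 0;  // must land on an entry boundary
    }
};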

In fact you can even do more than that --- since weak_ptr is usually implemented as a fixed-size type (at least with non-primitive Ts) and you know the alignment information, you can actually guarantee that it's a valid object in that region by comparing it with the bump pointer you're using for allocation tracking.

I'm still going to try and throw a small bucket of cold water on the idea, though. The trouble is PendingResult isn't actually guaranteeing any particular type for T, and you're (so far) planning to use one arena that can refer to any possible T. So if you wind up getting a bad index into that arena, rather than segfaulting (as you likely would on a bad address), you'll just get a valid pointer to a different type and hopefully segfault [potentially much] later when trying to use the contained type on the client side. You'll also need to remember to actually call the destructors of these placement-allocated weak_ptrs, as the associated control blocks are dynamically allocated and cannot be cleaned up otherwise. Plus, you have to track free slots in this arena so that our reuse logic doesn't just fall back to the old "assume we can just wrap" reasoning that got us into this mess in the first place.

I have a proposal that is very spiritually similar, much more complicated, and does not suffer from these problems as much.

If we're thinking of making this easier to debug, you could take this idea and guarantee that the received pointer really does correspond to a real weak_ptr, which would in turn mean that getting the "wrong pointer" would at worst mean that you see a null weak_ptr when you shouldn't have. To do this, you could:

  • create a thread_local, statically-allocated std::array<weak_ptr<PendingResults<T>>, N> at the point where those structs are generated.
  • Have a static thread_local bitmap associated with this std::array which indicates which slots are in use
  • Have the code which initializes this array also stick a reference to it into a global map from typeID to pointers to both the array and the bitmap. We'll use this map to find the arena in which our received type was allocated, allowing debug code to deduce if the received pointer really does come from the target arena and to mark the slot as free afterwards.

concretely:

//where the weak_ptr<PendingResults> are created
static thread_local std::array<std::weak_ptr<PendingResults<T>>, N> arena; //all init to empty
static thread_local std::bitset<N> used_slots; //all init to 0
static const thread_local bool run_once = [&] {
    //so that we can have code that runs exactly once
    global_arena_map[typeid(T)] = std::make_pair(&arena, &used_slots);
    return true;
}();

weak_ptr<PendingResults<T>> wp = /* however we get it */;
auto slotindex = used_slots.get_min_zero(); //hypothetical helper: index of the first unset bit
used_slots[slotindex] = 1;
arena[slotindex] = std::move(wp);
weak_ptr<PendingResults<T>>& use_this = arena[slotindex];
//use use_this however you normally would

Later, on deserialize, something like:

weak_ptr<PendingResults<T>>* wp = ((weak_ptr<PendingResults<T>>**)(in_buf + offset))[0];

//look up this T's arena and slot bitmap (intended as a structured binding)
auto [arena, used_slots] = global_arena_map.at(typeid(T));

assert(contains(*arena, wp));           //hypothetical helper: does wp point into the arena?
auto slotindex = offset_of(*arena, wp); //hypothetical helper: wp's index within the arena
assert(used_slots->test(slotindex));
weak_ptr<PendingResults<T>> use_this = *wp;
wp->reset();
//now we no longer depend on the arena-allocated one
(*used_slots)[slotindex] = 0;

This is waaaay more complicated than just new weak_ptr on the creation end and deserializing directly into a unique_ptr<weak_ptr> on the receiving one, and as such there's potential for bugs within this code, which would be avoided by not having it. But if you want to be able to have a manually-managed, type-safe arena for debugging purposes, this is roughly what you should do IMO.

> I'm still going to try and throw a small bucket of cold water on the idea, though. The trouble is PendingResult isn't actually guaranteeing any particular type for T, and you're (so far) planning to use one arena that can refer to any possible T. So if you wind up getting a bad index into that arena, rather than segfaulting (as you likely would on a bad address), you'll just get a valid pointer to a different type and hopefully segfault [potentially much] later when trying to use the contained type on the client side.

I haven't quite figured out how your proposed system works yet, but I'd like to make a correction here and ask if that changes anything. My idea was for each RemoteInvoker<Tag, std::function<Ret(Args...)>> to allocate its own memory region for PendingResults pointers, in the same way that each RemoteInvoker currently allocates its own array of 4096 PendingResults objects. The arena allocated by RemoteInvoker<Tag, std::function<Ret(Args...)>> would thus only contain instances of std::weak_ptr<PendingResults<Ret>>, not any other template specialization of PendingResults<T>. When receive_response for a particular RemoteInvoker instance retrieves a std::weak_ptr<PendingResults<Ret>>* from an incoming message, it can check whether that pointer points to an address within this RemoteInvoker's arena, and if so, it can be assured that the address contains a std::weak_ptr<PendingResults<Ret>> that this RemoteInvoker previously created.

That's good. I forgot we had access to the specific RemoteInvoker on both creation and receipt of this weak pointer; no need to use a global map then, you can just put the arena into the invoker itself. Also no need to mess with static or thread-local storage durations.

But what happens when you run out of arena space? How are you planning to track which parts of the arena are holding live objects, and which are not? And, given that you need to track this anyway and the fact that null-initialized weak pointers have no extraneous memory impact, why not just use a std::array of these weak pointers and avoid risking alignment errors or bad initialization/destruction logic in the arena?

I've implemented the "basic" version of this change, without placement new operators, on the branch fix_pending_results and run some tests on it. No new bugs seem to have appeared, and the future_error no longer shows up in persistent_bw_test. Note, however, that persistent_bw_test doesn't actually use the QueryResults objects from its ordered_sends, so the PendingResults objects are cleaned up almost right away (and I verified from inspecting the logs that RPCManager doesn't attempt to fulfill any persistence notifications for them when it receives its persistence callbacks). The simple_replicated_objects demo worked, though, so a more "normal" use of QueryResults appears to work fine.

I realized while testing persistent_bw_test, though, that we haven't considered how to clean up the weak_ptrs when calling a void function, like the change_pers_bytes function in this test. Right now, send will create a weak_ptr<PendingResults<void>> when sending the RPC call, but it will never be able to delete the weak_ptr because it will never receive a response. The actual PendingResults object still gets cleaned up when the user is done with the QueryResults (and QueryResults<void> is still useful for tracking persistence events), but the weak_ptr gets leaked in this case.

EDIT: The obvious solution to this problem is to just delete the weak_ptr right away if the return type is void, i.e. add this to the end of the send method:

if constexpr(std::is_same_v<Ret, void>) {
    delete results_heap_ptr;
}

I think this is safe, as long as we never expect to receive any kind of reply for a void RPC function, but on the other hand there's this comment in the receive_call implementation for void functions:

inline recv_ret receive_call(std::true_type const* const,
                             mutils::DeserializationManager* dsm,
                             const node_id_t&, const char* _recv_buf,
                             const std::function<char*(int)>&) {
    //TODO: Need to catch exceptions here, and possibly send them back, since void functions can still throw exceptions!
    auto recv_buf = _recv_buf + sizeof(void*);
    mutils::deserialize_and_run(dsm, recv_buf, remote_invocable_function);
    return recv_ret{reply_opcode, 0, nullptr};
}

If we ever get around to reporting exceptions for void functions, then the sender might get a response for a void RPC function after all, but in most cases it will not.

At some point in this design, one could still block on the void-specialized QueryResults to ensure execution was halted until the remote call completed. I take it we removed this feature?

If there is truly no chance of a void-returning RPC responding even with a simple "I'm done" bit, then the thing to do is just put all the code that manages creation of this weak_ptr into an if constexpr(std::is_void_v<T>) block.

To the best of my knowledge, there was never a time when a QueryResults<void> could be used to determine when a remote RPC completed; until I added the persistence-event futures, the only thing it could do was block until the RPC message was delivered locally (i.e. the reply map was fulfilled by RPCManager). The receive_response specialization for void functions in RemoteInvoker has always contained assert(false). I believe our reasoning was that the user shouldn't have to pay for anything they don't need, and so functions that return void should not waste message buffer slots sending an empty reply. In all our test cases, if we want the remote node to send a reply when the RPC function has completed, we change the return type of the RPC function to bool and return true at the end (in other words, explicitly send the "I'm done" bit).

On the other hand, as shown by that comment in the receive_call function, we've always had the problem that there's no way for a caller to know that a void RPC function threw an exception, even though we promise to report errors for all other types of RPC function with remote_exception_occurred. Solving this problem would require adding a way for void functions to sometimes send a reply (just to report the exception) even though in the common case they don't send a reply. The new way of tracking PendingResults objects with weak_ptrs will make this problem even harder to solve if we ever want to solve it (since we can't know when it's safe to delete a weak_ptr<PendingResults<void>> for an RPC call that didn't throw an exception), but I guess for now we can leave it unsolved and never create heap-allocated weak_ptrs for PendingResults<void>.

While writing up this change in 19dcdbe, I realized we have a slightly worse memory-leak problem than simply leaking a std::weak_ptr<PendingResults<Ret>>* when a node fails: If the user thread regularly discards the QueryResults object before every reply has arrived, we will leak a std::weak_ptr<PendingResults<Ret>>* on every RPC call. This is because we need to keep the RemoteInvoker's weak_ptr on the heap until all the replies for that RPC message have arrived, but the way we keep track of how many replies have arrived is inside the PendingResults object itself. Thus, once the PendingResults/QueryResults pair is deleted, there's no way of knowing whether a reply processed by receive_response is the last reply for that message, and we can't safely delete the std::weak_ptr<PendingResults<Ret>>* from the heap.

To help clarify, here is the part of receive_response that I'm talking about:

std::shared_ptr<PendingResults<Ret>> results = results_heap_ptr->lock();
if(results) {
    if(is_exception) {
        auto exception_info = mutils::from_bytes_noalloc<remote_exception_info>(nullptr, response + 1 + sizeof(results_heap_ptr));
        dbg_default_trace("Received an exception from node {} in response to invocation ID {}", nid, fmt::ptr(results_heap_ptr));
        rls_default_error("Received an exception from node {}. Exception message: {}", nid, exception_info->exception_what);
        results->set_exception(nid, std::make_exception_ptr(
                remote_exception_occurred{nid, exception_info->exception_name, exception_info->exception_what}));
    } else {
        dbg_default_trace("Received an RPC response for invocation ID {} from node {}", fmt::ptr(results_heap_ptr), nid);
        results->set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 + sizeof(results_heap_ptr)));
    }
    //If this was the last RPC response, the heap-allocated weak_ptr will not be used again
    if(results->all_responded()) {
        delete results_heap_ptr;
    }
}

It's entirely possible, and much more common than a node failure, for a client to let its QueryResults go out of scope (and thus delete the PendingResults) before all the replies have arrived, since most clients only care about receiving one reply to an ordered_send -- all the other replies will be the same. A client might also submit several RPC requests in a batch, and then only wait for replies from the last one, since that would be sufficient to guarantee that all the previous RPCs had completed.

I think if we want to avoid leaking a std::weak_ptr<PendingResults<Ret>>* on every non-void RPC call, we'll have to ensure that the PendingResults object's lifetime lasts at least until all the replies are received (or some replies have been received and the other replicas have failed, which would also make results->all_responded() true). Alternatively, it might be possible to duplicate the tracking of responses in both PendingResults and RemoteInvoker, so that receive_response can decide on its own whether the pointer can be freed, but this would be difficult because right now there's no way to tell RemoteInvoker which nodes will be responding to a given RPC call (that's the whole reason we have PendingResults::fulfill_map()). Does anyone have a better idea?

Huh. Well, then I suppose we have a choice here. We could either use a shared_ptr on the reply processing instead of a weak_ptr, and then leak the whole shebang on node failure. Or we could refactor the logic a bit so that the response tracking metadata lived long enough to actually do the weak_ptr cleanup. I think just using a shared_ptr might be best here --- nodes don't fail all that often anyway --- and if we're worried about the leak, we can add timeout logic that resets the shared pointer if enough time has passed, which would revert to the leak overhead we originally anticipated for the weak_ptr solution.

I think I like Ken's suggestion, and what's especially nice here is that the "accept the leak for now" version of the code is literally just replacing the word "weak" with "shared" in a few places.

Thinking more about this, I'm wondering if something closer to Edward's original suggestion is actually more appropriate.

If I recall correctly, Futures and Promises need not be cleaned up in any particular sequence, and jointly own the memory associated with fulfilment. We've also discovered that the receiver side of the PendingResults puzzle may (often) need to outlive the sender side of that puzzle.

There are other places which currently maintain a reference to PendingResults. What can we say about their lifetimes? Is it the case that those references will never be used if the client has discarded their QueryResults, or if the receiver has finished its fulfillment?

If so, then we probably don't need to wrap things in complex smart pointers anymore, and can instead allow the controlling lifetime to own the PendingResults object directly. More generally, if we have an understanding of which of these references can be used based on when other references are dead, we can avoid just punting it all to an interlocked runtime refcount.

> If I recall correctly, Futures and Promises need not be cleaned up in any particular sequence, and jointly own the memory associated with fulfilment.

This is correct. PendingResults and QueryResults can be deleted in any order, since they only contain Promises and Futures; as long as either "end" is live, the Promises and Futures will still be valid.

> There are other places which currently maintain a reference to PendingResults. What can we say about their lifetimes? Is it the case that those references will never be used if the client has discarded their QueryResults, or if the receiver has finished its fulfillment?

The only other place that maintains a reference to PendingResults is RPCManager, although it is accessed by three different threads. The PendingResults references in RPCManager (which are actually weak_ptrs to PendingResults right now) are used for (1) fulfilling reply maps, (2) delivering node_removed_from_group_exceptions, and (3) fulfilling persistence-related futures. All of these actions will have no effect if the client has discarded their QueryResults, so RPCManager does not need to extend the lifetime of PendingResults objects, and can simply drop its references to PendingResults that have ceased to exist. The way it's currently implemented on the fix_pending_results branch, RPCManager deletes its weak_ptr to a PendingResults as soon as the lock() method fails to return a shared_ptr.
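In code, that drop-on-failed-lock behavior looks roughly like the following (a paraphrase for illustration, not a verbatim excerpt from the branch):

// Inside one of RPCManager's notification paths (illustrative names):
auto strong_ref = pending_results_iter->second.lock();
if(strong_ref) {
    strong_ref->set_local_persistence();
    ++pending_results_iter;
} else {
    // The client discarded its QueryResults, so no one can observe this event;
    // drop the dead weak_ptr instead of fulfilling anything.
    pending_results_iter = results_awaiting_local_persistence[subgroup_id].erase(pending_results_iter);
}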

> If so, then we probably don't need to wrap things in complex smart pointers anymore, and can instead allow the controlling lifetime to own the PendingResults object directly.

Because of the way receive_response is implemented, there are actually two "controlling lifetimes" for PendingResults objects: the lifetime of the client's QueryResults object, and the time until the last RPC reply has been handled by receive_response (or, equivalently, the last node that hasn't replied is marked failed). The PendingResults needs to live at least as long as receive_response will need it for, but whether it lives any longer than that depends on whether the client is still interested in the QueryResults object -- there's no point in fulfilling the persistence futures if the client doesn't care, but if the client does care, the PendingResults needs to stay alive so that RPCManager can fulfill them.

I think that because we have two different objects that need to extend the lifetime of PendingResults (RemoteInvoker and QueryResults), and they have no way of communicating with each other, the safest way to handle this is to use shared_ptrs. Converting RemoteInvoker's weak_ptr<PendingResults<Ret>> to a shared_ptr<PendingResults<Ret>> would accurately represent the shared lifetime of the PendingResults.

Yeah, I think you're right about this. I was briefly hopeful that we could do something clever, like separate out the reply map + promise from the rest of PendingResults and have these two managed only by the receive_response thread. If we did this, it would mean that the parts of the PendingResults which the RPCManager exposes would be fully controlled by the QueryResults lifetime, and the parts the receive_response managed would be fully controlled by receive_response. But now that I'm looking at it more I don't think this is likely worth it.

I think I found a fix for the "PendingResults leaks on node failure" problem, while talking about this with Weijia, and I implemented it in the fix_pending_results branch. The idea is to give the PendingResults object itself a copy of the shared_ptr<PendingResults>* that RemoteInvoker uses as an invocation ID, thus allowing PendingResults to "delete itself" (indirectly). This means that RPCManager, inside its view-change callback, can clean up any PendingResults objects associated with failed nodes without involving RemoteInvoker. I'm pretty confident it's safe for RPCManager to delete the shared_ptr<PendingResults>* at this point because RemoteInvoker will never need to use it again: any RPCs in progress at the time of a view change will either be completed (all their replies are delivered during cleanup) or aborted (the call message was trimmed, so no replies are coming).
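To make the mechanism concrete, here is an illustrative sketch of the self-cleanup hook (not the literal code on the branch):

#include <memory>

template <typename Ret>
class PendingResults {
    // Copy of the heap pointer that RemoteInvoker hands out as the invocation ID.
    std::shared_ptr<PendingResults<Ret>>* self_heap_ptr = nullptr;

public:
    void set_self_ptr(std::shared_ptr<PendingResults<Ret>>* ptr) {
        self_heap_ptr = ptr;
    }
    // Called from RPCManager's view-change callback for RPCs that can never
    // complete; deletes RemoteInvoker's owning pointer without involving it.
    void delete_self_ptr() {
        auto* ptr = self_heap_ptr;
        self_heap_ptr = nullptr;
        delete ptr;  // if this was the last shared_ptr, *this dies right here,
                     // so nothing may touch members after this line
    }
};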

Does anyone see a potential problem with this design?

This has been fixed in #211, which is now merged.