whatyouhide/xandra

Cluster-aware retry strategy

Closed this issue · 13 comments

The following scenario is not possible with the current Xandra.RetryStrategy:

On a cluster with n nodes, we want to retry once if the current node fails, and we want to retry n-1 times on different nodes. Since we have two different retry_states (one at the Xandra.Cluster level and one at the Xandra connection pool level), it is impossible to pass the state between retries from the cluster level to the connection pool level.

A bonus would be, of course, if the next node to retry weren't picked purely according to the DCAwareRoundRobinPolicy in the Xandra.Cluster state, but instead skipped the nodes that the caller process has already tried for the specific query being retried.

Making RetryStrategy cluster-aware is pretty straightforward. Calling RetryStrategy.run_with_retrying/2 with an additional retry_level parameter, which could be either :cluster or :node for instance, would solve the problem. The question is: would there also be a use case for passing the entire retry_state of the Xandra.Cluster-level retry strategy to the Xandra-level retry strategy?
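
To make the retry_level idea concrete, here's a rough sketch of a strategy that branches on such an option. The :retry_level and :cluster_retries_left names are hypothetical (made up for this sketch), and it assumes the current new/1 and retry/3 callbacks of Xandra.RetryStrategy:

defmodule MyApp.ClusterAwareRetryStrategy do
  @behaviour Xandra.RetryStrategy

  @impl true
  def new(options) do
    %{
      # :retry_level and :cluster_retries_left are hypothetical options for this sketch.
      level: Keyword.get(options, :retry_level, :node),
      node_retries_left: 1,
      cluster_retries_left: Keyword.get(options, :cluster_retries_left, 2)
    }
  end

  @impl true
  def retry(_error, options, %{level: :node, node_retries_left: n} = state) when n > 0 do
    # Retry once on the same node.
    {:retry, options, %{state | node_retries_left: n - 1}}
  end

  def retry(_error, options, %{level: :cluster, cluster_retries_left: n} = state) when n > 0 do
    # Retry at the cluster level (ideally on a different node).
    {:retry, options, %{state | cluster_retries_left: n - 1}}
  end

  def retry(_error, _options, _state), do: :error
end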

Making RetryStrategy able to remember which nodes it already tried is a bit trickier. Currently, the retry_state resides in the client process (i.e. the one that calls Xandra.execute). So either we'd need to hold the already-tried nodes on the caller's side in the retry_state and do something along the lines of Pool.checkout(pool, retry_state), or the retry_state would have to live somehow in the Xandra.Cluster pool. Neither of these sounds like an easy approach.

I’m looking at the "retry policies" exposed by the Python driver, which are documented here. The interesting thing there is that the way a retry strategy signals how to retry is by returning a (retry_type, consistency) tuple, where retry_type can be RETRY, RETRY_NEXT_HOST, and so on. So, the retry strategy cannot really pick a specific new host in the cluster.

However, the retry strategy does get a list of alive replicas, which I guess it can use to decide whether to retry on the next host or not.

Our retry strategy is "more capable" on one hand, because it can return new options to pass to the query. So, we can change the consistency but also other things.
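
For instance, a strategy can already rewrite options before the next attempt. A minimal sketch, assuming the {:retry, new_options, new_state} | :error return shape of retry/3:

defmodule MyApp.DowngradingRetryStrategy do
  @behaviour Xandra.RetryStrategy

  @impl true
  def new(_options), do: %{retries_left: 1}

  @impl true
  def retry(_error, options, %{retries_left: n}) when n > 0 do
    # Retry once, downgrading the consistency level for the next attempt.
    {:retry, Keyword.put(options, :consistency, :one), %{retries_left: n - 1}}
  end

  def retry(_error, _options, _state), do: :error
end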

My gut feeling is that the right things to do here are:

  1. Bubble up the retry strategy at the cluster level. What I mean by this is that instead of a retry strategy calling Xandra.execute (and friends) again, we retry Xandra.Cluster.execute instead. This way, we can pass the options to the cluster.
  2. Once we do this, we can actually expose a :target_node option (type Xandra.Cluster.Host.t/0) that you can pass to Xandra.Cluster.<execute|prepare|...>. This can be useful in general, and retry strategies can use it to actually pick a node.
  3. We can add a :retry_next_host return value too to c:Xandra.RetryStrategy.retry/3, but I'm not sure we need this with the :target_node option. We can maybe store the tried nodes in the retry strategy's state, and that could be enough (a sketch of this follows right after this list).
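
As a rough sketch of point 3, assuming the cluster injects the list of alive hosts into the options under some key (I'm calling it :alive_hosts here, that name is made up) and that the :target_node option from point 2 exists, a strategy could walk the remaining hosts in its state:

defmodule MyApp.NextHostRetryStrategy do
  @behaviour Xandra.RetryStrategy

  @impl true
  def new(options) do
    # :alive_hosts is a hypothetical option that the cluster would need to provide.
    %{untried_hosts: Keyword.get(options, :alive_hosts, [])}
  end

  @impl true
  def retry(_error, options, %{untried_hosts: [next_host | rest]}) do
    # :target_node is the option proposed in point 2 above; it doesn't exist yet.
    {:retry, Keyword.put(options, :target_node, next_host), %{untried_hosts: rest}}
  end

  def retry(_error, _options, %{untried_hosts: []}), do: :error
end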

My general approach to this is that if we're doing this, we need to do this Good™ and have fully-fledged support for clusters. 🙃

Thoughts?

I think it makes a lot of sense to have the retry logic at the cluster level.

The part that I have questions about is how we're going to pass these options. Currently, the caller process that calls Xandra.Cluster.execute executes the RetryStrategy code, and can only make decisions about how to retry by looking solely at what Xandra.Cluster.execute returns. We would need to call Xandra.Cluster to get node information before executing the query (to make the second point work).

On point 3, you mention that we could store the tried nodes information in the retry strategy's state, which means the caller's state (since RetryStrategy isn't a process). Do we return the info about which node was executed in Xandra.Cluster.execute again together with all of the nodes available, so that we can make an informed decision about where to call next?

An alternative would be to call Xandra.execute from the Xandra.Cluster.Pool process, which I think would be wrong on many levels, since we wouldn't want to create a bottleneck or hold all of the retry info in Xandra.Cluster.Pool's state.

My proposal would be the following interaction:

  1. Caller calls Xandra.Cluster.execute with the retry_strategy of choice.
  2. Xandra.Cluster.Pool.checkout returns a {pid, host} tuple where host is of Xandra.Cluster.Host.t() type. We store it in the current context until we have a reply.
  3. We call Xandra.execute from the caller's process.
  4. On error, Xandra.execute returns {:error, reason}.
  5. If retry_strategy is available, RetryStrategy.new is invoked with conn, host and the options that we already can pass.
  6. From within RetryStrategy.new we can call a Xandra.Cluster.Pool.load_balancing_state function that returns the current load balancing state. With this information, the caller can implement a retry strategy based on a snapshot of the load balancing state. We wouldn't be altering the actual load balancing state in Xandra.Cluster.Pool's state, but it could tell us which hosts are up, what the local DC is, etc., so we can pick a node to call next.

Ideas?

Here's a more detailed version of how this could look:

sequenceDiagram
    actor C as Caller
    participant P as Xandra.Cluster.Pool
    participant D as Xandra
    C->>+P: Xandra.Cluster.Pool.checkout(conn)
    P-->>-C: {db_conn_pool, host}
    C->>+D: Xandra.execute(db_conn_pool)
    Note over C,D: All of the back and forth with DBConnection happens and it fails
    D-->>-C: {:error, reason}
    C->>+P: Xandra.Cluster.Pool.load_balancing_state()
    P-->>-C: load_balancing_mod.state.t()
    loop Until RetryStrategy gives up
        C->>+P: Xandra.Cluster.Pool.checkout(conn, host)
        P-->>-C: {db_conn_pool, host}
        C->>+D: Xandra.execute(db_conn_pool)
        Note over C,D: All of the back and forth with DBConnection happens and it fails
        D-->>-C: {:error, reason}
    end

Do we return the info about which node was executed in Xandra.Cluster.execute again together with all of the nodes available, so that we can make an informed decision about where to call next?

Yes, something like this. Consider that Xandra.Cluster.execute/4 (and other Xandra.Cluster functions) all call Xandra.Cluster.Pool.checkout/1, which returns one connection. However, that's private API that we can change: we could have checkout/1 return a list of pool PIDs together with their node, so that the retry strategy has all the info it needs to make an informed decision.

An alternative would be to call Xandra.execute from the Xandra.Cluster.Pool process

We can't do this for performance reasons, no.

If retry_strategy is available, RetryStrategy.new is invoked with conn, host and the options that we already can pass.

This is a new callback that I don't think we need 🙃 That's what I was trying to avoid when I mentioned we could shove this information in the options somehow.

From within RetryStrategy.new we can call a Xandra.Cluster.Pool.load_balancing_state function

If we return this info from the checkout/1 call I mentioned, we don't need to call this, right?

If we return this info from the checkout/1 call I mentioned, we don't need to call this, right?

Yes, that is correct. My thinking about that was that we only need the load_balancing info if the query fails, which shouldn't be the case in a healthy cluster most of the time. I thought maybe we could save some copying by explicitly asking the Xandra.Cluster.Pool for the load_balancing_state only if the query fails.

So if I'm understanding you correctly, this would be the interaction between the processes:

sequenceDiagram
    actor C as Caller Process
    participant P as Xandra.Cluster.Pool
    participant D as Xandra
    C->>+P: Xandra.Cluster.Pool.checkout(conn)
    P-->>-C: {db_conn_pool, current_host, load_balancing_state}
    C->>+D: Xandra.execute(db_conn_pool)
    Note over C,D: All of the back and forth with DBConnection happens and it fails
    D-->>-C: {:error, reason}
    loop Until RetryStrategy gives up
        C->>+P: Xandra.Cluster.Pool.checkout(conn, host)
        P-->>-C: {db_conn_pool, current_host, load_balancing_state}
        C->>+D: Xandra.execute(db_conn_pool)
        Note over C,D: All of the back and forth with DBConnection happens and it fails
        D-->>-C: {:error, reason}
    end

That's almost what I had in mind. The difference is in checkout/1's spec:

@spec checkout(cluster_pid :: pid()) ::
        {:ok, [host_with_pool]} | {:error, :empty}
        when host_with_pool: {Host.t(), pid()}

If we do this, then we don't have to surface the load_balancing_state and we don't have to do multiple calls to Xandra.Cluster.Pool.checkout/2.
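
To sketch how the caller side could then drive the attempts (the helper and variable names here are made up, and the real loop would of course consult the retry strategy between attempts), something like this, living in the cluster code:

# Rough caller-side sketch: try the pools in the order the cluster returned them.
defp try_hosts_in_order(cluster, statement, params, options) do
  case Xandra.Cluster.Pool.checkout(cluster) do
    {:error, :empty} ->
      {:error, :no_connected_hosts}

    {:ok, hosts_with_pools} ->
      Enum.reduce_while(hosts_with_pools, {:error, :no_connected_hosts}, fn {_host, pool}, _acc ->
        case Xandra.execute(pool, statement, params, options) do
          {:ok, result} -> {:halt, {:ok, result}}
          {:error, _reason} = error -> {:cont, error}
        end
      end)
  end
end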

My thinking about that was that we only need the load_balancing info if the query fails, which shouldn't be the case in a healthy cluster most of the time. I thought maybe we could save some copying

Yes, I hear this concern, but I don't think it's an issue for now. Eventually, I want pool checkout to not go through the cluster altogether, which might require changes to Xandra.Cluster.LoadBalancingPolicy. For example, it'd be awesome to be able to choose pools out of a registry or an ETS table, you know 🙃

Does this make sense?

If we do this, then we don't have to surface the load_balancing_state and we don't have to do multiple calls to Xandra.Cluster.Pool.checkout/2.

I see one particular problem with this approach: what happens when, say, we retrieved a list of nodes from Xandra.Cluster that were :connected at that point in time? We tried to query the first node in that list and got a timeout after a couple of seconds. While we were doing so, the second node got disconnected and its Xandra (DBConnection) pool got shut down. Now we want to retry and we are trying to call a non-existing pid, so we get an exit at the caller process level (if I'm not mistaken). The caller process gets shut down.

I like the registry/ETS table approach. But I guess that's out of the scope for this issue.

I like the registry/ETS table approach. But I guess that's out of the scope for this issue.

Yeah, out of scope.

Now we want to retry and we are trying to call a non-existing pid, so we get an exit at the caller process level (if I'm not mistaken). The caller process gets shut down.

Yeah sure, this can happen but it's not a big deal IMO. If we do get exits, we'll have to fix that yeah, but otherwise it's a small race condition that doesn't really bring much damage. You'll always have a race condition anyways, because even if you ask the cluster for alive nodes, they could go down between the time the cluster gives them to you and the time you reach out to them. Does that make sense?

You'll always have a race condition anyways, because even if you ask the cluster for alive nodes, they could go down between the time the cluster gives them to you and the time you reach out to them.

True, we'll always have that race condition. But the time intervals we're speaking about here are milliseconds, whereas for the case that we're introducing, we're talking about >10 seconds. So, if there's a network issue of >15s while reaching the Cassandra nodes, and we're doing multiple hundreds of insertions per second, we're almost guaranteed to have EXITs.

If we do get exits, we'll have to fix that yeah

I have no objections to this approach as long as we handle EXITs, so that we can try on another node when that happens. Or at least make it optional to catch EXITs.

But the time intervals we're speaking about here are milliseconds, whereas for the case that we're introducing, we're talking about >10 seconds. So, if there's a network issue of >15s while reaching the Cassandra nodes, and we're doing multiple hundreds of insertions per second, we're almost guaranteed to have EXITs.

Sure, but we can easily handle those EXITs (with try/catch). Plus, if we wanna move to an ETS/Registry-based pooling strategy, we'll have to do that anyways. So, for now let's go with what I proposed so that we don't have to talk back and forth to the Xandra.Cluster.Pool process 🙃
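
For reference, catching such an exit on the caller side could look roughly like this (just a sketch, the helper name is made up):

defp safe_execute(pool, statement, params, options) do
  try do
    Xandra.execute(pool, statement, params, options)
  catch
    # The pool pid may be gone by the time we call it; turn the exit into an
    # error tuple so the retry strategy can move on to the next host.
    :exit, reason -> {:error, {:pool_down, reason}}
  end
end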

Alright, so I'm gonna start implementing the following then:

sequenceDiagram
    actor C as Caller Process
    participant P as Xandra.Cluster.Pool
    participant D as Xandra
    C->>+P: Xandra.Cluster.Pool.checkout(conn)
    Note over P, C: Cluster returns list of :connected hosts ordered by load balancing policy
    P-->>-C: {:ok, [{host, pool}]}
    loop Until RetryStrategy gives up
        C->>+D: Xandra.execute(pool)
        Note over C,D: All of the back and forth with DBConnection happens and it fails
        D-->>-C: {:error, reason}
    end

@harunzengin yes that's it! 👍

Closed by #335.