reactor/reactor-addons

Cache Stampeding

dave-fl opened this issue · 14 comments

With the current CacheMono implementation, is there any way to avoid cache stampeding?

Additionally, when the cache is updated by the writer, shouldn't the writer's value be the one passed on, rather than the original value produced by the supplier? This would allow for putIfAbsent semantics.

Perhaps an additional lookup operation could be added that also takes a parameter, inserting the signal iff it does not already exist, as a single atomic operation.

Not really, but I guess the technique of acquiring a lock, even though it sounds a bit orthogonal to the purpose of reactive programming, could be applied at the point where the CacheMono.lookup() call happens...

Do you have any pseudocode (or real code even) to illustrate what you mean with the additional operation and the enabling of putIfAbsent?

Here is an example of a writer function whose output feeds the value that ends up being returned. Currently a Runnable-style Mono<Void> is used; I have switched this to a Callable-style Mono so that the caller can reuse the writer's signal rather than the originally generated one. This still doesn't solve the stampede issue.

private static <K, V> BiFunction<K, Signal<? extends V>, Mono<Signal<? extends V>>> writer(
		Map<K, Signal<? extends V>> cache, Predicate<Signal<? extends V>> allowCacheInsert) {
	return (key, value) -> {
		if (allowCacheInsert.test(value)) {
			// putIfAbsent returns the previously mapped signal, or null if this call inserted the value
			Signal<? extends V> existing = cache.putIfAbsent(key, value);
			return Mono.just(existing != null ? existing : value);
		}
		return Mono.just(value);
	};
}

Caffeine has the ability to compute on a get iff the value does not exist. Could something like this be achieved without a block()? Maybe a block is to be expected in this case.

Cache<String, Signal<? extends String>> cache2 = Caffeine.newBuilder().build();
Mono<String> test = Mono.just("test");
// the mapping function blocks until the Mono completes; Caffeine runs it at most once per key
cache2.get("test", s -> test.materialize().block());

Thanks. For writers that support atomic operations along the lines of putIfAbsent, that could indeed make sense. Maybe, as you suggested, as a second method on MonoCacheBuilderCacheWriter, with a default implementation delegating to andWriteWith?

@dave-fl if the Mono is not to be invoked unless the get fails, then there's no real alternative to blocking here. Otherwise you'd start from the Mono and map.
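
For reference, a minimal sketch of what that second option could look like, reusing the cache2/test names from the Caffeine snippet above (purely illustrative): the Mono drives the flow and the cache write happens as a side effect, so nothing blocks, but the check-then-put is no longer atomic and concurrent subscribers can still stampede.

Mono<String> nonBlocking = Mono.fromCallable(() -> cache2.getIfPresent("test"))
        .switchIfEmpty(test.materialize()
                // side-effect write: not atomic with the lookup above, so callers can still race
                .doOnNext(signal -> cache2.put("test", signal)))
        .dematerialize();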

For the secondary write-with, something like this: you end up guaranteeing that the first value to be inserted is the one shared among all callers, but you can still stampede.

I guess the question is, does it make sense to use block() in this case, if the underlying Cache can guarantee that it will insert iff the value does not exist? Is there a way to make this generic?

    // on a cache miss, materialize the fallback supplier and let the writer decide
    // which signal is ultimately propagated downstream
    public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
            Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
        return otherSupplier -> writer -> Mono.defer(() ->
                reader.apply(key)
                        .switchIfEmpty(otherSupplier.get()
                                .materialize()
                                .flatMap(signal -> writer.apply(key, signal))
                        )
                        .dematerialize());
    }

    public interface MonoCacheBuilderCacheWriter<KEY, VALUE> {

        Mono<VALUE> andWriteWith(BiFunction<KEY, Signal<? extends VALUE>, Mono<Signal<? extends VALUE>>> writer);
    }

// Which allows for a writer like so

	// putIfAbsent: the first signal to be inserted wins, and that same signal is returned to every caller
	private static <K, V> BiFunction<K, Signal<? extends V>, Mono<Signal<? extends V>>> specialwriter(Map<K, Signal<? extends V>> cache) {
		return (key, value) -> {
			Signal<? extends V> retSignal = cache.putIfAbsent(key, value);
			if (retSignal == null) {
				retSignal = value;
			}
			return Mono.just(retSignal);
		};
	}
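
To make the intent concrete, here is a hypothetical wiring of the pieces above (the map name is made up, and I am assuming the intermediate builder step is named onCacheMissResume, as in the existing CacheMono API): every subscriber that misses still triggers the supplier, but the putIfAbsent in specialwriter guarantees they all end up emitting the first signal that got inserted.

ConcurrentMap<String, Signal<? extends String>> map = new ConcurrentHashMap<>();

Mono<String> result = lookup((String k) -> Mono.justOrEmpty(map.get(k)), "key")
		// assuming the miss step is exposed as onCacheMissResume, as in CacheMono
		.onCacheMissResume(() -> Mono.just("computed value"))
		.andWriteWith(specialwriter(map));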

Generic lookup and write, no stampede.

public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
	Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
	// with a ConcurrentMap, computeIfAbsent runs the (blocking) mapping function at most
	// once per key, so concurrent callers for the same key share the stored Signal
	return Mono.defer(() -> Mono.just(
			cacheMap.computeIfAbsent(key, k -> mono.materialize().block()))
			.dematerialize());
}
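
As a usage sketch (the map and the slowRemoteCall source are made up): ConcurrentHashMap.computeIfAbsent runs the mapping function at most once per key and makes other callers for that key wait, so only the first subscriber triggers the source, at the cost of blocking the subscribing thread while it completes.

ConcurrentMap<String, Signal<? extends String>> cacheMap = new ConcurrentHashMap<>();

// hypothetical expensive source; wrapped lazily so it only runs on a cache miss
Mono<String> expensive = Mono.fromCallable(() -> slowRemoteCall("foo"));

// all concurrent subscribers for "foo" share the single Signal stored by computeIfAbsent
Mono<String> cached = lookupAndWrite(cacheMap, "foo", expensive);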

@simonbasle If the concern here is that onCacheMissResume will run too many times (during the period before the cache is repopulated), is it worth considering a means to group the incoming elements by key, perhaps forcing things to operate synchronously with a second check of whether the cache has been populated?

E.g. all elements come in with the same key.

A5, A4, A3, A2, A1 - Group Same Key

A1 in progress

A2 cannot proceed until A1 completes. Before proceeding, A2 checks whether the cache has been populated; if it has, it returns the cached value.

@dave-fl the problem here is that there are 2 levels: the one where you obtain a Mono, which doesn't need any mutual exclusion since it is lazy at this point, and the one where said Mono is subscribed to, which triggers the cache read and cache write.

That starts to point us toward a locking mechanism where the Mono would acquire the lock upon subscription and release it when it terminates or cancels... which is not viable, because the net result is that the Mono could actually block its subscribing Thread if the computation takes too long 😞

In light of all this, I wonder if the CacheMono and CacheFlux approach is entirely misguided. It stemmed from attempting to "rehydrate" a Mono from an existing cache of values, but maybe it would be better to "double-cache": put mySourceMono.cache() in the underlying cache directly...

Cache<String, Mono<T>> underlyingCache; //pseudocode for cache

public Mono<T> toBeCached(String key) {
    return WebClient.get("/" + key).retrieve().bodyToMono(T.class);
}

public Mono<T> cachedFooCall() {
    String key = "foo";
    return underlyingCache.computeIfAbsent(key, k -> toBeCached(k).cache());
}
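
A slightly more concrete sketch of that idea with Caffeine and WebClient (Foo and the base URL are placeholders): Cache.get(key, mappingFunction) computes the entry at most once per key, and cache() makes the stored Mono replay its result, so concurrent callers share a single in-flight request and nothing ever blocks.

Cache<String, Mono<Foo>> monoCache = Caffeine.newBuilder().build();
WebClient client = WebClient.create("https://example.org");

public Mono<Foo> cachedCall(String key) {
    // Caffeine computes the mapping at most once per key; the stored Mono is
    // the cached, replayable handle to the (possibly still in-flight) request
    return monoCache.get(key, k ->
            client.get().uri("/{key}", k)
                    .retrieve()
                    .bodyToMono(Foo.class)
                    .cache());
}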

@smaldini any thoughts?

@simonbasle I'm not sure why it was decided to use Signals instead of the Mono. But this looks a lot simpler than the locking code I was experimenting with.

Just something to point out.

If a Mono results in an error or an empty completion rather than a next, the user might not want to cache that outcome and will want to invalidate the key.

During that brief period (between when the invalid Mono was stored and when its key gets invalidated) there is a possibility of getting a cached invalid Mono. It might be desirable to automatically detect this situation and try to compute the Mono again (ignoring the cache).
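
One way to narrow that window, sketched on top of the Caffeine-based example above (still best effort, a subscriber can observe the cached error just before it is removed): drop the entry as soon as the source errors, so the next caller recomputes.

public Mono<Foo> cachedCallInvalidatingErrors(String key) {
    return monoCache.get(key, k ->
            client.get().uri("/{key}", k)
                    .retrieve()
                    .bodyToMono(Foo.class)
                    // evict the failed entry so subsequent callers recompute it
                    .doOnError(e -> monoCache.invalidate(k))
                    .cache());
}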

Just answering my own question: a valid reason to use a Signal instead of a Mono might be time-sensitive information, e.g. a token that could become invalid by the time the Mono is subscribed to. Using the Signal forces the item to have completed before being inserted. I could see both scenarios being valid.

@dave-fl see the discussion in #131 (I needed to re-update my brain with that discussion BTW 😄).

The base rationale for CacheMono and CacheFlux was to be able to cache the result of a method like Flux<T> method(String arg1, String arg2), including the "negative results" (error/empty). This is why Signal was used.

For positive results only, the approach I described above is good enough (and has the added benefit of better handling stampedes?).

superseded by #237