AsyncLazy blocks thread until concurrent factory returns asynchronously
ugumba opened this issue · 3 comments
I've experienced thread starvation in these cases (using AsyncLazy to populate entries in MemoryCache).
It's only an issue with factory methods that have a long CPU-bound lead-in before their first "await" and are not explicitly started on the thread pool. (In my use case, I can't predict these cases, and I prefer to avoid the thread overhead.)
To resolve, I made my own AsyncLazy using SemaphoreSlim instead of lock.
I'd much prefer to use a reputable NuGet package, though 😊.
Are you using ExecuteOnCallingThread? Because by default AsyncLazy will execute your factory delegate on a thread pool thread.
Sorry, it's been a while, but yes - I'd tried both. I don't control the factories - they might be synchronously heavy or not. In the quickly-async case I didn't want the overhead of Task.Run(). In the heavy-sync case, I don't want the sync lock potentially blocking 1 thread per consumer.
Lazy<T> just seems like the wrong building block.
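To make the blocking concrete, here is a minimal sketch of the heavy-sync case using a plain Lazy<Task<int>> (not the library's AsyncLazy). SlowStartFactory is a hypothetical factory, and Thread.Sleep only stands in for the CPU-bound lead-in; the timings are illustrative assumptions, not measurements.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical factory: a synchronous lead-in before the first await.
static async Task<int> SlowStartFactory()
{
    Thread.Sleep(500);     // stands in for CPU-bound work before the first await
    await Task.Delay(10);  // the factory only becomes asynchronous here
    return 42;
}

// Default mode is ExecutionAndPublication, so .Value takes a lock during initialization.
var lazy = new Lazy<Task<int>>(SlowStartFactory);

// First consumer: runs the factory's synchronous part on a thread pool thread.
Task<int> first = Task.Run(() => lazy.Value);
Thread.Sleep(100); // give the first consumer time to enter the factory

// Second consumer: .Value blocks this thread until the factory reaches its first await.
var sw = Stopwatch.StartNew();
Task<int> task = lazy.Value;
sw.Stop();
Console.WriteLine($"Second consumer was blocked for about {sw.ElapsedMilliseconds} ms");

int result = await task;
await first;
```

With many concurrent consumers, each one would hold a thread for the remainder of the lead-in, which is the starvation scenario described above. A semaphore awaited asynchronously lets those consumers yield their threads instead.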
This is my take on it - none of your options are implemented (as I haven't needed them).
/// <summary>
/// This implementation differs from Nito.AsyncEx's AsyncLazy in one important aspect:
/// Lazy<T>.Value blocks any subsequent accessing threads while the first accessing thread is initializing Value.
/// GetValue() below is fully async.
/// Any exception will be cached by _task.
/// No retries.
/// </summary>
public class AsyncLazy<T>
{
    private Func<CancellationToken, Task<T?>>? _taskFactory;
    private SemaphoreSlim? _semaphore = new(1);
    private Task<T?>? _task;

    public AsyncLazy(Func<CancellationToken, ValueTask<T?>> factory)
    {
        _taskFactory = async ct => await factory(ct).ConfigureAwait(false);
    }

    public AsyncLazy(Func<CancellationToken, Task<T?>> factory)
    {
        _taskFactory = factory;
    }

    public bool IsValueCreated => _semaphore == null;

    public ValueTask<T?> GetValue(CancellationToken ct = default)
    {
        var semaphore = _semaphore;
        if (semaphore == null)
            // Fast track
            return new(_task!);
        else
            return EnsureHasValueTask(semaphore);

        async ValueTask<T?> EnsureHasValueTask(SemaphoreSlim semaphore)
        {
            using (await semaphore.LockAsync(ct).ConfigureAwait(false))
            {
                if (_semaphore != null)
                {
                    try
                    {
                        _task = _taskFactory!(ct);
                        // _task is now set, but may not yet be complete
                        _taskFactory = null; // Release delegate, captured context etc to GC
                    }
                    catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
                    {
                        // Cache synchronous cancellation
                        _task = Task.FromCanceled<T?>(ct);
                        throw;
                    }
                    catch (Exception ex)
                    {
                        // Cache synchronous exception
                        _task = Task.FromException<T?>(ex);
                        throw;
                    }
                    finally
                    {
                        _semaphore = null; // Release to GC, enable fast track above
                    }
                }
            }

            // Asynchronous exception will be cached here
            return await _task!.ConfigureAwait(false);
        }
    }
}