dotnet/orleans

Memory stream provider not found when using TestCluster

Closed this issue · 2 comments

When trying to run integration tests with `Orleans.TestingHost.TestCluster` and memory streams, I receive the error `System.Collections.Generic.KeyNotFoundException: Stream provider 'MemoryStream' not found`. Minimal source code to reproduce the issue is available at https://github.com/TortillaZHawaii/orleans-stream-test.

Example:

using Orleans.Streams;
using Orleans.TestingHost;

namespace TestProject1;

/// <summary>
/// Silo-side configuration used by the test cluster: registers in-memory grain
/// storage under the name "PubSubStore" and in-memory streams under the name
/// "MemoryStream".
/// </summary>
file class SiloConfigurator : ISiloConfigurator
{
    /// <summary>Applies the storage and stream registrations to the silo builder.</summary>
    /// <param name="hostBuilder">The silo builder to configure.</param>
    public void Configure(ISiloBuilder hostBuilder) =>
        hostBuilder
            .AddMemoryGrainStorage("PubSubStore")
            // We add memory streams here
            .AddMemoryStreams("MemoryStream");
}

/// <summary>
/// Integration test that pushes items into a memory stream through the test
/// cluster's client and checks what an observer on another stream receives.
/// </summary>
public class Tests
{
    /// <summary>Builds and deploys a test cluster configured with <c>SiloConfigurator</c>.</summary>
    /// <returns>The deployed cluster; the caller is responsible for stopping it.</returns>
    async Task<TestCluster> SetUp()
    {
        var builder = new TestClusterBuilder()
            .AddSiloBuilderConfigurator<SiloConfigurator>();
        var cluster = builder.Build();
        await cluster.DeployAsync();
        return cluster;
    }

    /// <summary>
    /// Publishes two strings to "InputStream" and asserts that the observer
    /// subscribed to "OutputStream" received exactly those items.
    /// </summary>
    [Test]
    public async Task Test()
    {
        var cluster = await SetUp();

        // The body runs inside try/finally so the cluster is stopped even when the
        // stream provider lookup throws or the assertion fails; otherwise the
        // deployed silos would be left running after a failed test.
        try
        {
            // This will throw exception
            // System.Collections.Generic.KeyNotFoundException : Stream provider 'MemoryStream' not found
            var streamProvider = cluster.Client.GetStreamProvider("MemoryStream");

            var inputStream = streamProvider.GetStream<string>("InputStream", 0);
            var outputStream = streamProvider.GetStream<string>("OutputStream", 0);
            var mockReader = new MockReader<string>();
            await outputStream.SubscribeAsync(mockReader);
            var items = new[] { "Hello", "World" };

            foreach (var item in items)
            {
                // Act
                await inputStream.OnNextAsync(item);
            }

            // Give the stream time to deliver the items before asserting.
            await Task.Delay(1000);

            // Assert
            Assert.That(mockReader.ReceivedItems.ToArray(), Is.EqualTo(items));
        }
        finally
        {
            await TearDown(cluster);
        }
    }

    /// <summary>Stops every silo in the given cluster.</summary>
    /// <param name="cluster">The cluster returned by <c>SetUp</c>.</param>
    async Task TearDown(TestCluster cluster)
    {
        await cluster.StopAllSilosAsync();
    }
}

/// <summary>
/// Stream observer for tests: appends every item it is given to
/// <see cref="ReceivedItems"/> and ignores completion and error notifications.
/// </summary>
/// <typeparam name="T">The type of item delivered by the stream.</typeparam>
file class MockReader<T> : IAsyncObserver<T>
{
    /// <summary>Items passed to <see cref="OnNextAsync"/>, in call order.</summary>
    public List<T> ReceivedItems { get; } = new List<T>();

    /// <summary>Records <paramref name="item"/>; the sequence token is not used.</summary>
    public Task OnNextAsync(T item, StreamSequenceToken? token = null)
    {
        ReceivedItems.Add(item);
        return Task.CompletedTask;
    }

    /// <summary>No-op: completion is not tracked.</summary>
    public Task OnCompletedAsync() => Task.CompletedTask;

    /// <summary>No-op: the error is ignored.</summary>
    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}

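I suggest registering the stream provider on both the server (silo) side and the client side. The silo configurator only configures the silos, so the cluster client also needs the provider registered, for example: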

// Register the client-side configurator on the cluster builder
// (presumably the TestClusterBuilder created in SetUp — confirm).
builder.AddClientBuilderConfigurator<MyClientBuilderConfigurator>();

/// <summary>
/// Client-side configuration: registers in-memory streams under the name
/// "MemoryStream" on the client builder.
/// </summary>
private class MyClientBuilderConfigurator : IClientBuilderConfigurator
{
    /// <summary>Adds the "MemoryStream" stream provider to the client.</summary>
    /// <param name="configuration">Configuration passed in by the caller; not used here.</param>
    /// <param name="clientBuilder">The client builder to configure.</param>
    public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) =>
        clientBuilder.AddMemoryStreams("MemoryStream");
}

Thank you so much @scalalang2

Closing the issue.