Memory stream provider not found when using TestCluster
Closed this issue · 2 comments
TortillaZHawaii commented
When trying to run integration tests with Orleans.TestingHost.TestCluster
and memory streams, I receive the error System.Collections.Generic.KeyNotFoundException : Stream provider 'MemoryStream' not found.
Minimal source code to reproduce the issue is available at https://github.com/TortillaZHawaii/orleans-stream-test.
Example:
using Orleans.Streams;
using Orleans.TestingHost;
namespace TestProject1;
/// <summary>
/// Silo-side configuration used by the test cluster: registers in-memory grain
/// storage named "PubSubStore" and an in-memory stream provider named "MemoryStream".
/// </summary>
file class SiloConfigurator : ISiloConfigurator
{
    /// <summary>Applies the storage and stream registrations to the given silo builder.</summary>
    /// <param name="hostBuilder">The silo builder to configure.</param>
    public void Configure(ISiloBuilder hostBuilder) =>
        hostBuilder
            .AddMemoryGrainStorage("PubSubStore")
            // We add memory streams here
            .AddMemoryStreams("MemoryStream");
}
/// <summary>
/// Reproduction of the "Stream provider 'MemoryStream' not found" failure when
/// resolving a stream provider from the test cluster's client.
/// </summary>
public class Tests
{
    /// <summary>
    /// Builds and deploys a test cluster. Only a silo configurator is registered here;
    /// no client configurator is added.
    /// </summary>
    /// <returns>The deployed cluster; the caller is responsible for stopping it.</returns>
    async Task<TestCluster> SetUp()
    {
        var builder = new TestClusterBuilder()
            .AddSiloBuilderConfigurator<SiloConfigurator>();
        var cluster = builder.Build();
        await cluster.DeployAsync();
        return cluster;
    }

    /// <summary>
    /// Publishes two items to "InputStream" and expects them to arrive on "OutputStream".
    /// </summary>
    [Test]
    public async Task Test()
    {
        var cluster = await SetUp();
        try
        {
            // This will throw exception
            // System.Collections.Generic.KeyNotFoundException : Stream provider 'MemoryStream' not found
            var streamProvider = cluster.Client.GetStreamProvider("MemoryStream");
            var inputStream = streamProvider.GetStream<string>("InputStream", 0);
            var outputStream = streamProvider.GetStream<string>("OutputStream", 0);
            var mockReader = new MockReader<string>();
            await outputStream.SubscribeAsync(mockReader);
            var items = new[] { "Hello", "World" };

            foreach (var item in items)
            {
                // Act
                await inputStream.OnNextAsync(item);
            }

            // Give the stream time to deliver the items before asserting.
            await Task.Delay(1000);

            // Assert
            Assert.That(mockReader.ReceivedItems.ToArray(), Is.EqualTo(items));
        }
        finally
        {
            // Stop the silos even when the test body throws (as the lookup above does),
            // so a failing run does not leave a deployed cluster behind.
            await TearDown(cluster);
        }
    }

    /// <summary>Stops every silo in the given cluster.</summary>
    /// <param name="cluster">The cluster returned by <see cref="SetUp"/>.</param>
    async Task TearDown(TestCluster cluster)
    {
        await cluster.StopAllSilosAsync();
    }
}
/// <summary>
/// Stream observer for tests that records every item passed to <see cref="OnNextAsync"/>.
/// Completion and error notifications are ignored.
/// </summary>
/// <typeparam name="T">Type of the stream items.</typeparam>
file class MockReader<T> : IAsyncObserver<T>
{
    /// <summary>Items received so far, in the order they were delivered.</summary>
    public List<T> ReceivedItems { get; } = new List<T>();

    /// <summary>Appends <paramref name="item"/> to <see cref="ReceivedItems"/>.</summary>
    /// <param name="item">The delivered item.</param>
    /// <param name="token">Sequence token of the item; not used.</param>
    /// <returns>An already-completed task.</returns>
    public Task OnNextAsync(T item, StreamSequenceToken? token = null)
    {
        ReceivedItems.Add(item);
        return Task.CompletedTask;
    }

    /// <summary>No-op; returns an already-completed task.</summary>
    public Task OnCompletedAsync() => Task.CompletedTask;

    /// <summary>No-op; the error is not recorded.</summary>
    /// <param name="ex">The reported error; not used.</param>
    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}
scalalang2 commented
I suggest registering the StreamProvider on both the server side and the client side:
// Register the client configurator as well; `builder` is presumably the TestClusterBuilder from the test's SetUp — confirm.
builder.AddClientBuilderConfigurator<MyClientBuilderConfigurator>();
/// <summary>
/// Client-side configuration that registers an in-memory stream provider named "MemoryStream".
/// </summary>
private class MyClientBuilderConfigurator : IClientBuilderConfigurator
{
    /// <summary>Adds the "MemoryStream" provider to the client builder.</summary>
    /// <param name="configuration">Configuration passed to the configurator; not used.</param>
    /// <param name="clientBuilder">The client builder to configure.</param>
    public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) =>
        clientBuilder.AddMemoryStreams("MemoryStream");
}
TortillaZHawaii commented
Thank you so much @scalalang2
Closing the issue.