Amqp Connector don't work with RestartSource
sleipnir opened this issue · 11 comments
HI guys!
I am trying to create a Reconnectable Source with RestartSource using Akka Streams Amqp.V1 and some strange things happen. When I disconnect the network, RestartSource does not try to reconnect again and no message is printed in the logs. However, when I receive an invalid message (type incompatible) it restarts my source. I am not sure why this is so and I would like to know if you have already been there and if you could help me. Below is the Source creation code:
using System.Numerics;
using Akka.Serialization;
using Akka.Streams.Amqp.V1.Dsl;
using Akka.Streams.Dsl;
using Amqp;
using System;
using Akka.Actor;
using Address = Amqp.Address;
using Akka.Streams;
using Akka.Streams.Amqp.V1;
using System.Text;
namespace image_processor_akkanet
{
class Program
{
private static Connection connection;
private static Session session;
static void Main(string[] args)
{
var sys = ActorSystem.Create("AMQP-System");
var materializer = ActorMaterializer.Create(sys);
var serialization = sys.Serialization;
var serializer = serialization.FindSerializerForType(typeof(byte[]));
try
{
var address = new Address("0.0.0.0", 61616, "admin", "admin", scheme: "AMQP");
connection = new Connection(address);
session = new Session(connection);
var queueName = "akka.teste";
var receiverlinkName = "amqp-conn-test-sender";
//create source
var amqpSource = RestartSource.OnFailuresWithBackoff(
() => {
Console.WriteLine("Start/Restart...");
return AmqpSource
.Create(new NamedQueueSourceSettings<byte[]>(session, receiverlinkName, queueName, 200, serializer));
},
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(3),
0.2,
Int16.MaxValue
);
//run source
var result = amqpSource
.Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)
.Select(elem => Encoding.UTF8.GetString(elem, 0, elem.Length))
.RunForeach(elem => Console.WriteLine(elem), materializer);
result.Wait(TimeSpan.FromSeconds(Int16.MaxValue));
}
catch (Exception ex)
{
Console.WriteLine("Error " + ex.Message);
}
finally
{
session.Close();
connection.Close();
}
}
}
}
I tried using both RestartSource.OnFailuresWithBackoff and RestartSource.WithBackoff both to no avail
My project and dependencies:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>image_processor_akkanet</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.11" />
<PackageReference Include="Akka.Streams" Version="1.4.11" />
<PackageReference Include="Akka.Streams.Amqp.V1" Version="1.0.0-beta2" />
</ItemGroup>
</Project>
I'm using ActiveMQ Artemis as a AMQP Broker.
Ping
This is actually by design. because the Source
object does not have any reference to the AMQP Connection
object, it does not have a direct way of reconnecting with the server.
Try moving the Connection
and Session
initialization into the source factory Func
in the RestartSource.OnFailuresWithBackoff
method argument.
I might be mistaken, there might be some problem with the way connections were made inside the connector that prevents retry
The PR should address the problem. I've included examples on how to make both Sink
and Source
resistant to transient connection problems.
Hi @Arkatufus Can you tell me when we will have a version with this correction published via nuget?
As soon as someone reviewed the changes and approved it.
@Aaronontheweb can you take a look at this? I look forward to putting the first software made using Akka.Net in my company and that is a deterrent
I'm reviewing it now
Thanks @Aaronontheweb
@sleipnir this is now live on NuGet as Alpakka v1.0.0-beta3 https://github.com/akkadotnet/Alpakka/releases/tag/1.0.0-beta3
Awesome work guys! Thank you @Aaronontheweb and @Arkatufus