Cysharp/MessagePipe

interprocess multi-sub/pub

wanga7 opened this issue · 3 comments

Hi, does any of the messagePipes for InterprocessPubSub supports multiple subscribers (or multiple pubs) to the same port/uds path? Whenever I try adding multiple processes subscribing to the same port, or uds path, it throws the following exception. I've tried AddMessagePipeUdpInterprocess, AddMessagePipeTcpInterprocess and AddMessagePipeTcpInterprocessUds.

Unhandled exception. System.Net.Sockets.SocketException (10048): Only one usage of each socket address (protocol/network address/port) is normally permitted.
   at System.Net.Sockets.Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, String callerName)
   at System.Net.Sockets.Socket.DoBind(EndPoint endPointSnapshot, SocketAddress socketAddress)
   at System.Net.Sockets.Socket.Bind(EndPoint localEP)
   at MessagePipe.Interprocess.Workers.SocketTcpServer.ListenUds(String domainSocketPath, Nullable`1 sendBufferSize, Nullable`1 recvBufferSize)
   at MessagePipe.Interprocess.Workers.TcpWorker.<>c__DisplayClass12_0.<.ctor>b__0()
   at System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
   at System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
   at System.Lazy`1.CreateValue()
   at System.Lazy`1.get_Value()
   at MessagePipe.Interprocess.Workers.TcpWorker.StartReceiver()
   at MessagePipe.Interprocess.TcpDistributedSubscriber`2..ctor(TcpWorker worker, MessagePipeInterprocessTcpUdsOptions options, IAsyncSubscriber`2 subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory)
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
   at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitScopeCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.CreateServiceAccessor(Type serviceType)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at CysharpMessagePipe.ServiceCollectionExtensions.UpdateServiceRegistry(IServiceProvider services) in C:\source\4_helix\helix-core\samples\CysharpMessagePipe\CysharpMessagePipe\ServiceCollectionExtensions.cs:line 63
   at CysharpMessagePipe.Program.Main(String[] args) in C:\source\4_helix\helix-core\samples\CysharpMessagePipe\CysharpMessagePipe\Program.cs:line 29

I also encountered this problem.
In my understanding, Current Message Pipe's implementations do not support inter-multi-process pubsub.
Subscriber is UDP Server and hold the specified port, and Publisher is connect to this.
I don't know what happens when multiple AddMessagePipeUdpInterprocess are added.
Maybe, Redis or other solution is available.

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 7 days.

sgf commented

#89

my code support multi pubs,but multi subs not test yet.