Cysharp/MessagePipe

am i right? for my test code TCP and TCPWithUds InterProcess

sgf opened this issue · 2 comments

sgf commented

TcpTest.cs

what's the problem for the code.
im add providerClient Subscribe by Key "hogemogeman"
add the reulst to result2
but its give me a error

[Fact]
        public async Task MoreHugeSizeUdsTest()
        {
            var filePath = System.IO.Path.GetTempFileName();
            if (System.IO.File.Exists(filePath))
            {
                System.IO.File.Delete(filePath);
            }
            try
            {
                var provider = TestHelper.BuildServiceProviderTcpWithUds(filePath, helper);
                var providerClient = TestHelper.BuildServiceProviderTcpWithUds(filePath, helper, asServer:false);
                using (provider as IDisposable)
                {
                    var p1 = provider.GetRequiredService<IDistributedPublisher<string, string>>();
                    var s1 = provider.GetRequiredService<IDistributedSubscriber<string, string>>();
                    var s2= providerClient.GetRequiredService<IDistributedSubscriber<string, string>>();

                    var result = new List<string>();
                    var result2 = new List<string>();
                    await s1.SubscribeAsync("hogemogeman", x =>
                    {
                        result.Add(x);
                    });
                    await s2.SubscribeAsync("hogemogeman", x =>
                    {
                        result2.Add(x);
                    });

                    var ldata1 = new string('a', 99999);
                    var ldata2 = new string('b', 99999);
                    var ldata3 = new string('c', 99999);
                    var ldata = string.Concat(ldata1, ldata2, ldata3);
                    await Task.Delay(TimeSpan.FromSeconds(1)); // wait for receive data...
                    await p1.PublishAsync("hogemogeman", ldata);

                    await Task.Delay(TimeSpan.FromSeconds(1)); // wait for receive data...

                    result.Should().Equal(ldata);
                    result2.Should().Equal(ldata); 
                }
            }
            finally
            {
                if (System.IO.File.Exists(filePath))
                {
                    System.IO.File.Delete(filePath);
                }
            }
        }
 MessagePipe.Interprocess.Tests.TcpTest.MoreHugeSizeUdsTest
   源: TcpTest.cs 行 234
   持续时间:75 毫秒

  消息: 
System.Net.Sockets.SocketException : Only one usage of each socket address (protocol/network address/port) is normally permitted

  堆栈跟踪: 
Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, String callerName)
Socket.DoBind(EndPoint endPointSnapshot, SocketAddress socketAddress)
Socket.Bind(EndPoint localEP)
SocketTcpServer.ListenUds(String domainSocketPath, Nullable`1 sendBufferSize, Nullable`1 recvBufferSize) 行 52
<.ctor>b__0() 行 83
Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
Lazy`1.CreateValue()
Lazy`1.get_Value()
TcpWorker.StartReceiver() 行 170
TcpDistributedSubscriber`2.ctor(TcpWorker worker, MessagePipeInterprocessTcpUdsOptions options, IAsyncSubscriber`2 subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory) 行 55
RuntimeMethodHandle.InvokeMethod(Object target, Span`1& arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
CallSiteRuntimeResolver.VisitScopeCache(ServiceCallSite callSite, RuntimeResolverContext context)
CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
<>c__DisplayClass2_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
ServiceProvider.GetService(Type serviceType)
ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
TcpTest.MoreHugeSizeUdsTest() 行 249
--- End of stack trace from previous location ---
sgf commented

Soloved,the code likes follow:
TCP or TCPWithUDS all work fines.

class Program : ConsoleAppBase
{

    public static IServiceProvider BuildServiceProviderTcpWithUds(string domainSocketPath, bool asServer = true)
    {
        var sc = new ServiceCollection();
        sc.AddMessagePipe();
        sc.AddMessagePipeTcpInterprocessUds(domainSocketPath, x =>
        {
            x.HostAsServer = asServer;
            x.UnhandledErrorHandler = (msg, e) => Console.WriteLine(msg + e);
        });
        return sc.BuildServiceProvider();
    }

    public static IServiceProvider BuildServiceProviderTcp(string host, int port,bool asServer = true)
    {
        var sc = new ServiceCollection();
        sc.AddMessagePipe();
        sc.AddMessagePipeTcpInterprocess(host, port, x =>
        {
            x.HostAsServer = asServer;
            x.UnhandledErrorHandler = (msg, e) => Console.WriteLine(msg + e);
        });
        return sc.BuildServiceProvider();
    }

    //Server
    static void Main(string[] args)
    { 
        var udsPath = "sock.ss";
        //var server = BuildServiceProviderTcp("127.0.0.1", 9878, true);
        if (File.Exists(udsPath)) File.Delete(udsPath);

        var server = BuildServiceProviderTcpWithUds("sock.ss", true);
        var subscriber= server.GetRequiredService<IDistributedSubscriber<string, int>>();

        // subscribe remote-message with "foobar" key.
        subscriber.SubscribeAsync("foobar", x =>
       {
           Console.WriteLine(x);
       }).GetAwaiter().GetResult();
        Console.WriteLine("Hello, World!");
        Console.ReadLine();
    }

}
public class Program : ConsoleAppBase
{

    public static IServiceProvider BuildServiceProviderTcpWithUds(string domainSocketPath,  bool asServer = true)
    {
        var sc = new ServiceCollection();
        sc.AddMessagePipe();
        sc.AddMessagePipeTcpInterprocessUds(domainSocketPath, x =>
        {
            x.HostAsServer = asServer;
            x.UnhandledErrorHandler = (msg, e) => Console.WriteLine(msg + e);
        });
        return sc.BuildServiceProvider();
    }

    public static IServiceProvider BuildServiceProviderTcp(string host, int port, bool asServer = true)
    {
        var sc = new ServiceCollection();
        sc.AddMessagePipe();
        sc.AddMessagePipeTcpInterprocess(host, port, x =>
        {
            x.HostAsServer = asServer;
            x.UnhandledErrorHandler = (msg, e) => Console.WriteLine(msg + e);
        });
        return sc.BuildServiceProvider();
    }

    //Client
    static void Main(string[] args)
    {
        //var client = BuildServiceProviderTcp("127.0.0.1", 9878, false);
        var client = BuildServiceProviderTcpWithUds("sock.ss", false);
        var p1 = client.GetRequiredService<IDistributedPublisher<string, int>>();

        while (true)
        {
            Task.Delay(800).GetAwaiter().GetResult();
            // publish value to remote process.
             p1.PublishAsync("foobar", 100).GetAwaiter().GetResult();
            Console.WriteLine("published");
        }

        Console.WriteLine("Hello, World!");
        Console.ReadLine();
    }
sgf commented

im change the title,hope this clould be help any others.

pls note,UDS mode need the Server and Client use same DomainSockPath(same file path)