Cysharp/MessagePipe

[REQUEST] Allow graceful shutdown for IPC with UDP

Opened this issue · 0 comments

I have a client/server implementation using MessagePipe with UDP as the IPC transport, as per the documentation:

Host.CreateDefaultBuilder()
    .ConfigureServices((ctx, services) =>
    {
        services.AddMessagePipe()
            .AddUdpInterprocess("127.0.0.1", 3215, configure); // setup host and port.
    })

The app has a global CancellationTokenSource server-side that is passed around to all objects to allow graceful shutdown.

When the Generic Host detects the token cancellation, all objects shutdown except the UdpWorker that is created by the library, which stays waiting for a message to be published and prevents the application from shutting down completely.

private async void RunPublishLoop()
{
	ChannelReader<byte[]> reader = channel.Reader;
	CancellationToken token = cancellationTokenSource.Token;
	SocketUdpClient udpClient = client.Value;
	while (await reader.WaitToReadAsync(token).ConfigureAwait(continueOnCapturedContext: false)) // STUCK HERE INDEFINITELY
	{
		byte[] item;
		while (reader.TryRead(out item))
		{
			try
			{
				await udpClient.SendAsync(item, token).ConfigureAwait(continueOnCapturedContext: false);
			}
			catch (Exception ex)
			{
				if (!(ex is OperationCanceledException) && !token.IsCancellationRequested)
				{
					options.UnhandledErrorHandler("network error, publish loop will terminate." + Environment.NewLine, ex);
				}
				return;
			}
		}
	}
}

Problem is that the object's CancellationTokenSource is created at its constructor, thus it doesn't interact in any way with the external application lifecycle.

[Preserve]
public UdpWorker(MessagePipeInterprocessUdpOptions options, IAsyncPublisher<IInterprocessKey, IInterprocessValue> publisher)
{
	MessagePipeInterprocessUdpOptions options2 = options;
	base._002Ector();
	cancellationTokenSource = new CancellationTokenSource(); //HERE
	this.options = options2;
	this.publisher = publisher;
	server = new Lazy<SocketUdpServer>(() => SocketUdpServer.Bind(options2.Port, 65536));
	client = new Lazy<SocketUdpClient>(() => SocketUdpClient.Connect(options2.Host, options2.Port, 65536));
	channel = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
	{
		SingleReader = true,
		SingleWriter = false,
		AllowSynchronousContinuations = true
	});
}

I could submit a PR but I am not sure the best way to work around that. Perhaps passing an optional CancellationToken as options in the builder?

Thank you for your work.