nats-io/nats.net.v2

JetStream ACK won't work with custom serializer

Closed this issue · 0 comments

mtmk commented

There is a serialization bug when sending an ACK if a custom serializer registry is configured as the connection's default serializer registry.

For example, when using MemoryPack:

[PUB $JS.ACK.any-test.any-test-outgoing-oro-328.1.3.7.1701124066146132000.0 8]
MSG_PAYLOAD: ["\x04\x00\x00\x00+ACK"]
               ^^^^^^^^^^^^^^^^
               These shouldn't be here!

In msg.AckAsync() we're still (wrongly) serializing +ACK with the custom serializer; we should use the default serializer instead.
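
To make the extra bytes in the trace concrete, here is a minimal, self-contained sketch that uses no NATS or MemoryPack APIs. It assumes the custom serializer frames the payload with a 4-byte little-endian length, which is what the 8-byte payload above suggests. The hand-written framing only imitates that layout and is not MemoryPack's actual implementation.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Text;

// The four bytes that should be published as the ACK payload.
byte[] ack = Encoding.ASCII.GetBytes("+ACK");

// Raw pass-through: the payload is written unchanged.
var raw = new ArrayBufferWriter<byte>();
raw.Write(ack);

// Assumed framing (illustration only): a 4-byte little-endian length,
// followed by the same payload bytes.
var framed = new ArrayBufferWriter<byte>();
BinaryPrimitives.WriteInt32LittleEndian(framed.GetSpan(4), ack.Length);
framed.Advance(4);
framed.Write(ack);

Console.WriteLine($"raw:    {raw.WrittenCount} bytes {Convert.ToHexString(raw.WrittenSpan)}");
Console.WriteLine($"framed: {framed.WrittenCount} bytes {Convert.ToHexString(framed.WrittenSpan)}");
```

The framed buffer starts with 04 00 00 00, matching the unexpected prefix in the trace, while the raw buffer holds only the four bytes of +ACK.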

@robertmircea thank you so much for the repro!

// <PackageReference Include="NATS.Net" Version="2.0.0" />
// <PackageReference Include="MemoryPack" Version="1.10.0" />
using System.Buffers;
using MemoryPack;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

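// Use the MemoryPack-based registry as the connection's default serializer registry.
var nats = new NatsConnection(new NatsOpts { SerializerRegistry = new NatsMessagePackContextSerializerRegistry() });
var js = new NatsJSContext(nats);

// Start from a clean stream; ignore the error if it does not exist yet.
try { await js.DeleteStreamAsync("S1"); } catch { /*ignore*/ }

await js.CreateStreamAsync(new StreamConfig("S1", new[]{ "s1" }));
await js.PublishAsync("s1", new GatewayMessage(777));

// Fetch the message and acknowledge it; the ACK payload is where the bug shows up.
var c = await js.CreateConsumerAsync("S1", new ConsumerConfig("C1"));
await (await c.NextAsync<GatewayMessage>())!.Value.AckAsync();

// A correctly delivered ACK leaves no pending acknowledgements on the consumer.
await c.RefreshAsync();
Console.WriteLine(c.Info.NumAckPending == 0 ? "PASS" : "FAIL");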

[MemoryPackable(SerializeLayout.Explicit)]
public partial class GatewayMessage(int messageId)
{
    [MemoryPackOrder(0)] public int MessageId { get; set; } = messageId;
}

public sealed class NatsMessagePackContextSerializer<T> : INatsSerializer<T>
{
    public void Serialize(IBufferWriter<byte> bufferWriter, T value) => MemoryPackSerializer.Serialize(bufferWriter, value);
    public T? Deserialize(in ReadOnlySequence<byte> buffer) => MemoryPackSerializer.Deserialize<T>(buffer);
}

public sealed class NatsMessagePackContextSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => new NatsMessagePackContextSerializer<T>();
    public INatsDeserialize<T> GetDeserializer<T>() => new NatsMessagePackContextSerializer<T>();
}

Originally posted by @mtmk in #249 (reply in thread)