JetStream ACK won't work with custom serializer
Closed this issue · 0 comments
mtmk commented
There is a serialization bug when sending an ACK if a custom serializer registry is used as the default serializer registry.
For example, when using MemoryPack:
[PUB $JS.ACK.any-test.any-test-outgoing-oro-328.1.3.7.1701124066146132000.0 8]
MSG_PAYLOAD: ["\x04\x00\x00\x00+ACK"]
               ^^^^^^^^^^^^^^^^
               These shouldn't be here!
In msg.AckAsync() we're still (wrongly) using the custom serializer to serialize +ACK; instead we should use the default serializer.
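To make the difference concrete, here is a minimal standalone sketch (not NATS.Net code) comparing a raw write of +ACK with a length-prefixed one. It assumes the four extra bytes in the trace are a 4-byte little-endian length prefix, which is consistent with 0x04 being the length of "+ACK" and with the 8-byte payload size in the PUB line; the `framed` writer is a hypothetical stand-in for a binary serializer, not MemoryPack itself.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Text;

// The ACK body the server expects: exactly the four ASCII bytes "+ACK".
byte[] ack = Encoding.ASCII.GetBytes("+ACK");

// Raw write: the bytes go out untouched, which is what an ACK needs.
var raw = new ArrayBufferWriter<byte>();
raw.Write(ack);

// Length-prefixed write: hypothetical stand-in for a binary serializer that
// frames a byte array with a 4-byte little-endian length (an assumption).
var framed = new ArrayBufferWriter<byte>();
BinaryPrimitives.WriteInt32LittleEndian(framed.GetSpan(4), ack.Length);
framed.Advance(4);
framed.Write(ack);

Console.WriteLine(BitConverter.ToString(raw.WrittenSpan.ToArray()));
// prints "2B-41-43-4B"
Console.WriteLine(BitConverter.ToString(framed.WrittenSpan.ToArray()));
// prints "04-00-00-00-2B-41-43-4B"
```

The second line matches the shape of the payload in the trace above: a prefix followed by +ACK, so the server no longer sees a plain ACK.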
@robertmircea thank you so much for the repro!
// <PackageReference Include="NATS.Net" Version="2.0.0" />
// <PackageReference Include="MemoryPack" Version="1.10.0" />
using System.Buffers;
using MemoryPack;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
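// Use the MemoryPack-backed registry as the connection-wide default serializer.
var nats = new NatsConnection(new NatsOpts { SerializerRegistry = new NatsMessagePackContextSerializerRegistry() });
var js = new NatsJSContext(nats);
// Recreate the stream from scratch and publish one message.
try { await js.DeleteStreamAsync("S1"); } catch { /*ignore*/ }
await js.CreateStreamAsync(new StreamConfig("S1", new[]{ "s1" }));
await js.PublishAsync("s1", new GatewayMessage(777));
var c = await js.CreateConsumerAsync("S1", new ConsumerConfig("C1"));
// Fetch the message and ACK it; the ACK payload is where the bug shows up.
await (await c.NextAsync<GatewayMessage>())!.Value.AckAsync();
// If the server accepted the ACK, nothing should remain pending.
await c.RefreshAsync();
Console.WriteLine(c.Info.NumAckPending == 0 ? "PASS" : "FAIL");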
[MemoryPackable(SerializeLayout.Explicit)]
public partial class GatewayMessage(int messageId)
{
    [MemoryPackOrder(0)] public int MessageId { get; set; } = messageId;
}
public sealed class NatsMessagePackContextSerializer<T> : INatsSerializer<T>
{
    public void Serialize(IBufferWriter<byte> bufferWriter, T value) => MemoryPackSerializer.Serialize(bufferWriter, value);
    public T? Deserialize(in ReadOnlySequence<byte> buffer) => MemoryPackSerializer.Deserialize<T>(buffer);
}
public sealed class NatsMessagePackContextSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => new NatsMessagePackContextSerializer<T>();
    public INatsDeserialize<T> GetDeserializer<T>() => new NatsMessagePackContextSerializer<T>();
}
Originally posted by @mtmk in #249 (reply in thread)