cloudevents/sdk-go

NATS Jetstream optimistic concurrency headers

Opened this issue · 2 comments

Hi, folks! Is it possible to access the special NATS headers on send, for example Nats-Expected-Last-Sequence

These headers would allow event sourcing with CloudEvents. The neat thing is that consumers don't need to do anything. It's more like a quality of service between NATS client and the server.

If not currently possible, are there workarounds? Could we find a way to add support without breaking the current API?

If I understand the current nats_jetstream protocol implementation correctly, it ignores this header during the CE conversion:

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.encoding != binding.EncodingStructured {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data))
}
// ReadBinary transfers a binary-mode event to an BinaryWriter.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
if m.encoding != binding.EncodingBinary {
return binding.ErrNotBinary
}
version := m.GetVersion()
if version == nil {
return binding.ErrNotBinary
}
var err error
for k, v := range m.Msg.Header {
headerValue := v[0]
if strings.HasPrefix(k, prefix) {
attr := version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, headerValue)
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
}
} else if k == contentTypeHeader {
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
}
if err != nil {
return err
}
}
if m.Msg.Data != nil {
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data))
}
return err

IMHO it can be added without breaking the API similar to what we do in the Kafka protocol bindings.

I'd be happy with a change to enable this, but not sure on the correct approach. I'm not too familiar to how other protocol bindings work, but http just has an exported struct field:

// Message holds the Header and Body of a HTTP Request or Response.
// The Message instance *must* be constructed from NewMessage function.
// This message *cannot* be read several times. In order to read it more times, buffer it using binding/buffering methods
type Message struct {
Header nethttp.Header
BodyReader io.ReadCloser
OnFinish func(error) error
ctx context.Context
format format.Format
version spec.Version
}

If you did something similar, users can then just do the following:

var msg binding.Message

if msg, ok := msg.(*jetstreamv2.Message); ok {
  fmt.Println(msg.Header)
}