tychedelia/kafka-protocol-rs

Version intersection logic generating incorrect decode

tychedelia opened this issue · 8 comments

Running kafka-topics.sh --create --bootstrap-server 127.0.0.1:8844 --topic test first sends an ApiVersionsRequest. After the header is decoded, one trailing byte is left undecoded, which causes decoding of the ApiVersionsRequest to fail:

0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x65, 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, 0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61, 0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, 0x00

The second 0x12 byte (the one right after the lone 0x00 that follows the client id) is the length of the first string in ApiVersionsRequest and the beginning of the message. See here, where this is covered by an existing test: https://github.com/0x1991babe/kafka-protocol-rs/blob/main/tests/api_versions.rs#L8
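To make the byte layout concrete, here is a rough hand-rolled walk over the dump above. It is only a sketch, not the crate's decoder: HeaderV2, be_i16, be_i32 and parse_header_v2 are made-up names, and it assumes the flexible (version 2) header layout with an empty tagged-field section.

```rust
/// Bytes captured in this issue: request header followed by the ApiVersionsRequest body.
const REQUEST: &[u8] = &[
    0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x65, 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69,
    0x6e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, 0x12, 0x61, 0x70, 0x61,
    0x63, 0x68, 0x65, 0x2d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61, 0x76, 0x61,
    0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, 0x00,
];

/// Hypothetical struct for illustration; not a type from kafka-protocol-rs.
#[derive(Debug, PartialEq)]
struct HeaderV2 {
    api_key: i16,
    api_version: i16,
    correlation_id: i32,
    client_id: String,
}

/// Reads a big-endian i16 at `at`, or None if the buffer is too short.
fn be_i16(buf: &[u8], at: usize) -> Option<i16> {
    let s = buf.get(at..at + 2)?;
    Some(i16::from_be_bytes([s[0], s[1]]))
}

/// Reads a big-endian i32 at `at`, or None if the buffer is too short.
fn be_i32(buf: &[u8], at: usize) -> Option<i32> {
    let s = buf.get(at..at + 4)?;
    Some(i32::from_be_bytes([s[0], s[1], s[2], s[3]]))
}

/// Parses a version-2 request header and returns it with the number of bytes consumed.
/// Assumption: the client id is non-null and the tagged-field section is empty (a single 0x00).
fn parse_header_v2(buf: &[u8]) -> Option<(HeaderV2, usize)> {
    let api_key = be_i16(buf, 0)?;
    let api_version = be_i16(buf, 2)?;
    let correlation_id = be_i32(buf, 4)?;
    let len = be_i16(buf, 8)?;
    if len < 0 {
        return None; // null client id is not handled in this sketch
    }
    let end = 10 + len as usize;
    let client_id = String::from_utf8(buf.get(10..end)?.to_vec()).ok()?;
    // This is the "trailing byte": the tagged-field count that the header must consume.
    if *buf.get(end)? != 0 {
        return None; // non-empty tagged fields are not handled in this sketch
    }
    let header = HeaderV2 { api_key, api_version, correlation_id, client_id };
    Some((header, end + 1))
}

fn main() {
    let (header, consumed) = parse_header_v2(REQUEST).expect("header should parse");
    println!("{:?}, consumed {} bytes", header, consumed);
    println!("first body byte: {:#04x}", REQUEST[consumed]);
}
```

If the tagged-field byte is not consumed as part of the header, the body decoder starts one byte early, on the 0x00 instead of the 0x12 length byte.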

It seems like the extra byte should be decoded here: https://github.com/0x1991babe/kafka-protocol-rs/blob/main/src/messages/request_header.rs#L98

kafka-topics.sh is sending API version 3, so we don't hit this block.

Need to investigate, but it looks like https://github.com/0x1991babe/kafka-protocol-rs/blob/main/protocol_codegen/src/generate_messages/spec.rs#L107 should return Since(b), which would cover 2+ as designated by the spec here https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/RequestHeader.json#L25

Confirmed by looking at Kafka's generated protocol classes:

        if (_version >= 2) {
            int _numTaggedFields = _readable.readUnsignedVarint();
            for (int _i = 0; _i < _numTaggedFields; _i++) {
                int _tag = _readable.readUnsignedVarint();
                int _size = _readable.readUnsignedVarint();
                switch (_tag) {
                    default:
                        this._unknownTaggedFields = _readable.readUnknownTaggedField(this._unknownTaggedFields, _tag, _size);
                        break;
                }
            }
        }
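For comparison, a rough Rust rendering of what that Java block does: read an unsigned varint count, then skip each unknown tagged field by its declared size. This is an illustrative sketch with made-up helper names (read_unsigned_varint, skip_tagged_fields), not the code the generator emits.

```rust
/// Reads an unsigned varint (7 bits per byte, low group first) starting at `*pos`.
/// Returns None if the buffer ends early or the value needs more than 5 bytes.
fn read_unsigned_varint(buf: &[u8], pos: &mut usize) -> Option<u32> {
    let mut value: u32 = 0;
    for shift in (0..35).step_by(7) {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u32::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
    }
    None
}

/// Skips a tagged-field section and returns how many fields it contained.
/// Every tag is treated as unknown, which matches the `default:` arm above.
fn skip_tagged_fields(buf: &[u8], pos: &mut usize) -> Option<u32> {
    let count = read_unsigned_varint(buf, pos)?;
    for _ in 0..count {
        let _tag = read_unsigned_varint(buf, pos)?;
        let size = read_unsigned_varint(buf, pos)? as usize;
        let end = pos.checked_add(size)?;
        if end > buf.len() {
            return None; // declared size runs past the end of the buffer
        }
        *pos = end;
    }
    Some(count)
}

fn main() {
    // Empty tagged-field section, as sent by kafka-topics.sh in this issue.
    let mut pos = 0;
    let count = skip_tagged_fields(&[0x00], &mut pos);
    println!("count = {:?}, consumed = {}", count, pos);
}
```

The relevant point for this issue is that even an empty section costs one byte on the wire, so the version check guarding this read decides whether the rest of the message lines up.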

Looks like this also fixed some bugs with compact representations.

I think this indicates a problem elsewhere, since Range(x, x) should be equivalent to Exact(x), unless I'm completely misremembering...

Yeah, if you look here: https://github.com/Diggsey/franz/blob/master/messages/RequestHeader.json#L24

It says that the highest valid version is "2", so this message should not even be sent with version "3".

(Although maybe my interpretation of "validVersions" is wrong)

Yeah, the semantics were confusing to figure out. I can't find the meaning of validVersions explicitly called out in the spec, but the generated code seems to confirm forward compatibility for all behavior, not just the unknown fields. For example, a random request, valid 0-3, flexible 3+: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json#L26-L27

Here's what it looks like in ours:

if version >= 3 {
    types::CompactString.encode(buf, &self.transactional_id)?;
} else {
    types::String.encode(buf, &self.transactional_id)?;
}

And here's what it looks like in the generated Java source (AddOffsetsToTxnRequestData):

byte[] _stringBytes = _cache.getSerializedValue(transactionalId);
if (_version >= 3) {
    _writable.writeUnsignedVarint(_stringBytes.length + 1);
} else {
    _writable.writeShort((short) _stringBytes.length);
}
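To see what the two branches put on the wire, here is a small self-contained sketch that does not use the crate's types. The names write_unsigned_varint and encode_transactional_id are made up for illustration, and the length limit is a simplification.

```rust
/// Writes an unsigned varint: 7 bits per byte, low group first, high bit set on all but the last byte.
fn write_unsigned_varint(buf: &mut Vec<u8>, mut value: u32) {
    while value >= 0x80 {
        buf.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Encodes a string the way the snippets above do for the given version:
/// compact (varint of length + 1) for version >= 3, i16 length prefix otherwise.
/// Simplification: both branches reject strings longer than i16::MAX bytes.
fn encode_transactional_id(buf: &mut Vec<u8>, version: i16, s: &str) -> Result<(), String> {
    let bytes = s.as_bytes();
    if bytes.len() > i16::MAX as usize {
        return Err(format!("string of {} bytes is too long", bytes.len()));
    }
    if version >= 3 {
        write_unsigned_varint(buf, bytes.len() as u32 + 1);
    } else {
        buf.extend_from_slice(&(bytes.len() as i16).to_be_bytes());
    }
    buf.extend_from_slice(bytes);
    Ok(())
}

fn main() {
    for version in [2i16, 3, 4] {
        let mut buf = Vec::new();
        encode_transactional_id(&mut buf, version, "abc").unwrap();
        println!("version {} -> {:02x?}", version, buf);
    }
}
```

Note that the check is an open-ended >= 3 in both implementations, so a version above the highest listed in validVersions still takes the compact branch.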

So I think the >= on all open ranges is correct. Range(b, b) is kinda awkward. I tried to fix this by mapping the intersection to Since(x), since that mirrors the Java implementation, which represents "since" as a range up to i16::MAX, but that seemed to break some other stuff.

So the reason your change fixed it is that there's some code that detects when a range touches the valid-versions range and automatically omits the upper or lower bound check in that case.

See here:
https://github.com/0x1991babe/kafka-protocol-rs/blob/4dff974a199bb2921f4682538fa24d281139ac3b/protocol_codegen/src/generate_messages/generate.rs#L251-L252

And here:
https://github.com/0x1991babe/kafka-protocol-rs/blob/4dff974a199bb2921f4682538fa24d281139ac3b/protocol_codegen/src/generate_messages/generate.rs#L265-L266

I think the proper fix would be to implement the same logic for Exact(v) when v == min or v == max, instead of changing how versions are intersected.
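A minimal sketch of that idea, under my own assumptions: VersionSpec and version_condition are hypothetical stand-ins, not the actual types or functions in protocol_codegen. Exact(v) is treated as Range(v, v), and a bound check is dropped whenever that bound touches the edge of the valid-versions range.

```rust
/// Hypothetical stand-in for the generator's version spec; not the real type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum VersionSpec {
    Exact(i16),
    Since(i16),
    Range(i16, i16),
}

/// Returns the condition text to emit for `spec`, given the message's
/// valid-versions range as (min, max). A bound that coincides with the edge
/// of the valid range is omitted, for Exact as well as for Range.
fn version_condition(spec: VersionSpec, valid: (i16, i16)) -> String {
    let (valid_min, valid_max) = valid;
    // Normalize every spec to a lower bound and an optional upper bound.
    let (lo, hi) = match spec {
        VersionSpec::Exact(v) => (v, Some(v)),
        VersionSpec::Since(v) => (v, None),
        VersionSpec::Range(a, b) => (a, Some(b)),
    };
    let need_lower = lo > valid_min;
    let upper = hi.filter(|&h| h < valid_max);
    match (need_lower, upper) {
        (true, Some(h)) if h == lo => format!("version == {}", lo),
        (true, Some(h)) => format!("version >= {} && version <= {}", lo, h),
        (true, None) => format!("version >= {}", lo),
        (false, Some(h)) => format!("version <= {}", h),
        (false, None) => "true".to_string(),
    }
}

fn main() {
    // RequestHeader: valid versions 0-2, flexible from 2, so the spec is Exact(2).
    println!("{}", version_condition(VersionSpec::Exact(2), (0, 2)));
    println!("{}", version_condition(VersionSpec::Exact(1), (0, 2)));
}
```

With this, Exact(2) against a valid range of 0-2 yields an open-ended version >= 2, which matches the check in Kafka's generated Java, while an exact version strictly inside the range still yields an equality check.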

Definitely the right call (see #5). Thanks!