msgpack/msgpack-python

Error during unpacking (ExtraData): unpack(b) received extra data.

Closed this issue · 1 comments

Hi all,
I am working with serf and trying to use RPC to get details of a running serf details using stats. So far I am getting the response as below; but it gives an error while unpacking.

Stats Response:

Sent stats header: {'Command': 'stats', 'Seq': 1}
Waiting for stats response...
Received stats response: b'\x82\xa5Error\xa0\xa3Seq\x01\x85\xa5agent\x81\xa4name\xb2serf2\xa7runtime\x86\xa4arch\xa5amd64\xa7version\xa8go1.23.4\xa9max_procs\xa14\xaagoroutines\xa224\xa9cpu_count\xa14\xa2os\xa5linux\xa4serf\x8c\xa9encrypted\xa5false\xa7members\xa210\xa6failed\xa10\xa4left\xa10\xaaquery_time\xa11\xabevent_queue\xa10\xb1coordinate_resets\xa10\xachealth_score\xa10\xabmember_time\xa210\xaaevent_time\xa11\xacintent_queue\xa10\xabquery_queue\xa10\xa4tags\x80\xaeevent_handlers\x80'

Error during Unpacking

Unpacking stats response...
Error during unpacking (ExtraData): unpack(b) received extra data.
Stats Response: {'Error': 'ExtraData during unpacking'}

The code I am using (python):

import socket
import msgpack

def send_handshake(sock):
    """
    Performs a handshake with the Serf RPC server to initialize the connection.
    :param sock: The socket object connected to the Serf RPC server.
    :return: The response from the handshake command.
    """
    try:
        # Send handshake header
        print("Sending handshake header...")
        header = {"Command": "handshake", "Seq": 0}
        header_payload = msgpack.packb(header)
        sock.sendall(header_payload)
        print(f"Sent handshake header: {header}")

        # Send handshake body
        print("Sending handshake body...")
        body = {"Version": 1}
        body_payload = msgpack.packb(body)
        sock.sendall(body_payload)
        print(f"Sent handshake body: {body}")

        # Receive and decode response
        print("Waiting for handshake response...")
        response = sock.recv(4096)  # Arbitrary buffer size
        print(f"Received handshake response: {response}")

        unpacked_response = msgpack.unpackb(response, strict_map_key=False)
        print(f"Unpacked handshake response: {unpacked_response}")

        # Check for errors
        if "Error" in unpacked_response and unpacked_response["Error"]:
            raise Exception(f"Handshake error: {unpacked_response['Error']}")

        print("Handshake successful.")
        return unpacked_response

    except Exception as e:
        print(f"Error during handshake: {e}")
        return {"Error": str(e)}

def get_stats(sock):
    """
    Sends the 'stats' command to the Serf RPC server to retrieve debugging information.
    :param sock: The socket object connected to the Serf RPC server.
    :return: The response from the 'stats' command.
    """
    try:
        # Send stats command header (no body required)
        print("Sending stats command header...")
        header = {"Command": "stats", "Seq": 1}
        header_payload = msgpack.packb(header)
        sock.sendall(header_payload)
        print(f"Sent stats header: {header}")

        # Wait for the response and receive it
        print("Waiting for stats response...")
        response = sock.recv(4096)  # Arbitrary buffer size
        print(f"Received stats response: {response}")

        if not response:
            raise Exception("No response received for stats request.")

        # Unpack and process the response
        print("Unpacking stats response...")
        try:
            # First unpack using raw=True to handle all data without issues
            unpacked_response = msgpack.unpackb(response, raw=True, strict_map_key=False)
            print(f"Unpacked stats response (raw): {unpacked_response}")

            # If we want to get a clear picture of the response, print as raw
            print("Raw data:", unpacked_response)

            # After seeing the raw data, you can proceed with your detailed processing
            # Optionally, you can add decoding of raw bytes to specific data structures here

            print("Stats retrieved successfully.")
            return unpacked_response

        except msgpack.exceptions.ExtraData as e:
            print(f"Error during unpacking (ExtraData): {e}")
            return {"Error": "ExtraData during unpacking"}

        except Exception as e:
            print(f"General error during unpacking: {e}")
            return {"Error": f"Unpacking failed: {e}"}

    except Exception as e:
        print(f"Error during stats request: {e}")
        return {"Error": str(e)}


# Main Execution
if __name__ == "__main__":
    rpc_address = "172.20.20.7"  # Update with the correct address
    rpc_port = 7373

    print(f"Connecting to {rpc_address}:{rpc_port}...")
    try:
        with socket.create_connection((rpc_address, rpc_port), timeout=30) as sock:
            # Step 1: Perform handshake
            handshake_response = send_handshake(sock)
            print("Handshake Response:", handshake_response)

            if handshake_response.get("Error") == "":
                # Step 2: Get stats
                stats_response = get_stats(sock)
                print("Stats Response:", stats_response)

    except Exception as e:
        print(f"Connection error: {e}")

I sincerely appreciate any advice or guidance on getting this working.

Thank you!

You need to use streaming unpacking.

>>> up=msgpack.Unpacker(raw=True,strict_map_keys=False)
>>> up.feed(data)
>>> for x in up:
...  print(repr(x))
...
>>> up=msgpack.Unpacker(raw=True,strict_map_key=False)
>>> up.feed(data)
>>> for x in up:
...  print(repr(x))
...
{b'Error': b'', b'Seq': 1}
{b'agent': {b'name': b'serf2\xa7runtime\x86\xa4arc'}, 104: b'amd64', b'version': b'go1.23.4', b'max_procs': b'4', b'goroutines': b'24'}
b'cpu_count'
b'4'
b'os'
b'linux'
b'serf'
{b'encrypted': b'false', b'members': b'10', b'failed': b'0', b'left': b'0', b'query_time': b'1', b'event_queue': b'0', b'coordinate_resets': b'0', b'health_score': b'0', b'member_time': b'10', b'event_time': b'1', b'intent_queue': b'0', b'query_queue': b'0'}
b'tags'
{}
b'event_handlers'
{}