Error during unpacking (ExtraData): unpack(b) received extra data.
Closed this issue · 1 comments
haarsh85 commented
Hi all,
I am working with serf and trying to use RPC to get details of a running serf details using stats. So far I am getting the response as below; but it gives an error while unpacking.
Stats Response:
Sent stats header: {'Command': 'stats', 'Seq': 1}
Waiting for stats response...
Received stats response: b'\x82\xa5Error\xa0\xa3Seq\x01\x85\xa5agent\x81\xa4name\xb2serf2\xa7runtime\x86\xa4arch\xa5amd64\xa7version\xa8go1.23.4\xa9max_procs\xa14\xaagoroutines\xa224\xa9cpu_count\xa14\xa2os\xa5linux\xa4serf\x8c\xa9encrypted\xa5false\xa7members\xa210\xa6failed\xa10\xa4left\xa10\xaaquery_time\xa11\xabevent_queue\xa10\xb1coordinate_resets\xa10\xachealth_score\xa10\xabmember_time\xa210\xaaevent_time\xa11\xacintent_queue\xa10\xabquery_queue\xa10\xa4tags\x80\xaeevent_handlers\x80'
Error during Unpacking
Unpacking stats response...
Error during unpacking (ExtraData): unpack(b) received extra data.
Stats Response: {'Error': 'ExtraData during unpacking'}
The code I am using (python):
import socket
import msgpack
def send_handshake(sock):
"""
Performs a handshake with the Serf RPC server to initialize the connection.
:param sock: The socket object connected to the Serf RPC server.
:return: The response from the handshake command.
"""
try:
# Send handshake header
print("Sending handshake header...")
header = {"Command": "handshake", "Seq": 0}
header_payload = msgpack.packb(header)
sock.sendall(header_payload)
print(f"Sent handshake header: {header}")
# Send handshake body
print("Sending handshake body...")
body = {"Version": 1}
body_payload = msgpack.packb(body)
sock.sendall(body_payload)
print(f"Sent handshake body: {body}")
# Receive and decode response
print("Waiting for handshake response...")
response = sock.recv(4096) # Arbitrary buffer size
print(f"Received handshake response: {response}")
unpacked_response = msgpack.unpackb(response, strict_map_key=False)
print(f"Unpacked handshake response: {unpacked_response}")
# Check for errors
if "Error" in unpacked_response and unpacked_response["Error"]:
raise Exception(f"Handshake error: {unpacked_response['Error']}")
print("Handshake successful.")
return unpacked_response
except Exception as e:
print(f"Error during handshake: {e}")
return {"Error": str(e)}
def get_stats(sock):
"""
Sends the 'stats' command to the Serf RPC server to retrieve debugging information.
:param sock: The socket object connected to the Serf RPC server.
:return: The response from the 'stats' command.
"""
try:
# Send stats command header (no body required)
print("Sending stats command header...")
header = {"Command": "stats", "Seq": 1}
header_payload = msgpack.packb(header)
sock.sendall(header_payload)
print(f"Sent stats header: {header}")
# Wait for the response and receive it
print("Waiting for stats response...")
response = sock.recv(4096) # Arbitrary buffer size
print(f"Received stats response: {response}")
if not response:
raise Exception("No response received for stats request.")
# Unpack and process the response
print("Unpacking stats response...")
try:
# First unpack using raw=True to handle all data without issues
unpacked_response = msgpack.unpackb(response, raw=True, strict_map_key=False)
print(f"Unpacked stats response (raw): {unpacked_response}")
# If we want to get a clear picture of the response, print as raw
print("Raw data:", unpacked_response)
# After seeing the raw data, you can proceed with your detailed processing
# Optionally, you can add decoding of raw bytes to specific data structures here
print("Stats retrieved successfully.")
return unpacked_response
except msgpack.exceptions.ExtraData as e:
print(f"Error during unpacking (ExtraData): {e}")
return {"Error": "ExtraData during unpacking"}
except Exception as e:
print(f"General error during unpacking: {e}")
return {"Error": f"Unpacking failed: {e}"}
except Exception as e:
print(f"Error during stats request: {e}")
return {"Error": str(e)}
# Main Execution
if __name__ == "__main__":
rpc_address = "172.20.20.7" # Update with the correct address
rpc_port = 7373
print(f"Connecting to {rpc_address}:{rpc_port}...")
try:
with socket.create_connection((rpc_address, rpc_port), timeout=30) as sock:
# Step 1: Perform handshake
handshake_response = send_handshake(sock)
print("Handshake Response:", handshake_response)
if handshake_response.get("Error") == "":
# Step 2: Get stats
stats_response = get_stats(sock)
print("Stats Response:", stats_response)
except Exception as e:
print(f"Connection error: {e}")
I sincerely appreciate any advice or guidance on getting this working.
Thank you!
methane commented
You need to use streaming unpacking.
>>> up=msgpack.Unpacker(raw=True,strict_map_keys=False)
>>> up.feed(data)
>>> for x in up:
... print(repr(x))
...
>>> up=msgpack.Unpacker(raw=True,strict_map_key=False)
>>> up.feed(data)
>>> for x in up:
... print(repr(x))
...
{b'Error': b'', b'Seq': 1}
{b'agent': {b'name': b'serf2\xa7runtime\x86\xa4arc'}, 104: b'amd64', b'version': b'go1.23.4', b'max_procs': b'4', b'goroutines': b'24'}
b'cpu_count'
b'4'
b'os'
b'linux'
b'serf'
{b'encrypted': b'false', b'members': b'10', b'failed': b'0', b'left': b'0', b'query_time': b'1', b'event_queue': b'0', b'coordinate_resets': b'0', b'health_score': b'0', b'member_time': b'10', b'event_time': b'1', b'intent_queue': b'0', b'query_queue': b'0'}
b'tags'
{}
b'event_handlers'
{}