粘包问题需要处理不?
abc100m opened this issue · 4 comments
abc100m commented
socket的发送和接收tcp报文,都需要处理粘包问题。
粘包问题,除了在报文上加长度外,send和recv这2个系统函数的调用也比较重要。这一点在《unix网络编程》这书上有详细论述。
Python的报文处理,我找了一下,正确处理应该是在这个链接 https://docs.python.org/3/howto/sockets.html#socket-howto
class MySocket:
"""demonstration class only
- coded for clarity, not efficiency
"""
def __init__(self, sock=None):
if sock is None:
self.sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
else:
self.sock = sock
def connect(self, host, port):
self.sock.connect((host, port))
# 请注意一个报文需要多次调用才算正确处理
def mysend(self, msg):
totalsent = 0
while totalsent < MSGLEN:
sent = self.sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent
def myreceive(self):
chunks = []
bytes_recd = 0
while bytes_recd < MSGLEN:
chunk = self.sock.recv(min(MSGLEN - bytes_recd, 2048))
if chunk == b'':
raise RuntimeError("socket connection broken")
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
return b''.join(chunks)
再来看一下pytdx/parser/base.py这里
87行 nsended = self.client.send(self.send_pkg)
98行 if nsended != len(self.send_pkg):
log.debug("send bytes error")
raise SendRequestPkgFails("send fails")
self.client.send,没有看到对socket的send重载的代码,故而此处逻辑是有点小问题的:一次调用不能保证把self.send_pkg的报文都发送出去。
同样的recv,并不能保证一次调用就能收到16个字节(虽然这里字节数少,在大多数情况下都能正常接收到)。
102行 head_buf = self.client.recv(self.rsp_header_len)
abc100m commented
另外看到对协议异常的处理代码
105行 if len(head_buf) == self.rsp_header_len:
145行 else:
log.debug("head_buf is not 0x10")
raise ResponseHeaderRecvFails("head_buf is not 0x10 : " + str(head_buf))
第1次接收前16个字节报文头,再接收消息体,逻辑上没有问错。问题是即使正常报文,第1次调用recv(16)不能保证接收到16个字节。而如果一直等待接收16个字节,如果是异常报文收不到这么长该怎么办?
1)等不到,就等到超时了断开tcp连接(不加超时处理就是等待对方断开),这是默认处理。
2)根据我以往的经验,消息体的第1、第2个字节大概率是个magic_flag,以此判断报文是否异常,是否需要等下去
abc100m commented
大概想了一下修改了代码,没有调试和测试(因为刚阅读代码,还没跑过测试)
class TrafficStatSocket(socket.socket):
# 加一个函数
def recvall(self, n):
# Helper function to recv n bytes or return None if EOF is hit
data = bytearray()
while len(data) < n:
packet = self.recv(n - len(data))
if not packet:
return None
self.recv_pkg_num += 1
self.recv_pkg_bytes += len(packet)
data.extend(packet)
return data
class BaseParser(object):
#主要是这个函数的处理
def _call_api(self):
self.setup()
if not(self.client):
raise SocketClientNotReady("socket client not ready")
if not(self.send_pkg):
raise SendPkgNotReady("send pkg not ready")
try:
self.client.sendall(self.send_pkg)
except Exception as e:
raise SendRequestPkgFails("send fails")
nsended = len(self.send_pkg)
self.client.send_pkg_num += 1
self.client.send_pkg_bytes += nsended
self.client.last_api_send_bytes = nsended
if self.client.first_pkg_send_time is None:
self.client.first_pkg_send_time = datetime.datetime.now()
if DEBUG:
log.debug("send package:" + str(self.send_pkg))
##一直收不至16个字节咋办? help: 谁来帮处理一下?
head_buf = self.client.recvall(self.rsp_header_len)
if not head_buf:
##这里大概率是sock断开了,仅在被攻击或者服务器协议修改了才会出现报文头不一致的情况
log.debug("head_buf is not 0x10")
raise ResponseHeaderRecvFails("head_buf is not 0x10 : " + str(head_buf))
if DEBUG:
log.debug("recv head_buf:" + str(head_buf) + " |len is :" + str(len(head_buf)))
if True: # if len(head_buf) == self.rsp_header_len:
_, _, _, zipsize, unzipsize = struct.unpack("<IIIHH", head_buf)
if DEBUG:
log.debug("zip size is: " + str(zipsize))
body_buf = self.client.recvall(zipsize)
if not body_buf:
log.debug("接收数据体失败服务器断开连接")
raise ResponseRecvFails("接收数据体失败服务器断开连接")
self.client.last_api_recv_bytes = self.rsp_header_len + len(body_buf)
if zipsize == unzipsize:
log.debug("不需要解压")
else:
log.debug("需要解压")
if sys.version_info[0] == 2:
unziped_data = zlib.decompress(buffer(body_buf))
else:
unziped_data = zlib.decompress(body_buf)
body_buf = unziped_data
## 解压
if DEBUG:
log.debug("recv body: ")
log.debug(body_buf)
return self.parseResponse(body_buf)
abc100m commented
初次阅读代码,理解可能有偏差,请大佬指正。
如果我理解是对的,请协议大佬帮我看看消息头是否有个magic flag?请帮把异常的处理完善一下。
abc100m commented
这个问题,由于代码有重试机制,即使解包错误断掉再重试(可能取数据慢一点),也OK,因此先关掉了