rainx/pytdx

粘包问题需要处理不?

abc100m opened this issue · 4 comments

socket的发送和接收tcp报文,都需要处理粘包问题。

粘包问题,除了在报文上加长度外,send和recv这2个系统函数的调用也比较重要。这一点在《unix网络编程》这书上有详细论述。

Python的报文处理,我找了一下,正确处理应该是在这个链接 https://docs.python.org/3/howto/sockets.html#socket-howto

class MySocket:
    """demonstration class only
      - coded for clarity, not efficiency
    """

    def __init__(self, sock=None):
        if sock is None:
            self.sock = socket.socket(
                            socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.sock = sock

    def connect(self, host, port):
        self.sock.connect((host, port))

   # 请注意一个报文需要多次调用才算正确处理
    def mysend(self, msg):
        totalsent = 0
        while totalsent < MSGLEN:
            sent = self.sock.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            totalsent = totalsent + sent

    def myreceive(self):
        chunks = []
        bytes_recd = 0
        while bytes_recd < MSGLEN:
            chunk = self.sock.recv(min(MSGLEN - bytes_recd, 2048))
            if chunk == b'':
                raise RuntimeError("socket connection broken")
            chunks.append(chunk)
            bytes_recd = bytes_recd + len(chunk)
        return b''.join(chunks)

再来看一下pytdx/parser/base.py这里

87  nsended = self.client.send(self.send_pkg)
98  if nsended != len(self.send_pkg):
            log.debug("send bytes error")
            raise SendRequestPkgFails("send fails")

self.client.send,没有看到对socket的send重载的代码,故而此处逻辑是有点小问题的:一次调用不能保证把self.send_pkg的报文都发送出去。

同样的recv,并不能保证一次调用就能收到16个字节(虽然这里字节数少,在大多数情况下都能正常接收到)。

102   head_buf = self.client.recv(self.rsp_header_len)

另外看到对协议异常的处理代码

105  if len(head_buf) == self.rsp_header_len:
145  else:
                log.debug("head_buf is not 0x10")
                raise ResponseHeaderRecvFails("head_buf is not 0x10 : " + str(head_buf))

第1次接收前16个字节报文头,再接收消息体,逻辑上没有问错。问题是即使正常报文,第1次调用recv(16)不能保证接收到16个字节。而如果一直等待接收16个字节,如果是异常报文收不到这么长该怎么办?

1)等不到,就等到超时了断开tcp连接(不加超时处理就是等待对方断开),这是默认处理。
2)根据我以往的经验,消息体的第1、第2个字节大概率是个magic_flag,以此判断报文是否异常,是否需要等下去

大概想了一下修改了代码,没有调试和测试(因为刚阅读代码,还没跑过测试)

class TrafficStatSocket(socket.socket):
     # 加一个函数
     def recvall(self, n):
        # Helper function to recv n bytes or return None if EOF is hit
        data = bytearray()
        while len(data) < n:
            packet = self.recv(n - len(data))
            if not packet:
                return None
            self.recv_pkg_num += 1
            self.recv_pkg_bytes += len(packet)         
            data.extend(packet)
        return data
class BaseParser(object):
    #主要是这个函数的处理
    def _call_api(self):
        self.setup()

        if not(self.client):
            raise SocketClientNotReady("socket client not ready")

        if not(self.send_pkg):
            raise SendPkgNotReady("send pkg not ready")

        try:
            self.client.sendall(self.send_pkg) 
        except Exception as e:
            raise SendRequestPkgFails("send fails") 

        nsended = len(self.send_pkg)
        self.client.send_pkg_num += 1
        self.client.send_pkg_bytes += nsended
        self.client.last_api_send_bytes = nsended

        if self.client.first_pkg_send_time is None:
            self.client.first_pkg_send_time = datetime.datetime.now()

        if DEBUG:
            log.debug("send package:" + str(self.send_pkg))

        ##一直收不至16个字节咋办? help:  谁来帮处理一下?
        head_buf = self.client.recvall(self.rsp_header_len)
        if not head_buf:
            ##这里大概率是sock断开了,仅在被攻击或者服务器协议修改了才会出现报文头不一致的情况
            log.debug("head_buf is not 0x10")
            raise ResponseHeaderRecvFails("head_buf is not 0x10 : " + str(head_buf))
        if DEBUG:
            log.debug("recv head_buf:" + str(head_buf)  + " |len is :" + str(len(head_buf)))
        if True:    # if len(head_buf) == self.rsp_header_len:
            _, _, _, zipsize, unzipsize = struct.unpack("<IIIHH", head_buf)
            if DEBUG:
                log.debug("zip size is: " + str(zipsize))

            body_buf = self.client.recvall(zipsize)
            if not body_buf:
                log.debug("接收数据体失败服务器断开连接")
                raise ResponseRecvFails("接收数据体失败服务器断开连接")            
            self.client.last_api_recv_bytes = self.rsp_header_len + len(body_buf)

            if zipsize == unzipsize:
                log.debug("不需要解压")
            else:
                log.debug("需要解压")
                if sys.version_info[0] == 2:
                    unziped_data = zlib.decompress(buffer(body_buf))
                else:
                    unziped_data = zlib.decompress(body_buf)
                body_buf = unziped_data
                ## 解压
            if DEBUG:
                log.debug("recv body: ")
                log.debug(body_buf)

            return self.parseResponse(body_buf)

初次阅读代码,理解可能有偏差,请大佬指正。

如果我理解是对的,请协议大佬帮我看看消息头是否有个magic flag?请帮把异常的处理完善一下。

这个问题,由于代码有重试机制,即使解包错误断掉再重试(可能取数据慢一点),也OK,因此先关掉了