tomerfiliba-org/rpyc

unix-socket unexpectedly closed by client

Opened this issue · 0 comments

Deleting a list of netrefs unexpectedly closes the connection.

  • depending on the size of the list and net.core.wmem_default, the socket write fails
  • the exception is silently ignored and the connection is closed
  • the next operation fails due to the closed connection

sock.send raises an exception (TimeoutError) and the connection is closed:

write, stream.py:292
send, channel.py:78
_send, protocol.py:308
_async_request, protocol.py:750
async_request, protocol.py:768
asyncreq, netref.py:79
__del__, netref.py:122
<module>, client.py:50
Suggestions / Workarounds / Questions
  • call syncreq instead of asyncreq from BaseNetref.__del__
    class BaseNetref(object, metaclass=NetrefMetaclass):
        # (excerpt: only the destructor is shown)
        def __del__(self):
            """Issue a HANDLE_DEL request for this netref via asyncreq, ignoring any error."""
            try:
                # QUESTION: should this be a synchronous request instead of asyncreq?
                asyncreq(self, consts.HANDLE_DEL, self.____refcount__)
            except Exception:
                # raised in a destructor, most likely on program termination,
                # when the connection might have already been closed.
                # it's safe to ignore all exceptions here
                pass
  • TCP-socket connections are not affected
Environment
  • rpyc 6.0.0
  • python 3.10
  • ubuntu 22.04
Minimal example

Server:

import os
import logging
import socket

import rpyc.utils.server
import rpyc.utils.classic

import argparse
Parser = argparse.ArgumentParser()  # reproducer server; --unix switches from TCP to a unix socket
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()
# endpoints: `hostname` is used in TCP mode, `socket_path` in --unix mode
hostname = "localhost"
socket_path = "/tmp/rpyc-socket"
# remove any file already present at socket_path (presumably left by an earlier run)
if os.path.exists(socket_path):
    os.remove(socket_path)

# setup logger: one stream handler at DEBUG level; format shows time, pid, level and call site
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)
# the logger is instantiated directly (not via logging.getLogger) and gets only this handler
Logger = logging.Logger("rpyc_server")
Logger.addHandler(sh)

Logger.info("starting ThreadedServer - rpyc %s" % str(rpyc.__version__))
# build the server for SlaveService on the selected transport; both variants log to `Logger`
if args.unix:
    t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, socket_path=socket_path, logger=Logger)
else:
    t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, hostname=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, logger=Logger)
    t.listener.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # TCP mode only: TCP_NODELAY on the listening socket

# setup socket (calls the server's private _listen() explicitly before start())
t._listen()
# run server (presumably blocks while serving clients; confirm against the rpyc docs)
t.start()

Client:

import rpyc.utils.classic
import socket
import logging
import subprocess
import time

import argparse
Parser = argparse.ArgumentParser()  # reproducer client; --unix switches from TCP to a unix socket
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()
# endpoints: `hostname` is used in TCP mode, `socket_path` in --unix mode
hostname = "localhost"
socket_path = "/tmp/rpyc-socket"

# setup logger: one stream handler at DEBUG level; format shows time, pid, level and call site
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)
# the logger is instantiated directly (not via logging.getLogger) and gets only this handler
Logger = logging.Logger("rpyc_client")
Logger.addHandler(sh)
# log the first line printed by `sysctl` for each of the two kernel send-buffer settings
Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_max"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())
Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_default"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())

Logger.info(f"creating Connection - rpyc {rpyc.__version__}")
# connect: unix socket via factory.unix_connect, or TCP via an explicitly created SocketStream
if args.unix:
    conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
else:
    #conn = rpyc.classic.factory.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})
    s = rpyc.core.stream.SocketStream.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, ipv6=False, keepalive=False, nodelay=True)
    conn = rpyc.classic.factory.connect_stream(s, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})



Logger.info(f"Connection: {conn}")
# execute `import time` through the connection, then look up "time" in conn.modules
conn.execute("import time")
rtime = conn.modules["time"]
# five rounds: build 1000 struct_time objects through `rtime`, time it, then drop the whole list
for i in range(1, 6):
    try:
        Logger.info(f"creating list of strict_time ({i})")
        t0 = time.time()
        tl = [rtime.struct_time([0]*9) for _ in range(1000)]
        print(time.time()-t0)
        Logger.info(f"free list ({i})")
        tl = []  # rebinding releases all 1000 objects at once; the issue's stack trace (client.py:50) points here
        Logger.info(f"done  ({i})")
    except EOFError as e:
        Logger.info(e)
        raise e  # the EOFError is logged above, then re-raised so the script stops

Output (Server):

/usr/bin/python3 /home/wasc/playground/rpyc/server.py --unix 
2024-04-12 11:19:09,668 [1282739] (INFO) server.<module>(28): starting ThreadedServer - rpyc 6.0.0
2024-04-12 11:19:09,668 [1282739] (INFO) server._listen(251): server started on []:/tmp/rpyc-socket
2024-04-12 11:19:21,567 [1282739] (INFO) server.accept(157): accepted  with fd 4
2024-04-12 11:19:21,567 [1282739] (INFO) server._serve_client(200): welcome 
2024-04-12 11:19:21,604 [1282739] (INFO) server._serve_client(207): goodbye 

Output (Client):

/usr/bin/python3 /home/wasc/playground/rpyc/client.py --unix 
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(24): net.core.wmem_max = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(25): net.core.wmem_default = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(27): creating Connection - rpyc 6.0.0
Traceback (most recent call last):
  File "/home/wasc/playground/rpyc/client.py", line 30, in <module>
    conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 117, in unix_connect
    return connect_stream(s, service, config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 60, in connect_stream
    return connect_channel(Channel(stream), service=service, config=config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 48, in connect_channel
    return service._connect(channel, config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 106, in _connect
    self.on_connect(conn)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 215, in on_connect
    self._install(conn, conn.root)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 777, in root
    self._remote_root = self.sync_request(consts.HANDLE_GETROOT)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 744, in sync_request
    return _async_res.value
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 109, in value
    self.wait()
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 51, in wait
    self._conn.serve(self._ttl, waiting=self._waiting)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 472, in serve
    self._dispatch(data)  # Dispatch will unbox, invoke callbacks, etc.
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 421, in _dispatch
    obj = self._unbox(args)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 343, in _unbox
    proxy = self._netref_factory(id_pack)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 363, in _netref_factory
    return cls(self, id_pack)
TypeError: rpyc.core.service.SlaveService() takes no arguments

Process finished with exit code 1