unix-socket unexpectetly closed by client
Opened this issue · 0 comments
walterschneider commented
Deleting a list of netrefs unexpectedly closes the connection.
- depending on size of list and
net.core.wmem_default
, socket write fails - the exception is silently ignore, connection closed
- the next operation fails due to closed connection
sock.send throws an exception (TimeoutError), connection is closed:
write, stream.py:292
send, channel.py:78
_send, protocol.py:308
_async_request, protocol.py:750
async_request, protocol.py:768
asyncreq, netref.py:79
__del__, netref.py:122
<module>, client.py:50
Suggestions/ Workarounds /Questions
- call syscreq instead of asyncrequest from
BaseNetref.__del__
class BaseNetref(object, metaclass=NetrefMetaclass): def __del__(self): try: # syncrequest here??? asyncreq(self, consts.HANDLE_DEL, self.____refcount__) except Exception: # raised in a destructor, most likely on program termination, # when the connection might have already been closed. # it's safe to ignore all exceptions here pass
- TCP-socket connections is not affected
Environment
- rpyc 6.0.0
- python 3.10
- ubuntu 22.04
Minimal example
Server:
import os
import logging
import socket
import rpyc.utils.server
import rpyc.utils.classic
import argparse
Parser = argparse.ArgumentParser()
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()
hostname = "localhost"
socket_path = "/tmp/rpyc-socket"
if os.path.exists(socket_path):
os.remove(socket_path)
# setup logger
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)
Logger = logging.Logger("rpyc_server")
Logger.addHandler(sh)
Logger.info("starting ThreadedServer - rpyc %s" % str(rpyc.__version__))
if args.unix:
t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, socket_path=socket_path, logger=Logger)
else:
t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, hostname=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, logger=Logger)
t.listener.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# setup socket
t._listen()
# run server
t.start()
Client:
import rpyc.utils.classic
import socket
import logging
import subprocess
import time
import argparse
Parser = argparse.ArgumentParser()
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()
hostname = "localhost"
socket_path = "/tmp/rpyc-socket"
# setup logger
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)
Logger = logging.Logger("rpyc_client")
Logger.addHandler(sh)
Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_max"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())
Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_default"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())
Logger.info(f"creating Connection - rpyc {rpyc.__version__}")
if args.unix:
conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
else:
#conn = rpyc.classic.factory.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})
s = rpyc.core.stream.SocketStream.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, ipv6=False, keepalive=False, nodelay=True)
conn = rpyc.classic.factory.connect_stream(s, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})
Logger.info(f"Connection: {conn}")
conn.execute("import time")
rtime = conn.modules["time"]
for i in range(1, 6):
try:
Logger.info(f"creating list of strict_time ({i})")
t0 = time.time()
tl = [rtime.struct_time([0]*9) for _ in range(1000)]
print(time.time()-t0)
Logger.info(f"free list ({i})")
tl = []
Logger.info(f"done ({i})")
except EOFError as e:
Logger.info(e)
raise e
Output (Server):
/usr/bin/python3 /home/wasc/playground/rpyc/server.py --unix
2024-04-12 11:19:09,668 [1282739] (INFO) server.<module>(28): starting ThreadedServer - rpyc 6.0.0
2024-04-12 11:19:09,668 [1282739] (INFO) server._listen(251): server started on []:/tmp/rpyc-socket
2024-04-12 11:19:21,567 [1282739] (INFO) server.accept(157): accepted with fd 4
2024-04-12 11:19:21,567 [1282739] (INFO) server._serve_client(200): welcome
2024-04-12 11:19:21,604 [1282739] (INFO) server._serve_client(207): goodbye
Output (Client):
/usr/bin/python3 /home/wasc/playground/rpyc/client.py --unix
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(24): net.core.wmem_max = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(25): net.core.wmem_default = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(27): creating Connection - rpyc 6.0.0
Traceback (most recent call last):
File "/home/wasc/playground/rpyc/client.py", line 30, in <module>
conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 117, in unix_connect
return connect_stream(s, service, config)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 60, in connect_stream
return connect_channel(Channel(stream), service=service, config=config)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 48, in connect_channel
return service._connect(channel, config)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 106, in _connect
self.on_connect(conn)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 215, in on_connect
self._install(conn, conn.root)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 777, in root
self._remote_root = self.sync_request(consts.HANDLE_GETROOT)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 744, in sync_request
return _async_res.value
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 109, in value
self.wait()
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 51, in wait
self._conn.serve(self._ttl, waiting=self._waiting)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 472, in serve
self._dispatch(data) # Dispatch will unbox, invoke callbacks, etc.
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 421, in _dispatch
obj = self._unbox(args)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 343, in _unbox
proxy = self._netref_factory(id_pack)
File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 363, in _netref_factory
return cls(self, id_pack)
TypeError: rpyc.core.service.SlaveService() takes no arguments
Process finished with exit code 1