Server response periodically blocks?
Hello,
I will provide a reproduction example, but first: I am aware that what I'm doing isn't normal. Still, I believe that technically it should work.
It seems that sending return values periodically blocks indefinitely, and I'm not sure why this is occurring.
If you run the example below and then run

```bash
for i in {1..100} ; do zerorpc --timeout 1 tcp://localhost:5559 lolita ; done
```

you'll notice that roughly every 5th request times out. However, the server's debug output shows that the RPC method is executed, and the server only appears to block while sending the response back to the broker.
Is there a better way to do this?
```python
# main.py
# A lot of debugging junk in here
import gevent
from gevent import monkey
monkey.patch_all()
import gipc
from zmq import green as zmq
import zerorpc
import logging
import multiprocessing as mp
from zerorpc.socket import SocketBase
from zerorpc.core import ServerBase
from .util import zhelpers as zhelpers


class SrvBase(ServerBase, SocketBase):
    def __init__(self, methods=None, name=None, context=None, pool_size=1000, heartbeat=5):
        SocketBase.__init__(self, zmq.XREP, context)
        if methods is None:
            methods = self
        name = name or ServerBase._extract_name(methods)
        methods = ServerBase._filter_methods(SrvBase, self, methods)
        ServerBase.__init__(self, self._events, methods, name, context, pool_size, heartbeat)
        # print "DEBUG TEST"
        # print self.__dict__

    def close(self):
        ServerBase.close(self)
        SocketBase.close(self)


class RPCWorker(SrvBase):
    def lolita(self):
        return 42

    def add(self, a, b):
        return a + b


def initializer(url="tcp://127.0.0.1:5560"):
    def do_work():
        class MySrv(SrvBase):
            def lolita(self):
                print "ME"
                return 42

            def add(self, a, b):
                return a + b

        srv = MySrv()
        print "BREAK"
        srv.connect(url)
        print "BREAK1"
        gevent.spawn(srv.run)
        print "BREAK2"
        while True:
            gevent.sleep(0.50)

    monkey.patch_all()
    THREADPOOL = gevent.get_hub().threadpool
    threads = []
    for i in range(5):
        t = THREADPOOL.spawn(do_work)
        threads.append(t)
    print threads
    while True:
        # print "TICK"
        gevent.sleep(0.50)


class DataPool:
    def __init__(self):
        self.ctx = zmq.Context.instance()
        try:
            # Socket facing clients
            self.frontend = self.ctx.socket(zmq.XREP)
            self.frontend.bind("tcp://127.0.0.1:5559")
            # Socket facing services
            self.backend = self.ctx.socket(zmq.XREQ)
            self.backend.bind("tcp://127.0.0.1:5560")
            # Socket poller
            self.poller = zmq.Poller()
            self.poller.register(self.frontend, zmq.POLLIN)
            self.poller.register(self.backend, zmq.POLLIN)
            # Start pool
            # self.pool = mp.Pool(processes=4, initializer=initializer)
            procs = []
            for i in xrange(10):
                p = gipc.start_process(initializer)
                procs.append(p)
                # p = mp.Process(target=initializer)
                # p.start()
        except Exception, e:
            raise

    def run(self, *args, **kwargs):
        gevent.spawn(self.run_loop)

    def run_loop(self):
        try:
            while True:
                # print "TICK"
                socks = dict(self.poller.poll(100))
                gevent.sleep(0)
                if socks.get(self.frontend) == zmq.POLLIN:
                    message = self.frontend.recv(zmq.NOBLOCK)
                    more = self.frontend.getsockopt(zmq.RCVMORE)
                    if more:
                        print "Rtr<<",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.backend.send(message, zmq.SNDMORE)
                        print "SENDMORE"
                        gevent.sleep(0)
                    else:
                        print "Rtr<.",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.backend.send(message, zmq.NOBLOCK)
                        gevent.sleep(0)
                if socks.get(self.backend) == zmq.POLLIN:
                    message = self.backend.recv(zmq.NOBLOCK)
                    more = self.backend.getsockopt(zmq.RCVMORE)
                    if more:
                        print "Rtr>>",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.frontend.send(message, zmq.SNDMORE)
                        print "RECEIVEMORE"
                        gevent.sleep(0)
                    else:
                        print "Rtr>.",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.frontend.send(message, zmq.NOBLOCK)
                        gevent.sleep(0)
        except Exception, e:
            print e
            print "bringing down zmq device"
        finally:
            pass
        self.frontend.close()
        self.backend.close()
        self.ctx.term()
```
```python
# util.zhelpers
from random import randint
import zmq
import msgpack


# Receives all message parts from socket, prints neatly
def dump(zsocket):
    print "----------------------------------------"
    for part in zsocket.recv_multipart():
        dump_part(part)


def dump_part(part):
    # print part
    try:
        # dec_part = format_part(part)
        # dec_part = msgpack.unpackb(part)
        unpacker = msgpack.Unpacker(encoding='utf-8')
        unpacker.feed(part)
        dec_part = unpacker.unpack()
    except Exception, e:
        print "Can't unpack %s" % e
        dec_part = format_part(part)
    finally:
        print "[%03d]" % len(part), dec_part
        # print "[%03d]" % len(part), format_part(part)


def format_part(part):
    if all(31 < ord(c) < 128 for c in part):
        return "'" + part + "'"
    else:
        # %02x keeps two hex digits per byte, so 0x05 is not printed as "5"
        return "0x" + "".join("%02x" % ord(c) for c in part)


# Set simple random printable identity on socket
def set_id(zsocket):
    identity = "%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
    zsocket.setsockopt(zmq.IDENTITY, identity)
```
What's your pyzmq version? Try upgrading pyzmq to 15.2.0 or downgrading to 13.1.0.
Hello,
This is using 15.2.0. If I downgrade to 13.1.0, I cannot perform an equivalent test because it does not appear to have the XREP socket type. Any other ideas?
I reworked this a bit and downgraded to 13.1.0, and you are correct, this works as expected now. This should be considered a duplicate of #123
This seems to be another case of gevent monkey patching everything and messing with pyzmq, then. I have no easy solution in mind.
But why do you need to monkey patch in the first place? You are basically begging for trouble. Monkey patching is really crazy: it changes the behavior of some important functions. Either you write your code to be fully gevent compliant, or you don't use gevent at all. Of course, zerorpc-python requires gevent, and there is no version of zerorpc that works without it. Would a version independent of gevent fix your problems?
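For the broker in the reproduction, one reading of "fully gevent compliant" is: no `monkey.patch_all()` at all, with pyzmq's green API (`zmq.green`) imported explicitly so that only the sockets cooperate with the gevent hub. The sketch below assumes that approach; the helper names `relay_once`, `pump` and `run_broker` are hypothetical. It also relays each message whole with `recv_multipart()`/`send_multipart()` instead of forwarding frame by frame. It illustrates the style only and is not claimed to fix the hang on its own.

```python
# Hypothetical broker helpers: no monkey.patch_all(), only green sockets.
POLLIN = 1  # same value as zmq.POLLIN (ZMQ_POLLIN in libzmq)


def relay_once(src, dst):
    """Forward one complete multipart message from src to dst.

    recv_multipart() returns every frame of the message (routing
    envelope included), and send_multipart() sends them back out with
    SNDMORE set on all but the last frame, so no per-frame bookkeeping
    is needed.
    """
    frames = src.recv_multipart()
    dst.send_multipart(frames)
    return len(frames)


def pump(poller, frontend, backend, timeout=100):
    """Run one poll cycle (timeout in ms); return how many messages moved."""
    events = dict(poller.poll(timeout))
    relayed = 0
    if events.get(frontend, 0) & POLLIN:  # client request -> workers
        relay_once(frontend, backend)
        relayed += 1
    if events.get(backend, 0) & POLLIN:   # worker reply -> client
        relay_once(backend, frontend)
        relayed += 1
    return relayed


def run_broker(front_url="tcp://127.0.0.1:5559", back_url="tcp://127.0.0.1:5560"):
    """Bind the ROUTER/DEALER pair with zmq.green and relay until killed."""
    # Imported here so relay_once/pump stay usable without pyzmq installed.
    import zmq.green as zmq

    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.ROUTER)  # XREP in the original code
    frontend.bind(front_url)
    backend = ctx.socket(zmq.DEALER)   # XREQ in the original code
    backend.bind(back_url)
    # The green Poller waits cooperatively, so other greenlets keep running.
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    try:
        while True:
            pump(poller, frontend, backend)
    finally:
        frontend.close()
        backend.close()
```

`run_broker()` can be called directly as the process's main loop or started with `gevent.spawn(run_broker)`; keeping all relaying in one greenlet means each socket has a single sender, so frames of different messages never interleave.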
Thanks to @faith0811, see zeromq/pyzmq#766.
In short: pyzmq has a gc thread that handles the lifetime of zero-copy messages, and this thread uses a zmq context. When gevent monkey patches the threading API, the gc thread does not use the green wrapper of the zmq context (zmq.green.Context), and the gc thread hangs.
The latest version of pyzmq detects whether the threading API has been monkey patched and uses zmq.green.Context in that case. This likely means you have to monkey patch the threading API early, before pyzmq starts the gc thread.
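A minimal sketch of checking that ordering at startup, assuming a recent gevent release that provides `gevent.monkey.is_module_patched()`. The helper names `threading_is_green`, `zmq_module_name` and `load_zmq` are hypothetical; the code only mirrors the idea of choosing `zmq.green` when threading is patched and is not pyzmq's own detection code.

```python
import importlib
import sys


def threading_is_green():
    """Return True if gevent has monkey patched the threading module.

    gevent's patch_* functions live in gevent.monkey, so if that module
    was never imported, gevent has not patched anything.
    """
    monkey = sys.modules.get("gevent.monkey")
    if monkey is None:
        return False
    return monkey.is_module_patched("threading")


def zmq_module_name():
    """Pick the pyzmq flavor that matches the current patching state."""
    return "zmq.green" if threading_is_green() else "zmq"


def load_zmq():
    """Import pyzmq lazily, after the entry point has (or hasn't) patched."""
    return importlib.import_module(zmq_module_name())
```

In an entry point, the intended order would be `from gevent import monkey; monkey.patch_all()` on the very first lines, and only afterwards `zmq = load_zmq()` or any other import that pulls in pyzmq.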
@bombela I think (at least in my case) it is not enough for zerorpc.Context to use zmq.green. I experimented with this a bit over the weekend, and in my case it did not change the problem at all.
The fix described in the issue @faith0811 points out (thank you!) works reliably. It also explains the lockup I was seeing when using threads.
Unfortunately, I think I need to monkey patch early in my framework, because it simply provides multiprocessing wrappers around an existing single-threaded code base, and a lot of libraries will be imported indirectly.
This has nothing to do with zerorpc: zerorpc is built on gevent and therefore already works with it. You would encounter the same freeze without zerorpc. It happens when you combine gevent monkey patching of the threading API with zero-copy messages in pyzmq.
And yes, @faith0811 provided the links to the real problem and solution; I was merely reporting it in this thread before closing it :)
You are correct that it is not a zerorpc issue; however, zerorpc could proactively fix this in its zmq.Context subclass by providing an initializer. Sorry, I should have explained better; I haven't been sleeping much.
OK, I'm not sure I follow 100%. Do you mean providing something like zerorpc.green.Context, similar to zmq.green.Context? Or do you mean providing a way to initialize a zerorpc.Context (zerorpc-python/zerorpc/context.py, line 32 in 82dcbc2)?
Maybe I could subclass zmq.green.Context? But I still fail to understand what it would fix.