Pr0Ger/PyAPNs2

TooManyStreamsError Exception occurred. Max outbound streams is 1000, 1000 open

Anant706 opened this issue · 5 comments

I am frequently getting the following error:
TooManyStreamsError Exception occurred. Max outbound streams is 1000, 1000 open

I am using apns2==0.5.0

    apns_response = apns.get_notification_result(stream_id)
  File "/usr/local/lib/python3.6/site-packages/apns2/client.py", line 116, in get_notification_result
    with self._connection.get_response(stream_id) as response:
  File "/usr/local/lib/python3.6/site-packages/hyper/http20/connection.py", line 311, in get_response
    stream = self._get_stream(stream_id)
  File "/usr/local/lib/python3.6/site-packages/hyper/http20/connection.py", line 289, in _get_stream
    raise StreamResetError("Stream forcefully closed")
hyper.http20.exceptions.StreamResetError: Stream forcefully closed

Getting this too, with the same apns2==0.5.0. Before that, we seemed to get occasional "hyper.http20.exceptions.StreamResetError: Stream forcefully closed" errors raised from client.py, line 77, in send_notification, at: result = self.get_notification_result(stream_id)

It may be that these errors consume streams in the H2 state machine and eventually exhaust them. As a workaround, we catch hyper's StreamResetError exceptions at the application level and reconnect and retry when this happens.

My reading of the h2 issues and code is that the h2 state machine does not handle this condition (the server closes the stream after the HEADERS frame is sent but before the DATA frame is sent) and does not clean up the stream's resources when it happens. It seems h2 uses a request method that lumps sending the headers and the body into one action, but these are distinct frames in the protocol and can fail independently. That is just my interpretation, though.
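The leak hypothesis above can be illustrated with a toy model. This is a hypothetical sketch, not the actual bookkeeping in h2 or hyper: ToyStreamTable, reset_without_cleanup, and simulate are invented names, and the error message only mimics the one in this report. It shows how a small fraction of streams that are never released eventually fills every slot, even when requests are sent one at a time.

```python
# Toy model only: NOT the real bookkeeping in h2 or hyper. It illustrates the
# hypothesis that server-reset streams which are never released locally keep
# counting against the peer's concurrent-stream limit.


class TooManyStreamsError(Exception):
    """Stand-in for the error raised when no stream slot is free."""


class ToyStreamTable:
    def __init__(self, max_streams):
        self.max_streams = max_streams
        self.open_streams = set()
        self.next_id = 1  # client-initiated HTTP/2 streams use odd IDs

    def open_stream(self):
        if len(self.open_streams) >= self.max_streams:
            raise TooManyStreamsError(
                "Max outbound streams is %d, %d open"
                % (self.max_streams, len(self.open_streams))
            )
        stream_id = self.next_id
        self.next_id += 2
        self.open_streams.add(stream_id)
        return stream_id

    def close_stream(self, stream_id):
        # Normal path: the response is read and the slot is released.
        self.open_streams.discard(stream_id)

    def reset_without_cleanup(self, stream_id):
        # Hypothesized faulty path: the server resets the stream between the
        # HEADERS and DATA frames, and nothing releases the slot.
        pass


def simulate(max_streams, requests, reset_every):
    """Send `requests` requests sequentially; every `reset_every`-th one leaks.

    Returns the number of stream slots still held at the end.
    """
    table = ToyStreamTable(max_streams)
    for i in range(1, requests + 1):
        stream_id = table.open_stream()
        if i % reset_every == 0:
            table.reset_without_cleanup(stream_id)
        else:
            table.close_stream(stream_id)
    return len(table.open_streams)
```

With a limit of 1000 and one leaked stream per ten requests, simulate(1000, 10000, 10) ends with all 1000 slots held, and one more request raises the same message quoted at the top of this issue.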

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@mmeisinger, is the reconnect working for you? Does your flow look like this?

from hyper.http20.exceptions import StreamResetError

try:
  client.send_notification(...)
except StreamResetError:
  client.connect()  # I don't see any other method that (re)establishes a connection
  # retry
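The retry flow asked about above can be wrapped in a small helper. This is a hypothetical sketch, not an apns2 or hyper API: send_with_reconnect, make_client, and retryable are invented names, and make_client stands for whatever code builds and connects a fresh client.

```python
# Hypothetical helper, not part of apns2 or hyper.


def send_with_reconnect(make_client, client, send, retryable, max_retries=1):
    """Call send(client); on a retryable error, build a fresh client and retry.

    make_client: zero-argument callable returning a connected client
                 (for apns2, e.g. a function that creates an APNsClient and
                 calls its connect() method).
    retryable:   exception class or tuple of classes that trigger a reconnect.
    Returns (result, client) so the caller can keep using the newest client.
    """
    attempt = 0
    while True:
        try:
            return send(client), client
        except retryable:
            if attempt >= max_retries:
                raise  # give up and surface the last error
            attempt += 1
            client = make_client()
```

In the flow above, retryable would be StreamResetError and send might be lambda c: c.send_notification(token_hex=token, notification=payload). Building a fresh client instead of calling connect() on the broken one mirrors the approach described in the next comment.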

@jbarreneche We've come to live with the H2 memory leaks in production. Our approach is to keep a usage counter. Once the counter exceeds a limit (say 30,000, which is pretty high), a lock-protected code section creates a new APNsClient and calls client.connect(). Once this is done, the existing client is replaced. This seems to work as long as the limit is not too high.

Additionally, for each send_notification() call, any error such as a StreamResetError (or a ConnectionError, SSLError, H2Error, or HTTP20Error) is caught, which immediately triggers creating a new APNsClient, calling client.connect(), and retrying. The reconnect is lock-protected, so one thread reconnects while the others wait for the newly connected client, and each thread succeeds in sending all of its notifications.

Something along these lines (simplified):

import logging
import time
from ssl import SSLError
from threading import RLock

from apns2.client import APNsClient
from h2.exceptions import H2Error
from hyper.http20.exceptions import HTTP20Error, StreamResetError

logger = logging.getLogger(__name__)
PUSH_CERT = "/path/to/push_cert.pem"  # hypothetical placeholder: path to your APNs certificate


class MyAPNSClient(object):
    def __init__(self):
        self.request_limit = 30000
        self.retry_timeout = 2
        self.lock = RLock()
        self.requests = 0
        self.client = None
        self.client_ts = 0
        self._init_client()

    def _init_client(self):
        cur_client_ts = self.client_ts
        with self.lock:
            if cur_client_ts != self.client_ts:
                # We waited for the lock and another thread already renewed the connection
                return

            self.requests = 0
            new_client = APNsClient(PUSH_CERT)
            try:
                new_client.connect()
            except Exception:
                logger.exception("Cannot open APNS connection")

            # Note: replacing client will automatically close the old connections
            self.client = new_client
            self.client_ts = time.time()

    def send(self, token, payload):
        if self.requests >= self.request_limit:
            self._init_client()

        push_sent = False
        try:
            try:
                self.client.send_notification(token_hex=token, notification=payload)
                push_sent = True

            except (ConnectionError, SSLError, H2Error, HTTP20Error, StreamResetError):
                # Connection-level failure: wait, reconnect, then retry once below
                time.sleep(self.retry_timeout)
                self._init_client()
            # Catch other errors here too

            if not push_sent:
                self.client.send_notification(token_hex=token, notification=payload)
                push_sent = True

            if push_sent:
                self.requests += 1

        except Exception:
            logger.exception("Cannot send notification")
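One possible refinement of the snippet above, offered as an untested sketch rather than the commenter's code: the timestamp check can be replaced with a generation counter, and client construction can be passed in as a factory so the renewal logic can be exercised without a real APNs connection. RenewingClientHolder, make_client, and send_fn are hypothetical names. With apns2, make_client would create an APNsClient and call connect() on it, and send_fn would call send_notification(token_hex=..., notification=...).

```python
import threading


class RenewingClientHolder:
    """Hypothetical sketch: share one client between threads and renew it
    after a request limit or a connection error, at most once per failure.

    make_client is any zero-argument callable returning a connected client.
    """

    def __init__(self, make_client, request_limit=30000):
        self._make_client = make_client
        self._lock = threading.Lock()
        self.request_limit = request_limit
        self.requests = 0
        self.generation = 0  # bumped on every renewal
        self.client = make_client()

    def _snapshot(self):
        # Read generation and client together so they always match.
        with self._lock:
            return self.generation, self.client

    def renew(self, seen_generation):
        with self._lock:
            if seen_generation != self.generation:
                return  # another thread renewed while we waited for the lock
            self.client = self._make_client()
            self.generation += 1
            self.requests = 0

    def send(self, send_fn, retryable):
        generation, client = self._snapshot()
        if self.requests >= self.request_limit:
            self.renew(generation)
            generation, client = self._snapshot()
        try:
            result = send_fn(client)
        except retryable:
            # Renew (or reuse another thread's renewal), then retry once.
            self.renew(generation)
            _, client = self._snapshot()
            result = send_fn(client)
        with self._lock:
            self.requests += 1
        return result
```

A counter cannot collide the way two close time readings in principle can, and snapshotting the generation and the client together under the lock keeps a thread from pairing an old generation with a new client. If several threads fail on the same connection at once, only the first one to get the lock builds a new client; the rest reuse it.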