Tinkoff/invest-python

Ограничение использования AsyncRetryingClient

mancubus721 opened this issue · 6 comments

Что случилось?

Обрадовался, найдя Retrying, переделал код на него, но оказалось что работает не всегда, получил ошибку "AttributeError: 'NoneType' object has no attribute 'ratelimit_reset'". Как я понял, для моего случая исключения get_metadata_from_aio_error(exception) возвращает None. Но в таком случае смысл Retrying`а ломается, все равно приходится обрабатывать повторные попытки соединения снаружи, а такая ошибка за сегодня уже вылетала несколько раз.
Дополнительно: не только мне одному было бы интересно увидеть в examples пример рекомендуемой реализации реконнекта по исключениям соединения, желательно с использованием market_data_stream.

Воспроизведение

async def get_balance_report(acc_id):
    rep = {}
    try:
        async with AsyncRetryingClient(config.settings.token, settings=retry_settings) as client:
            portfolio = await client.operations.get_portfolio(account_id=acc_id)
            b = to_number(portfolio.total_amount_bonds)
            c = to_number(portfolio.total_amount_currencies)
            e = to_number(portfolio.total_amount_etf)
            s = to_number(portfolio.total_amount_shares)
            rbal = b + c + e + s
        positions = portfolio.positions
        tas = to_number(portfolio.total_amount_shares)
    #        print(portfolio)
    except:
        logging.error(traceback.format_exc())
        rbal = -1
        positions = []
        tas = 0

Tinkoff Invest Version

0.2.0-beta45

Python Version

3.10

OS

Windows

Логи

01/10/2023 11:04:50 ERROR:tinklib:Traceback (most recent call last):
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\retry_manager.py", line 20, in call_with_retries
    await response
  File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_call.py", line 290, in __await__
    raise _create_rpc_error(self._cython_call._initial_metadata,
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:91.194.227.80:443: tcp handshaker shutdown"
        debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:91.194.227.80:443: tcp handshaker shutdown {created_time:"2023-01-10T08:04:50.030889041+00:00", grpc_status:14}"
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\mnc\trading\Phyton\pych\tinklib.py", line 193, in get_balance_report
    portfolio = await client.operations.get_portfolio(account_id=acc_id)
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\_errors.py", line 68, in wrapper
    return await func(*args, **kwargs)
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\async_services.py", line 928, in get_portfolio
    response = await response_coro
  File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 442, in __await__
    call = yield from self._interceptors_task.__await__()
  File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 631, in _invoke
    return await _run_interceptor(iter(interceptors), client_call_details,
  File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 612, in _run_interceptor
    call_or_response = await interceptor.intercept_unary_unary(
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\grpc_interceptor.py", line 19, in intercept_unary_unary
    return await self._intercept_with_retry(
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\grpc_interceptor.py", line 16, in _intercept_with_retry
    return await self._retry_manager.call_with_retries(call=call)
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\retry_manager.py", line 28, in call_with_retries
    seconds_to_sleep = self.extract_seconds_to_sleep(metadata)
  File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\base_retry_manager.py", line 23, in extract_seconds_to_sleep
    seconds_to_sleep = metadata.ratelimit_reset
AttributeError: 'NoneType' object has no attribute 'ratelimit_reset'

Я имею ввиду что-то такое в tinkoff/invest/retrying/aio/retry_manager.py:
if metadata is not None:
seconds_to_sleep = self.extract_seconds_to_sleep(metadata)
else:
seconds_to_sleep = 1
await self._sleep(seconds_to_sleep)

Дополнительно: не только мне одному было бы интересно увидеть в examples пример рекомендуемой реализации реконнекта по исключениям соединения, желательно с использованием market_data_stream.

Постараемся реализовать само восстановления после падения #177

К сожалению, RetryingClient был предназначен для повторов запросов, которые сервер отбил и просит подождать.

Да, я понял для чего задуман Retrying клиент, когда уже написал этот issue. Если возможно дополнить пример /examples/async_stream_client.py конструкцией try-except для обрывов соединения это было бы очень полезно, я не уверен где это писать, когда используется итератор. Заранее спасибо.

Привет) Удалось сделать такую обработку? конструкцией try-except для обрывов соединения

Лично для себя я решил следующим образом, извиняюсь за избыточность кода, привожу пример моей функции сборщика свечек на текущий момент для торгового робота. После запуска робота, сборщик собирает минутные свечки за день, чтобы робот каждую минуту рассчитывал стратегию на шаг. Так у меня вылетов робота стало радикально меньше. Можно проматывать до инструкции try:

candles harvester

async def candles_harvest_run(self):
    logging.info('candles_harvest_run started')
    # if started after beginning of trading day, try to collect existing data
    if dt.now().time() > self.harvstime:
        logging.info('check for updates for candles table')
        dbc = pd.DataFrame()
        for figi in self.stocks:
            dbc0 = tinklib.get_today_candles_1m(figi)
            dbc = pd.concat([dbc, dbc0])
        if len(dbc) > 0:
            self.today_candles = dbc.sort_index()
        else:
            logging.error(
                'attempt to update candles table failed, candles not found, try to collect from void table')
    # wait for trading day start
    while dt.now().time() < self.harvstime:
        logging.debug('candles_harvest_run: wait for harvstime')
        await asyncio.sleep(10)
    logging.info('start candles harvesting')
    # subscribe for instruments
    ci = []
    for f in self.stocks:
        ci.append(CandleInstrument(figi=f,
                                   interval=schemas.SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE))
    logging.debug(ci)
    # define request iterator
    async def request_iterator():
        yield MarketDataRequest(subscribe_candles_request=SubscribeCandlesRequest(
            subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
            instruments=ci),
        )
        while self.f_harvest:
            await asyncio.sleep(1)
        logging.info('candles harvest stop')
    
    # collect actual data
    self.f_harvest = True
    retries_left = 10
    seconds_to_sleep = 2
    async with AsyncClient(config.settings.token) as client:
        while retries_left > 0:
            try:
                async for marketdata in client.market_data_stream.market_data_stream(request_iterator()):
                    if not self.f_harvest:
                        return
                    if marketdata.candle is not None:
                        begin = marketdata.candle.time.astimezone(pytz.timezone('Europe/Moscow'))
                        begin = begin.replace(tzinfo=None)
                        i = (marketdata.candle.figi, begin)
                        self.today_candles.at[i, 'open'] = tinklib.to_number(marketdata.candle.open)
                        self.today_candles.at[i, 'high'] = tinklib.to_number(marketdata.candle.high)
                        self.today_candles.at[i, 'low'] = tinklib.to_number(marketdata.candle.low)
                        self.today_candles.at[i, 'close'] = tinklib.to_number(marketdata.candle.close)
                        self.today_candles.at[i, 'volume'] = marketdata.candle.volume
                        retries_left = 10
                    else:
                        logging.debug(marketdata)
            except (AioRpcError, AioRequestError) as error:
                await asyncio.sleep(seconds_to_sleep)
                retries_left -= 1
                if retries_left > 0:
                    logging.warning(f"candles_harvest_run() raise {error} exception, retry...")
                else:
                    logging.error(f"candles_harvest_run() raise {error} too much exceptions, abort...")
                    logging.info('stop candles harvesting!')
                    return
    logging.info('stop candles harvesting!')
    return