Ограничение использования AsyncRetryingClient
mancubus721 opened this issue · 6 comments
Что случилось?
Обрадовался, найдя Retrying, переделал код на него, но оказалось что работает не всегда, получил ошибку "AttributeError: 'NoneType' object has no attribute 'ratelimit_reset'". Как я понял, для моего случая исключения get_metadata_from_aio_error(exception) возвращает None. Но в таком случае смысл Retrying`а ломается, все равно приходится обрабатывать повторные попытки соединения снаружи, а такая ошибка за сегодня уже вылетала несколько раз.
Дополнительно: не только мне одному было бы интересно увидеть в examples пример рекомендуемой реализации реконнекта по исключениям соединения, желательно с использованием market_data_stream.
Воспроизведение
async def get_balance_report(acc_id):
rep = {}
try:
async with AsyncRetryingClient(config.settings.token, settings=retry_settings) as client:
portfolio = await client.operations.get_portfolio(account_id=acc_id)
b = to_number(portfolio.total_amount_bonds)
c = to_number(portfolio.total_amount_currencies)
e = to_number(portfolio.total_amount_etf)
s = to_number(portfolio.total_amount_shares)
rbal = b + c + e + s
positions = portfolio.positions
tas = to_number(portfolio.total_amount_shares)
# print(portfolio)
except:
logging.error(traceback.format_exc())
rbal = -1
positions = []
tas = 0
Tinkoff Invest Version
0.2.0-beta45
Python Version
3.10
OS
Windows
Логи
01/10/2023 11:04:50 ERROR:tinklib:Traceback (most recent call last):
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\retry_manager.py", line 20, in call_with_retries
await response
File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_call.py", line 290, in __await__
raise _create_rpc_error(self._cython_call._initial_metadata,
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:91.194.227.80:443: tcp handshaker shutdown"
debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:91.194.227.80:443: tcp handshaker shutdown {created_time:"2023-01-10T08:04:50.030889041+00:00", grpc_status:14}"
>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\mnc\trading\Phyton\pych\tinklib.py", line 193, in get_balance_report
portfolio = await client.operations.get_portfolio(account_id=acc_id)
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\_errors.py", line 68, in wrapper
return await func(*args, **kwargs)
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\async_services.py", line 928, in get_portfolio
response = await response_coro
File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 442, in __await__
call = yield from self._interceptors_task.__await__()
File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 631, in _invoke
return await _run_interceptor(iter(interceptors), client_call_details,
File "c:\users\mnc\anaconda3\lib\site-packages\grpc\aio\_interceptor.py", line 612, in _run_interceptor
call_or_response = await interceptor.intercept_unary_unary(
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\grpc_interceptor.py", line 19, in intercept_unary_unary
return await self._intercept_with_retry(
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\grpc_interceptor.py", line 16, in _intercept_with_retry
return await self._retry_manager.call_with_retries(call=call)
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\aio\retry_manager.py", line 28, in call_with_retries
seconds_to_sleep = self.extract_seconds_to_sleep(metadata)
File "c:\users\mnc\anaconda3\lib\site-packages\tinkoff\invest\retrying\base_retry_manager.py", line 23, in extract_seconds_to_sleep
seconds_to_sleep = metadata.ratelimit_reset
AttributeError: 'NoneType' object has no attribute 'ratelimit_reset'
Я имею ввиду что-то такое в tinkoff/invest/retrying/aio/retry_manager.py:
if metadata is not None:
seconds_to_sleep = self.extract_seconds_to_sleep(metadata)
else:
seconds_to_sleep = 1
await self._sleep(seconds_to_sleep)
Дополнительно: не только мне одному было бы интересно увидеть в examples пример рекомендуемой реализации реконнекта по исключениям соединения, желательно с использованием market_data_stream.
Постараемся реализовать само восстановления после падения #177
К сожалению, RetryingClient был предназначен для повторов запросов, которые сервер отбил и просит подождать.
Да, я понял для чего задуман Retrying клиент, когда уже написал этот issue. Если возможно дополнить пример /examples/async_stream_client.py конструкцией try-except для обрывов соединения это было бы очень полезно, я не уверен где это писать, когда используется итератор. Заранее спасибо.
Привет) Удалось сделать такую обработку? конструкцией try-except для обрывов соединения
Лично для себя я решил следующим образом, извиняюсь за избыточность кода, привожу пример моей функции сборщика свечек на текущий момент для торгового робота. После запуска робота, сборщик собирает минутные свечки за день, чтобы робот каждую минуту рассчитывал стратегию на шаг. Так у меня вылетов робота стало радикально меньше. Можно проматывать до инструкции try:
candles harvester
async def candles_harvest_run(self):
logging.info('candles_harvest_run started')
# if started after beginning of trading day, try to collect existing data
if dt.now().time() > self.harvstime:
logging.info('check for updates for candles table')
dbc = pd.DataFrame()
for figi in self.stocks:
dbc0 = tinklib.get_today_candles_1m(figi)
dbc = pd.concat([dbc, dbc0])
if len(dbc) > 0:
self.today_candles = dbc.sort_index()
else:
logging.error(
'attempt to update candles table failed, candles not found, try to collect from void table')
# wait for trading day start
while dt.now().time() < self.harvstime:
logging.debug('candles_harvest_run: wait for harvstime')
await asyncio.sleep(10)
logging.info('start candles harvesting')
# subscribe for instruments
ci = []
for f in self.stocks:
ci.append(CandleInstrument(figi=f,
interval=schemas.SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE))
logging.debug(ci)
# define request iterator
async def request_iterator():
yield MarketDataRequest(subscribe_candles_request=SubscribeCandlesRequest(
subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
instruments=ci),
)
while self.f_harvest:
await asyncio.sleep(1)
logging.info('candles harvest stop')
# collect actual data
self.f_harvest = True
retries_left = 10
seconds_to_sleep = 2
async with AsyncClient(config.settings.token) as client:
while retries_left > 0:
try:
async for marketdata in client.market_data_stream.market_data_stream(request_iterator()):
if not self.f_harvest:
return
if marketdata.candle is not None:
begin = marketdata.candle.time.astimezone(pytz.timezone('Europe/Moscow'))
begin = begin.replace(tzinfo=None)
i = (marketdata.candle.figi, begin)
self.today_candles.at[i, 'open'] = tinklib.to_number(marketdata.candle.open)
self.today_candles.at[i, 'high'] = tinklib.to_number(marketdata.candle.high)
self.today_candles.at[i, 'low'] = tinklib.to_number(marketdata.candle.low)
self.today_candles.at[i, 'close'] = tinklib.to_number(marketdata.candle.close)
self.today_candles.at[i, 'volume'] = marketdata.candle.volume
retries_left = 10
else:
logging.debug(marketdata)
except (AioRpcError, AioRequestError) as error:
await asyncio.sleep(seconds_to_sleep)
retries_left -= 1
if retries_left > 0:
logging.warning(f"candles_harvest_run() raise {error} exception, retry...")
else:
logging.error(f"candles_harvest_run() raise {error} too much exceptions, abort...")
logging.info('stop candles harvesting!')
return
logging.info('stop candles harvesting!')
return