bachya/simplisafe-python

Unable to get api.add_refresh_token_callback() to work

Closed this issue · 16 comments

Follow on from a different issue/question. I've been trying since yesterday, and I can't manage to get the callback to work. I'm pretty sure I need the callback to handle the auth token refresh at the one hour mark.

Here's my demo app. It's runnable, with logging. Any idea why the callback isn't working?

import asyncio
from typing import Any

from aiohttp import ClientSession
import aiofiles
from aioconsole import ainput

from simplipy import API
from simplipy.errors import (
    EndpointUnavailableError,
    InvalidCredentialsError,
    SimplipyError,
    WebsocketError,
)

from simplipy.websocket import (
    EVENT_AUTOMATIC_TEST,
    EVENT_CAMERA_MOTION_DETECTED,
    EVENT_CONNECTION_LOST,
    EVENT_CONNECTION_RESTORED,
    EVENT_DEVICE_TEST,
    EVENT_DOORBELL_DETECTED,
    EVENT_LOCK_LOCKED,
    EVENT_LOCK_UNLOCKED,
    EVENT_POWER_OUTAGE,
    EVENT_POWER_RESTORED,
    EVENT_SECRET_ALERT_TRIGGERED,
    EVENT_SENSOR_PAIRED_AND_NAMED,
    EVENT_USER_INITIATED_TEST,
    WebsocketEvent,
)
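import logging

from simplipy.system.v2 import SystemV2
from simplipy.system.v3 import SystemV3

# Alias used in the annotations below (the same shape Home Assistant uses):
SystemType = SystemV2 | SystemV3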

class SimpliSafe:
    """Define a SimpliSafe data object."""

    def __init__(self, api: API) -> None:
        """Initialize."""
        logging.basicConfig(
            level=logging.INFO,
            # %(msecs)03d zero-pads milliseconds; %(message)s is the formatted message.
            format="%(asctime)s.%(msecs)03d\t%(levelname)6s %(name)12s.%(funcName)-30s%(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        self.logger = logging.getLogger("SimpliSafe")
        self._api = api
        self._websocket_reconnect_task: asyncio.Task | None = None
        self.initial_event_to_use: dict[int, dict[str, Any]] = {}
        self.subscription_data: dict[int, Any] = api.subscription_data
        self.systems: dict[int, SystemType] = {}

    async def _async_start_websocket_loop(self) -> None:
        """Start a websocket reconnection loop."""
        self.logger.info("_async_start_websocket_loop")

        assert self._api.websocket
        try:
            await self._api.websocket.async_connect()
            await self._api.websocket.async_listen()
        except asyncio.CancelledError as err:
            self.logger.info(f"_async_start_websocket_loop: asyncio.CancelledError: {err}")
            raise
        except WebsocketError as err:
            self.logger.error(f"_async_start_websocket_loop: WebsocketError: {err}")
        except Exception as err:  # pylint: disable=broad-except
            self.logger.error(f"_async_start_websocket_loop: Exception: {err}")

        await self._async_cancel_websocket_loop()
        self._websocket_reconnect_task = asyncio.create_task(self._async_start_websocket_loop())

    async def _async_cancel_websocket_loop(self) -> None:
        """Stop any existing websocket reconnection loop."""
        self.logger.info("_async_cancel_websocket_loop")

        if self._websocket_reconnect_task:
            self._websocket_reconnect_task.cancel()
            try:
                await self._websocket_reconnect_task
            except asyncio.CancelledError as err:
                self.logger.info(f"_async_cancel_websocket_loop: asyncio.CancelledError: {err}")
                self._websocket_reconnect_task = None
            except Exception as err:  # pylint: disable=broad-except
                self.logger.error(f"_async_cancel_websocket_loop: Exception: {err}")

            assert self._api.websocket
            await self._api.websocket.async_disconnect()

    async def _async_websocket_on_event(self, event: WebsocketEvent) -> None:
        """Define a callback for receiving a websocket event."""
        self.logger.info(f"_async_websocket_on_event, event: {event.info}")

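    async def _async_websocket_on_connect(self) -> None:
        """Define a callback for a websocket connection."""
        self.logger.info("_async_websocket_on_connect")

    async def _async_websocket_on_disconnect(self) -> None:
        """Define a callback for a websocket disconnection."""
        self.logger.info("_async_websocket_on_disconnect")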

    async def async_update(self) -> None:
        """Get updated data from SimpliSafe."""
        self.logger.info("async_update")

        async def async_update_system(system: SystemType) -> None:
            """Update a system."""
            await asyncio.sleep(0)

        tasks = [async_update_system(system) for system in self.systems.values()]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    async def async_save_refresh_token(self, token: str) -> None:
        """Save a refresh token to the config entry."""
        self.logger.info(f"async_save_refresh_token: {token}")
        async with aiofiles.open("token_file", "w", encoding="utf-8") as f:
            await f.write(token)

    async def async_handle_refresh_token(self, token: str) -> None:
        """Handle a new refresh token."""
        self.logger.info(f"async_handle_refresh_token: {token}")

        await self.async_save_refresh_token(token)

        # Open a new websocket connection with the fresh token:
        assert self._api.websocket
        await self._async_cancel_websocket_loop()
        self._websocket_reconnect_task = asyncio.create_task(self._async_start_websocket_loop())

    async def async_init(self) -> None:
        """Initialize the SimpliSafe "manager" class."""
        self.logger.info("async_init")

        assert self._api.refresh_token
        assert self._api.websocket

        # Save the refresh token we got on entry setup:
        await self.async_save_refresh_token(self._api.refresh_token)

        self._api.websocket.add_event_callback(self._async_websocket_on_event)
        self._api.websocket.add_connect_callback(self._async_websocket_on_connect)
        self._api.websocket.add_disconnect_callback(self._async_websocket_on_disconnect)
        self._api.add_refresh_token_callback(self.async_handle_refresh_token)
        self._websocket_reconnect_task = asyncio.create_task(self._async_start_websocket_loop())

        self.systems = await self._api.async_get_systems()
        for system in self.systems.values():

            # Future events will come from the websocket, but since subscription to the
            # websocket doesn't provide the most recent event, we grab it from the REST
            # API to ensure event-related attributes aren't empty on startup:
            try:
                self.initial_event_to_use[
                    system.system_id
                ] = await system.async_get_latest_event()
            except SimplipyError as err:
                self.logger.error(f"async_init: SimplipyError: {err}")
                self.initial_event_to_use[system.system_id] = {}

async def main() -> None:
    """Create the aiohttp session and run."""
    async with ClientSession() as session:

        try:
            async with aiofiles.open("token_file", "r", encoding="utf-8") as f:
                refresh_token = await f.read()
        except OSError:
            refresh_token = None

        if refresh_token:
            _api = await API.async_from_refresh_token(refresh_token, session=session)
        else:
            username = await ainput("Username: ")
            password = await ainput("Password: ")
            _api = await API.async_from_credentials(username, password, session=session)

            sms = await ainput("SMS Code: ")
            try:
                await _api.async_verify_2fa_sms(sms)
                print(_api.auth_state)
            except InvalidCredentialsError as err:
                print("Invalid SMS 2FA code")

        print(f"Authentication Successful: {_api.auth_state}")

        simplisafe = SimpliSafe(_api)
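        try:
            await simplisafe.async_init()
        except SimplipyError as err:
            # ConfigEntryNotReady is Home Assistant-specific and undefined here,
            # so fail with a plain exception instead:
            raise RuntimeError("SimpliSafe initialization failed") from err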

        systems = await _api.async_get_systems()
        for system in systems.values():
            print(f"System: {system.system_id} @ {system.address}")
        # Note: the commands below act on `system`, i.e. the last system listed above.

        # get commands
        while True:
            await asyncio.sleep(0)
            cmd = await ainput("\n")
            match cmd:
                case 'quit':
                    break
                case 'off':
                    await system.async_set_off()
                    await system.async_update()
                case 'home':
                    await system.async_set_home()
                    await system.async_update()
                case 'away':
                    await system.async_set_away()
                    await system.async_update()
                case 'state':
                    print(f"System state: {system.state}")
                case '':
                    continue
                case _:
                    print(f"Unknown command '{cmd}'")

asyncio.run(main())

By adding a call to the internal method API._async_refresh_access_token() to my code, I've determined that the callback to the refresh handler is actually working. So the problem is that the refresh isn't happening automatically. The only place that happens is in _async_handle_on_backoff(), but I can't find anything that calls that method.

Thoughts?

I can't speak to your exact application, but automatic refreshing of the tokens while the event loop is running happens here:

if err.status in (401, 403):
    assert self._token_last_refreshed

    # Calculate the window between now and the last time the token was
    # refreshed:
    window = (datetime.utcnow() - self._token_last_refreshed).total_seconds()

    # Since we might have multiple requests (each running their own retry
    # sequence) land here, we only refresh the access token if it hasn't
    # been refreshed within the window (and we lock the attempt so other
    # requests can't try it at the same time):
    async with self._backoff_refresh_lock:
        if window < DEFAULT_TOKEN_EXPIRATION_WINDOW:
            LOGGER.debug("Skipping refresh attempt since window hasn't busted")
            return

        LOGGER.info("401 detected; attempting refresh token")
        await self._async_refresh_access_token()

Keep digging; this same structure is running fine in Home Assistant (my instance has been running for over two weeks and I haven't lost connection to SS).
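For readers following along, the guard in that snippet can be tried in isolation. This is only a minimal sketch of the "refresh at most once per window, under a lock" idea, not the library's code: `TokenGuard`, `EXPIRATION_WINDOW`, and `handle_unauthorized` are hypothetical names, the network call is faked, and (unlike the snippet above) the window is measured after the lock is acquired, which is a choice made for this sketch.

```python
import asyncio
from datetime import datetime, timedelta, timezone

# Hypothetical value; stands in for DEFAULT_TOKEN_EXPIRATION_WINDOW.
EXPIRATION_WINDOW = 5.0  # seconds


class TokenGuard:
    """Toy model of 'refresh at most once per window' (not simplipy code)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        # Pretend the token was last refreshed an hour ago.
        self.last_refreshed = datetime.now(timezone.utc) - timedelta(hours=1)
        self.refresh_count = 0

    async def handle_unauthorized(self) -> None:
        """Run by each request that received a 401/403."""
        async with self._lock:
            # Measured while holding the lock, so a refresh that a concurrent
            # handler just completed is visible here.
            now = datetime.now(timezone.utc)
            window = (now - self.last_refreshed).total_seconds()
            if window < EXPIRATION_WINDOW:
                return  # another request already refreshed the token

            await asyncio.sleep(0)  # stand-in for the network round trip
            self.last_refreshed = datetime.now(timezone.utc)
            self.refresh_count += 1


async def demo() -> int:
    guard = TokenGuard()
    # Three requests fail with a 401 at the same time:
    await asyncio.gather(*(guard.handle_unauthorized() for _ in range(3)))
    return guard.refresh_count


refresh_count = asyncio.run(demo())
print(refresh_count)
```

Note that nothing in this pattern runs unless some request fails first, which is the behavior discussed in the rest of the thread.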

I understand that's what's supposed to do the refresh. The question is, what calls _async_handle_on_backoff()? I think this is something internal to HASS that's not part of the actual API.

Ok, I did find it. It's a callback from _wrap_request_method(), which is part of __init__() in API.

So it's all part of the backoff logic, which applies to what? I see self.async_request(), which is only used by async_update_subscription_data(), which is called by async_get_systems(). And so on. So clearly there's a requirement to do certain API calls on a regular basis to trigger one of those. If the event loop is ONLY waiting on websocket events, then those don't happen, and the refresh never happens.

I see self.async_request(), which is only used by async_update_subscription_data(), which is called by async_get_systems().

That isn't correct. API.async_request is used all over the place: it's what makes a request to the REST API, so anytime you do anything with that API, you're using async_request.

So clearly there's a requirement to do certain API calls on a regular basis to trigger one of those. If the event loop is ONLY waiting on websocket events, then those don't happen, and the refresh never happens.

Correct: the API object doesn't check the token's validity if you're not making calls to the API.

Which is, at a minimum, a deficiency in the documentation. At least now I know.

I'll implement a new test routine that calls an API update on a schedule and see how that works.

Not calling the API and expecting token management to still work is not something I would have ever thought to document. 🤷🏻 If you feel differently, docs PRs are always welcome.

Take this scenario: the app calls the API and gets all the current system info, then waits for events to arrive via the websocket, or for the user to send a command to the system. That requires both a keep-alive loop for the websocket and a keep-alive of some sort for the auth token. Neither of those is in the docs or the example code. The websocket keep-alive is obvious in the HASS component, but the auth token one is not. I expect that's because HASS is doing some sort of polling of the system data, perhaps looking for additional systems or sensors to become active. But it's not obvious that the polling is required.

I'll probably do a PR after I get my code working correctly.

Thanks for that context. You are right: HASS employs a mixture of watching for websocket events and regular polling of the REST API; since that's the use case I initially built this library for, I never considered other possibilities (like yours).

We can undoubtedly document the way things are (I appreciate your willingness to help); simultaneously, I'll think of ways to modify the current architecture to support your use case (perhaps lessening the need to document). One possibility is to preemptively refresh the token every hour (instead of, or in addition to, the current approach of refreshing it when a REST API call fails).

I haven't seen the raw data returned by the server when the auth token is obtained. In most cases I'm used to, the server returns an expiration time or delta along with the auth and refresh tokens. In those cases, I've always done the refresh at some percentage of the delta. For example, for an X-minute expiry, refresh at 0.75X to allow for retries, etc.
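As a rough sketch of that timing rule: the code below assumes the server reports a lifetime in seconds (called `expires_in` here, which is an assumption; as noted above, the raw SimpliSafe response hasn't been inspected). `refresh_delay`, `refresh_loop`, and `fake_refresh` are hypothetical names, and the refresh itself is faked.

```python
import asyncio
from collections.abc import Awaitable, Callable


def refresh_delay(expires_in: float, fraction: float = 0.75) -> float:
    """Return how long to wait before refreshing a token that lives `expires_in` seconds."""
    if expires_in <= 0:
        raise ValueError("expires_in must be positive")
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    return expires_in * fraction


async def refresh_loop(
    refresh: Callable[[], Awaitable[float]],
    expires_in: float,
    cycles: int,
) -> list[float]:
    """Refresh `cycles` times, each time at 75% of the current lifetime.

    `refresh` must return the new token's lifetime in seconds.
    """
    delays: list[float] = []
    for _ in range(cycles):
        delay = refresh_delay(expires_in)
        delays.append(delay)
        await asyncio.sleep(delay)
        expires_in = await refresh()
    return delays


async def fake_refresh() -> float:
    # Stand-in for a real token refresh; reports a 20 ms lifetime.
    return 0.02


# A one-hour token would be refreshed after 45 minutes:
one_hour_delay = refresh_delay(3600)
print(one_hour_delay)

delays = asyncio.run(refresh_loop(fake_refresh, expires_in=0.04, cycles=3))
print(len(delays))
```

The remaining 25% of the lifetime is the slack left for retries if a refresh attempt fails.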

Also, I would highly recommend making a public version of _async_refresh_access_token() so the app developer can do their own refresh method.

Also, I would highly recommend making a public version of _async_refresh_access_token() so the app developer can do their own refresh method.

Why would that be needed? The underlying REST API mechanics aren't going to change and the callback mechanism allows developers to add their own post-refresh logic.

I mean refresh timing, not method. That is, do the refresh when they think it's appropriate. But if you implement the timed refresh then it's not a big deal.

Thinking further, I like how your idea allows the app dev to refresh when they want. That's the easiest way to start.

FYI, even though I closed this with #356, feel free to keep chatting if new questions come up.