Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pyhilo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from pyhilo.device.switch import Switch
from pyhilo.devices import Devices
from pyhilo.event import Event
from pyhilo.exceptions import HiloError, InvalidCredentialsError, WebsocketError
from pyhilo.exceptions import HiloError, InvalidCredentialsError, SignalRError
from pyhilo.signalr import SignalREvent
from pyhilo.util import from_utc_timestamp, time_diff
from pyhilo.websocket import WebsocketEvent

__all__ = [
"API",
Expand All @@ -17,10 +17,10 @@
"Event",
"HiloError",
"InvalidCredentialsError",
"WebsocketError",
"SignalRError",
"from_utc_timestamp",
"time_diff",
"WebsocketEvent",
"SignalREvent",
"UNMONITORED_DEVICES",
"Switch",
]
116 changes: 22 additions & 94 deletions pyhilo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,8 @@
)
from pyhilo.device import DeviceAttribute, HiloDevice, get_device_attributes
from pyhilo.exceptions import InvalidCredentialsError, RequestError
from pyhilo.util.state import (
StateDict,
WebsocketDict,
WebsocketTransportsDict,
get_state,
set_state,
)
from pyhilo.websocket import WebsocketClient, WebsocketManager
from pyhilo.signalr import SignalRHub, SignalRManager
from pyhilo.util.state import AndroidDeviceDict, StateDict, get_state, set_state


class API:
Expand All @@ -74,27 +68,19 @@ def __init__(
) -> None:
"""Initialize"""
self._backoff_refresh_lock_api = asyncio.Lock()
self._backoff_refresh_lock_ws = asyncio.Lock()
self._request_retries = request_retries
self._state_yaml: str = DEFAULT_STATE_FILE
self.state: StateDict = {}
self.async_request = self._wrap_request_method(self._request_retries)
self.device_attributes = get_device_attributes()
self.session: ClientSession = session
self._oauth_session = oauth_session
self.websocket_devices: WebsocketClient
# Backward compatibility during transition to websocket for challenges. Currently the HA Hilo integration
# uses the .websocket attribute. Re-added this attribute and point to the same object as websocket_devices.
# Should be removed once the transition to the challenge websocket is completed everywhere.
self.websocket: WebsocketClient
self.websocket_challenges: WebsocketClient
self.signalr_devices: SignalRHub
self.signalr_challenges: SignalRHub
self.log_traces = log_traces
self._get_device_callbacks: list[Callable[..., Any]] = []
self.ws_url: str = ""
self.ws_token: str = ""
self.endpoint: str = ""
self._urn: str | None = None
# Device cache from websocket DeviceListInitialValuesReceived
# Device cache from SignalR DeviceListInitialValuesReceived
self._device_cache: list[dict[str, Any]] = []
self._device_cache_event: asyncio.Event = asyncio.Event()

Expand Down Expand Up @@ -146,7 +132,7 @@ async def async_get_access_token(self) -> str:
await self._oauth_session.async_ensure_token_valid()

access_token = str(self._oauth_session.token["access_token"])
LOG.debug("Websocket access token is %s", access_token)
LOG.debug("SignalR access token is %s", access_token)

urn = self.urn
LOG.debug("Extracted URN: %s", urn)
Expand Down Expand Up @@ -356,19 +342,7 @@ async def _async_handle_on_backoff(self, _: dict[str, Any]) -> None:
err: ClientResponseError = err_info[1].with_traceback(err_info[2]) # type: ignore

if err.status in (401, 403):
LOG.warning("Refreshing websocket token %s", err.request_info.url)
if (
"client/negotiate" in str(err.request_info.url)
and err.request_info.method == "POST"
):
LOG.info(
"401 detected on websocket, refreshing websocket token. Old url: {self.ws_url} Old Token: {self.ws_token}"
)
LOG.info("401 detected on %s", err.request_info.url)
async with self._backoff_refresh_lock_ws:
await self.refresh_ws_token()
await self.get_websocket_params()
return
LOG.warning("Refreshing API token on %s", err.request_info.url)

@staticmethod
def _handle_on_giveup(_: dict[str, Any]) -> None:
Expand Down Expand Up @@ -413,57 +387,13 @@ def enable_request_retries(self) -> None:

async def _async_post_init(self) -> None:
"""Perform some post-init actions."""
LOG.debug("Websocket _async_post_init running")
LOG.debug("SignalR _async_post_init running")
await self._get_fid()
await self._get_device_token()

# Initialize WebsocketManager ic-dev21
self.websocket_manager = WebsocketManager(
self.session, self.async_request, self._state_yaml, set_state
)
await self.websocket_manager.initialize_websockets()

# Create both websocket clients
# ic-dev21 need to work on this as it can't lint as is, may need to
# instantiate differently
# TODO: fix type ignore after refactor
self.websocket_devices = WebsocketClient(self.websocket_manager.devicehub) # type: ignore

# For backward compatibility during the transition to challengehub websocket
self.websocket = self.websocket_devices
self.websocket_challenges = WebsocketClient(self.websocket_manager.challengehub) # type: ignore

async def refresh_ws_token(self) -> None:
"""Refresh the websocket token."""
await self.websocket_manager.refresh_token(self.websocket_manager.devicehub)
await self.websocket_manager.refresh_token(self.websocket_manager.challengehub)

async def get_websocket_params(self) -> None:
"""Retrieves and constructs WebSocket connection parameters from the negotiation endpoint."""
uri = parse.urlparse(self.ws_url)
LOG.debug("Getting websocket params")
LOG.debug("Getting uri %s", uri)
resp: dict[str, Any] = await self.async_request(
"post",
f"{uri.path}negotiate?{uri.query}",
host=uri.netloc,
headers={
"authorization": f"Bearer {self.ws_token}",
},
)
conn_id: str = resp.get("connectionId", "")
self.full_ws_url = f"{self.ws_url}&id={conn_id}&access_token={self.ws_token}"
LOG.debug("Getting full ws URL %s", self.full_ws_url)
transport_dict: list[WebsocketTransportsDict] = resp.get(
"availableTransports", []
)
websocket_dict: WebsocketDict = {
"connection_id": conn_id,
"available_transports": transport_dict,
"full_ws_url": self.full_ws_url,
}
LOG.debug("Calling set_state from get_websocket_params")
await set_state(self._state_yaml, "websocket", websocket_dict)
signalr_manager = SignalRManager(self.async_request)
self.signalr_devices = signalr_manager.build_hub("/DeviceHub")
self.signalr_challenges = signalr_manager.build_hub("/ChallengeHub")

async def fb_install(self, fb_id: str) -> None:
"""Registers a Firebase installation and stores the authentication token state."""
Expand Down Expand Up @@ -535,9 +465,7 @@ async def android_register(self) -> None:
await set_state(
self._state_yaml,
"android",
{
"token": token,
},
cast(AndroidDeviceDict, {"token": token}),
)

async def get_location_ids(self) -> tuple[int, str]:
Expand All @@ -548,26 +476,26 @@ async def get_location_ids(self) -> tuple[int, str]:
return (req[0]["id"], req[0]["locationHiloId"])

def set_device_cache(self, devices: list[dict[str, Any]]) -> None:
"""Store devices received from websocket DeviceListInitialValuesReceived.
"""Store devices received from SignalR DeviceListInitialValuesReceived.

This replaces the old REST API get_devices call. The websocket sends
This replaces the old REST API get_devices call. SignalR sends
device data with list-type attributes (supportedAttributesList, etc.)
which need to be converted to comma-separated strings to match the
format that HiloDevice.update() expects.
"""
self._device_cache = [self._convert_ws_device(device) for device in devices]
LOG.debug(
"Device cache populated with %d devices from websocket",
"Device cache populated with %d devices from SignalR",
len(self._device_cache),
)
self._device_cache_event.set()

@staticmethod
def _convert_ws_device(ws_device: dict[str, Any]) -> dict[str, Any]:
"""Convert a websocket device dict to the format generate_device expects.
"""Convert a SignalR device dict to the format generate_device expects.

The REST API returned supportedAttributes/settableAttributes as
comma-separated strings. The websocket returns supportedAttributesList/
comma-separated strings. SignalR returns supportedAttributesList/
settableAttributesList/supportedParametersList as Python lists.
We convert to the old format so HiloDevice.update() works unchanged.
"""
Expand All @@ -590,25 +518,25 @@ def _convert_ws_device(ws_device: dict[str, Any]) -> dict[str, Any]:
return device

async def wait_for_device_cache(self, timeout: float = 30.0) -> None:
"""Wait for the device cache to be populated from websocket.
"""Wait for the device cache to be populated from SignalR.

:param timeout: Maximum time to wait in seconds
:raises TimeoutError: If the device cache is not populated in time
"""
if self._device_cache_event.is_set():
return
LOG.debug("Waiting for device cache from websocket (timeout=%ss)", timeout)
LOG.debug("Waiting for device cache from SignalR (timeout=%ss)", timeout)
try:
await asyncio.wait_for(self._device_cache_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
LOG.error(
"Timed out waiting for device list from websocket after %ss",
"Timed out waiting for device list from SignalR after %ss",
timeout,
)
raise

def get_device_cache(self, location_id: int) -> list[dict[str, Any]]:
"""Return cached devices from websocket.
"""Return cached devices from SignalR.

:param location_id: Hilo location id (unused but kept for interface compat)
:return: List of device dicts ready for generate_device()
Expand All @@ -618,7 +546,7 @@ def get_device_cache(self, location_id: int) -> list[dict[str, Any]]:
def add_to_device_cache(self, devices: list[dict[str, Any]]) -> None:
"""Append new devices to the existing cache (e.g. from DeviceAdded).

Converts websocket format and adds to the cache without replacing
Converts SignalR format and adds to the cache without replacing
existing entries. Skips devices already in cache (by id).
"""
existing_ids = {d.get("id") for d in self._device_cache}
Expand Down
14 changes: 7 additions & 7 deletions pyhilo/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def all(self) -> list[HiloDevice]:

@property
def attributes_list(self) -> list[Union[int, dict[int, list[str]]]]:
"""This is sent to websocket to subscribe to the device attributes updates
"""This is sent to the SignalR hub to subscribe to the device attributes updates

:return: Dict of devices (key) with their attributes.
:rtype: list
Expand Down Expand Up @@ -99,8 +99,8 @@ def generate_device(self, device: dict) -> HiloDevice:
return dev

async def update(self) -> None:
"""Update device list from websocket cache + gateway from REST."""
# Get devices from websocket cache (already populated by DeviceListInitialValuesReceived)
"""Update device list from SignalR cache + gateway from REST."""
# Get devices from SignalR cache (already populated by DeviceListInitialValuesReceived)
cached_devices = self._api.get_device_cache(self.location_id)
generated_devices = []
for raw_device in cached_devices:
Expand Down Expand Up @@ -140,7 +140,7 @@ async def update(self) -> None:
async def update_devicelist_from_signalr(
self, values: list[dict[str, Any]]
) -> list[HiloDevice]:
"""Process device list received from SignalR websocket.
"""Process device list received from SignalR hub.

This is called when DeviceListInitialValuesReceived arrives.
It populates the API device cache and generates HiloDevice objects.
Expand All @@ -161,7 +161,7 @@ async def update_devicelist_from_signalr(
async def add_device_from_signalr(
self, values: list[dict[str, Any]]
) -> list[HiloDevice]:
"""Process individual device additions from SignalR websocket.
"""Process individual device additions from SignalR hub.

This is called when DeviceAdded arrives. It appends to the existing
cache rather than replacing it.
Expand All @@ -181,7 +181,7 @@ async def add_device_from_signalr(
async def async_init(self) -> None:
"""Initialize the Hilo "manager" class.

Gets location IDs from REST API, then waits for the websocket
Gets location IDs from REST API, then waits for the SignalR hub
to deliver the device list via DeviceListInitialValuesReceived.
The gateway is appended from REST.
"""
Expand All @@ -190,5 +190,5 @@ async def async_init(self) -> None:
self.location_id = location_ids[0]
self.location_hilo_id = location_ids[1]
# Device list will be populated when DeviceListInitialValuesReceived
# arrives on the websocket. The hilo integration's async_init will
# arrives on the SignalR hub. The hilo integration's async_init will
# call wait_for_device_cache() and then update() after subscribing.
24 changes: 12 additions & 12 deletions pyhilo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,37 @@ class RequestError(HiloError):
pass


class WebsocketError(HiloError):
"""An error related to generic websocket errors."""
class SignalRError(HiloError):
"""An error related to generic SignalR errors."""

pass


class CannotConnectError(WebsocketError):
"""Define a error when the websocket can't be connected to."""
class CannotConnectError(SignalRError):
"""Define a error when the SignalR hub can't be connected to."""

pass


class ConnectionClosedError(WebsocketError):
"""Define a error when the websocket closes unexpectedly."""
class ConnectionClosedError(SignalRError):
"""Define a error when the SignalR hub closes unexpectedly."""

pass


class ConnectionFailedError(WebsocketError):
"""Define a error when the websocket connection fails."""
class ConnectionFailedError(SignalRError):
"""Define a error when the SignalR connection fails."""

pass


class InvalidMessageError(WebsocketError):
"""Define a error related to an invalid message from the websocket server."""
class InvalidMessageError(SignalRError):
"""Define a error related to an invalid message from the SignalR server."""

pass


class NotConnectedError(WebsocketError):
"""Define a error when the websocket isn't properly connected to."""
class NotConnectedError(SignalRError):
"""Define a error when the SignalR hub isn't properly connected to."""

pass
Loading