You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
61 lines
2.0 KiB
61 lines
2.0 KiB
2 years ago
|
"""Custom data update coordinator for the GitHub integration."""
|
||
|
from __future__ import annotations
|
||
|
|
||
|
from typing import Any
|
||
|
|
||
|
import giteapy
|
||
|
|
||
|
from giteapy.rest import ApiException
|
||
|
|
||
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||
|
from homeassistant.core import HomeAssistant
|
||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||
|
|
||
|
from .const import FALLBACK_UPDATE_INTERVAL, LOGGER
|
||
|
|
||
|
class GiteaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||
|
"""Data update coordinator for the Gitea integration."""
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
hass: HomeAssistant,
|
||
|
client: giteapy.RepositoryApi,
|
||
|
repository: str,
|
||
|
) -> None:
|
||
|
"""Initialize GitHub data update coordinator base class."""
|
||
|
self.repository = repository
|
||
|
self._client = client
|
||
|
self._last_response: giteapy.Repository | None = None
|
||
|
self._subscription_id: str | None = None
|
||
|
self.data = {}
|
||
|
|
||
|
super().__init__(
|
||
|
hass,
|
||
|
LOGGER,
|
||
|
name=repository,
|
||
|
update_interval=FALLBACK_UPDATE_INTERVAL,
|
||
|
)
|
||
|
|
||
|
async def _async_update_data(self) -> giteapy.Repository:
|
||
|
"""Update data."""
|
||
|
owner, repository = self.repository.split("/")
|
||
|
try:
|
||
|
response = self._client.repo_get(owner, repository)
|
||
|
except ApiException as exception:
|
||
|
# These are unexpected and we log the trace to help with troubleshooting
|
||
|
LOGGER.exception(exception)
|
||
|
raise UpdateFailed(exception) from exception
|
||
|
else:
|
||
|
self._last_response = response
|
||
|
return response
|
||
|
|
||
|
async def _handle_event(self, event) -> None:
|
||
|
"""Handle an event."""
|
||
|
LOGGER.info("Handling event: %s", event)
|
||
|
await self.async_request_refresh()
|
||
|
|
||
|
@staticmethod
|
||
|
async def _handle_error(error: ApiException) -> None:
|
||
|
"""Handle an error."""
|
||
|
LOGGER.error("An error occurred while processing new events - %s", error)
|