Garrit Franke
2 years ago
4 changed files with 164 additions and 1 deletions
@ -1 +1,87 @@ |
|||||||
"""The Gitea integration.""" |
"""The Gitea integration.""" |
||||||
|
|
||||||
|
from homeassistant.config_entries import ConfigEntry |
||||||
|
from homeassistant.const import CONF_ACCESS_TOKEN, Platform |
||||||
|
from homeassistant.core import HomeAssistant, callback |
||||||
|
from homeassistant.helpers import device_registry as dr |
||||||
|
from homeassistant.helpers.aiohttp_client import ( |
||||||
|
SERVER_SOFTWARE, |
||||||
|
async_get_clientsession, |
||||||
|
) |
||||||
|
|
||||||
|
import giteapy |
||||||
|
from giteapy import RepositoryApi |
||||||
|
|
||||||
|
from .const import CONF_REPOSITORIES, DOMAIN, LOGGER |
||||||
|
from .coordinator import GiteaDataUpdateCoordinator |
||||||
|
|
||||||
|
PLATFORMS: list[Platform] = [Platform.SENSOR] |
||||||
|
|
||||||
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: |
||||||
|
"""Set up GitHub from a config entry.""" |
||||||
|
hass.data.setdefault(DOMAIN, {}) |
||||||
|
|
||||||
|
configuration = giteapy.Configuration() |
||||||
|
configuration.api_key['access_token'] = entry.data[CONF_ACCESS_TOKEN] |
||||||
|
|
||||||
|
client = giteapy.RepositoryApi(giteapy.ApiClient(configuration)) |
||||||
|
|
||||||
|
repositories: list[str] = entry.options[CONF_REPOSITORIES] |
||||||
|
|
||||||
|
for repository in repositories: |
||||||
|
coordinator = GiteaDataUpdateCoordinator( |
||||||
|
hass=hass, |
||||||
|
client=client, |
||||||
|
repository=repository, |
||||||
|
) |
||||||
|
|
||||||
|
await coordinator.async_config_entry_first_refresh() |
||||||
|
|
||||||
|
hass.data[DOMAIN][repository] = coordinator |
||||||
|
|
||||||
|
async_cleanup_device_registry(hass=hass, entry=entry) |
||||||
|
|
||||||
|
hass.config_entries.async_setup_platforms(entry, PLATFORMS) |
||||||
|
entry.async_on_unload(entry.add_update_listener(async_reload_entry)) |
||||||
|
return True |
||||||
|
|
||||||
|
@callback |
||||||
|
def async_cleanup_device_registry( |
||||||
|
hass: HomeAssistant, |
||||||
|
entry: ConfigEntry, |
||||||
|
) -> None: |
||||||
|
"""Remove entries form device registry if we no longer track the repository.""" |
||||||
|
device_registry = dr.async_get(hass) |
||||||
|
devices = dr.async_entries_for_config_entry( |
||||||
|
registry=device_registry, |
||||||
|
config_entry_id=entry.entry_id, |
||||||
|
) |
||||||
|
for device in devices: |
||||||
|
for item in device.identifiers: |
||||||
|
if DOMAIN == item[0] and item[1] not in entry.options[CONF_REPOSITORIES]: |
||||||
|
LOGGER.debug( |
||||||
|
"Unlinking device %s for untracked repository %s from config entry %s", |
||||||
|
device.id, |
||||||
|
item[1], |
||||||
|
entry.entry_id, |
||||||
|
) |
||||||
|
device_registry.async_update_device( |
||||||
|
device.id, remove_config_entry_id=entry.entry_id |
||||||
|
) |
||||||
|
break |
||||||
|
|
||||||
|
|
||||||
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: |
||||||
|
"""Unload a config entry.""" |
||||||
|
repositories: dict[str, GiteaDataUpdateCoordinator] = hass.data[DOMAIN] |
||||||
|
for coordinator in repositories.values(): |
||||||
|
coordinator.unsubscribe() |
||||||
|
|
||||||
|
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): |
||||||
|
hass.data.pop(DOMAIN) |
||||||
|
return unload_ok |
||||||
|
|
||||||
|
|
||||||
|
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None: |
||||||
|
"""Handle an options update.""" |
||||||
|
await hass.config_entries.async_reload(entry.entry_id) |
@ -0,0 +1,16 @@ |
|||||||
|
"""Constants for the Gitea integration.""" |
||||||
|
from __future__ import annotations |
||||||
|
|
||||||
|
from datetime import timedelta |
||||||
|
from logging import Logger, getLogger |
||||||
|
|
||||||
|
LOGGER: Logger = getLogger(__package__) |
||||||
|
|
||||||
|
DOMAIN = "gitea" |
||||||
|
|
||||||
|
DEFAULT_REPOSITORIES = ["Freeyourgadget/Gadgetbridge"] |
||||||
|
FALLBACK_UPDATE_INTERVAL = timedelta(hours=1, minutes=30) |
||||||
|
|
||||||
|
CONF_HOST = "https://codeberg.org/" |
||||||
|
CONF_ACCESS_TOKEN = "access_token" |
||||||
|
CONF_REPOSITORIES = "repositories" |
@ -0,0 +1,60 @@ |
|||||||
|
"""Custom data update coordinator for the GitHub integration.""" |
||||||
|
from __future__ import annotations |
||||||
|
|
||||||
|
from typing import Any |
||||||
|
|
||||||
|
import giteapy |
||||||
|
|
||||||
|
from giteapy.rest import ApiException |
||||||
|
|
||||||
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP |
||||||
|
from homeassistant.core import HomeAssistant |
||||||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed |
||||||
|
|
||||||
|
from .const import FALLBACK_UPDATE_INTERVAL, LOGGER |
||||||
|
|
||||||
|
class GiteaDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): |
||||||
|
"""Data update coordinator for the Gitea integration.""" |
||||||
|
|
||||||
|
def __init__( |
||||||
|
self, |
||||||
|
hass: HomeAssistant, |
||||||
|
client: giteapy.RepositoryApi, |
||||||
|
repository: str, |
||||||
|
) -> None: |
||||||
|
"""Initialize GitHub data update coordinator base class.""" |
||||||
|
self.repository = repository |
||||||
|
self._client = client |
||||||
|
self._last_response: giteapy.Repository | None = None |
||||||
|
self._subscription_id: str | None = None |
||||||
|
self.data = {} |
||||||
|
|
||||||
|
super().__init__( |
||||||
|
hass, |
||||||
|
LOGGER, |
||||||
|
name=repository, |
||||||
|
update_interval=FALLBACK_UPDATE_INTERVAL, |
||||||
|
) |
||||||
|
|
||||||
|
async def _async_update_data(self) -> giteapy.Repository: |
||||||
|
"""Update data.""" |
||||||
|
owner, repository = self.repository.split("/") |
||||||
|
try: |
||||||
|
response = self._client.repo_get(owner, repository) |
||||||
|
except ApiException as exception: |
||||||
|
# These are unexpected and we log the trace to help with troubleshooting |
||||||
|
LOGGER.exception(exception) |
||||||
|
raise UpdateFailed(exception) from exception |
||||||
|
else: |
||||||
|
self._last_response = response |
||||||
|
return response |
||||||
|
|
||||||
|
async def _handle_event(self, event) -> None: |
||||||
|
"""Handle an event.""" |
||||||
|
LOGGER.info("Handling event: %s", event) |
||||||
|
await self.async_request_refresh() |
||||||
|
|
||||||
|
@staticmethod |
||||||
|
async def _handle_error(error: ApiException) -> None: |
||||||
|
"""Handle an error.""" |
||||||
|
LOGGER.error("An error occurred while processing new events - %s", error) |
Loading…
Reference in new issue