|
@@ -0,0 +1,375 @@
|
|
|
+"""Support to check for available updates."""
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import asyncio, logging, async_timeout, os, voluptuous as vol
|
|
|
+
|
|
|
+from awesomeversion import AwesomeVersion
|
|
|
+from datetime import timedelta
|
|
|
+
|
|
|
+from homeassistant.const import __version__ as current_version
|
|
|
+from homeassistant.helpers import discovery, update_coordinator
|
|
|
+from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
|
+import homeassistant.helpers.config_validation as cv
|
|
|
+
|
|
|
+_LOGGER = logging.getLogger(__name__)
|
|
|
+_log_wmsg = """
|
|
|
+NOTE! Using a replacement (custom) 'updater' component for the snap package.
|
|
|
+Do NOT report bugs of any kind related to 'updater' to the Home Assistant core project.
|
|
|
+Report any issues at https://github.com/home-assistant-snap/home-assistant-snap/issues"""
|
|
|
+
|
|
|
+ATTR_RELEASE_NOTES = "release_notes"
|
|
|
+ATTR_UPDATE_NOTES = "update_notes"
|
|
|
+ATTR_NEWEST_VERSION = "newest_version"
|
|
|
+
|
|
|
+# Keeping for consistency (we're overriding)
|
|
|
+CONF_REPORTING = "reporting"
|
|
|
+CONF_COMPONENT_REPORTING = "include_used_components"
|
|
|
+
|
|
|
+DOMAIN = "updater"
|
|
|
+
|
|
|
+UPDATER_URL = "https://api.snapcraft.io/v2/snaps/info/home-assistant-snap?architecture=%s&fields=channel-map,revision,version"
|
|
|
+
|
|
|
+RESPONSE_SCHEMA = vol.Schema(
|
|
|
+ {
|
|
|
+ vol.Required("channel-map"): cv.ensure_list,
|
|
|
+ vol.Required("default-track"): cv.string,
|
|
|
+ },
|
|
|
+ extra=vol.REMOVE_EXTRA,
|
|
|
+)
|
|
|
+
|
|
|
+class Channel:
|
|
|
+ def __init__(self, track: Track, channel: dict, revision: int, version: str):
|
|
|
+ self.__arch = channel['architecture']
|
|
|
+ self.__risk = 0
|
|
|
+ self.__track = track
|
|
|
+
|
|
|
+ self.__risks = {'stable': 4, 'candidate': 3, 'beta': 2, 'edge': 1}
|
|
|
+
|
|
|
+ if channel['risk'] in self.__risks:
|
|
|
+ self.__risk = self.__risks[channel['risk']]
|
|
|
+
|
|
|
+ self.__revision = revision
|
|
|
+ self.__version = AwesomeVersion(version)
|
|
|
+
|
|
|
+ def __str__(self) -> str:
|
|
|
+ return f"{str(self.__track)}/{self.get_risk()}"
|
|
|
+
|
|
|
+ def __repr__(self) -> str:
|
|
|
+ return f"{self.__version}, revision: {self.__revision}, channel: {str(self)}"
|
|
|
+
|
|
|
+ def get_track(self) -> Track:
|
|
|
+ return self.__track
|
|
|
+
|
|
|
+ def get_risk(self, as_str: bool = True) -> str|int:
|
|
|
+
|
|
|
+ if not as_str:
|
|
|
+ return self.__risk
|
|
|
+
|
|
|
+ for v,k in self.__risks.items():
|
|
|
+ if k == self.__risk:
|
|
|
+ return v
|
|
|
+
|
|
|
+ return self.__risk
|
|
|
+
|
|
|
+ def get_revision(self) -> int:
|
|
|
+ return self.__revision
|
|
|
+
|
|
|
+ def get_version(self) -> AwesomeVersion:
|
|
|
+ return self.__version
|
|
|
+
|
|
|
+ def __gt__(self, other: Channel) -> int:
|
|
|
+ return self.__revision > other.get_revision() and self.__risk >= other.get_risk(False)
|
|
|
+
|
|
|
+class Track:
|
|
|
+ def __init__(self, track: str):
|
|
|
+ self.__track = AwesomeVersion(track)
|
|
|
+ self.__channels = []
|
|
|
+
|
|
|
+ def get_track(self) -> str:
|
|
|
+ return self.__track
|
|
|
+
|
|
|
+ def __eq__(self, other: Track|str) -> bool:
|
|
|
+ if isinstance(other, Track):
|
|
|
+ return self.get_track() == other.get_track()
|
|
|
+
|
|
|
+ return self.get_track() == other
|
|
|
+
|
|
|
+ def add_channel(self, channel) -> Track:
|
|
|
+ self.__channels.append(Channel(self, channel['channel'], channel['revision'], channel['version']))
|
|
|
+ self.__channels.sort(key=lambda x: x.get_risk(False))
|
|
|
+ return self
|
|
|
+
|
|
|
+ def get_channels(self) -> list:
|
|
|
+ return self.__channels
|
|
|
+
|
|
|
+ def channel_with_revision(self, revision: int) -> Channel|None:
|
|
|
+ for channel in self.__channels:
|
|
|
+ if channel.get_revision() == revision:
|
|
|
+ return channel
|
|
|
+ return None
|
|
|
+
|
|
|
+ def channel_with_higher_revision(self, channel: Channel) -> Channel|None:
|
|
|
+ newest = channel
|
|
|
+ for channel in self.__channels:
|
|
|
+ if channel > newest:
|
|
|
+ newest = channel
|
|
|
+ return newest
|
|
|
+
|
|
|
+ def get_latest(self) -> Channel|None:
|
|
|
+ if len(self.__channels) == 0:
|
|
|
+ return None
|
|
|
+ return self.__channels[len(self.__channels)-1]
|
|
|
+
|
|
|
+ def __repr__(self) -> str:
|
|
|
+ risks = []
|
|
|
+ for channel in self.__channels:
|
|
|
+ risks.append(f"{channel.get_risk()}/{channel.get_revision()}")
|
|
|
+
|
|
|
+ return f"{self.__track}({', '.join(risks)})"
|
|
|
+
|
|
|
+ def __str__(self) -> str:
|
|
|
+ return str(self.__track)
|
|
|
+
|
|
|
+class Tracks:
|
|
|
+ def __init__(self, channel_map: list) -> None:
|
|
|
+ self.__tracks = []
|
|
|
+
|
|
|
+ for channel in channel_map:
|
|
|
+ track = self.get_track(channel['channel']['track'])
|
|
|
+
|
|
|
+ if track is not None:
|
|
|
+ track.add_channel(channel)
|
|
|
+ else:
|
|
|
+ track = Track(channel['channel']['track'])
|
|
|
+ track.add_channel(channel)
|
|
|
+
|
|
|
+ self.__tracks.append(track)
|
|
|
+
|
|
|
+ self.__tracks.sort(key=lambda x: x.get_track())
|
|
|
+
|
|
|
+ def get_latest(self) -> Track|None:
|
|
|
+ self.__tracks.reverse()
|
|
|
+ latest = None
|
|
|
+ for track in self.__tracks:
|
|
|
+ if len(track.get_channels()) != 0 and track.get_latest().get_risk() == 'latest':
|
|
|
+ latest = track.get_latest()
|
|
|
+ break
|
|
|
+ self.__tracks.reverse()
|
|
|
+ return latest
|
|
|
+
|
|
|
+ def get_track(self, track: str) -> Track|None:
|
|
|
+ for t in self.__tracks:
|
|
|
+ if t == track:
|
|
|
+ return t
|
|
|
+ return None
|
|
|
+
|
|
|
+ def find_for_revision(self, revision: int) -> Channel|None:
|
|
|
+ for track in self.__tracks:
|
|
|
+ channel = track.channel_with_revision(revision)
|
|
|
+ if channel is not None:
|
|
|
+ return channel
|
|
|
+ return None
|
|
|
+
|
|
|
+ def channel_with_higher_revision(self, channel: Channel) -> Channel|None:
|
|
|
+ for track in self.__tracks:
|
|
|
+ for chan in track.get_channels():
|
|
|
+ if chan > channel:
|
|
|
+ return chan
|
|
|
+ return None
|
|
|
+
|
|
|
+ def track_with_lower_revision(self, revision: int) -> Channel|None:
|
|
|
+ closest = None
|
|
|
+ for track in self.__tracks:
|
|
|
+ for chan in track.get_channels():
|
|
|
+ if chan.get_revision() < revision:
|
|
|
+ if closest is not None and closest < chan:
|
|
|
+ closest = chan
|
|
|
+ else:
|
|
|
+ closest = chan
|
|
|
+ return closest
|
|
|
+
|
|
|
+ def __str__(self) -> str:
|
|
|
+ tracks = []
|
|
|
+
|
|
|
+ for track in self.__tracks:
|
|
|
+ tracks.append(str(track))
|
|
|
+ return ", ".join(tracks)
|
|
|
+
|
|
|
+ def __repr__(self) -> str:
|
|
|
+ return self.__str__()
|
|
|
+
|
|
|
+class Updater:
|
|
|
+ """ Updater class for data exchange."""
|
|
|
+
|
|
|
+ def __init__(self, update_available: bool, default: Channel, current: Channel|None, newer: Channel|None, update_notes: str) -> None:
|
|
|
+
|
|
|
+ self.update_available = update_available
|
|
|
+ self.newest_version = str(newer.get_version()) if newer is not None else str(default.get_version())
|
|
|
+
|
|
|
+ self.update_notes = update_notes
|
|
|
+
|
|
|
+ if isinstance(newer, Channel) and default > newer:
|
|
|
+ self.update_notes += f"\n\nLatest channel is: _«{repr(default)}»_."
|
|
|
+ elif newer is None and isinstance(current, Channel) and default > current:
|
|
|
+ self.update_notes += f"\n\nLatest channel is: _«{repr(default)}»_. Upgrade with: `snap switch home-assistant-snap --channel={default}`"
|
|
|
+ elif current is None and newer is None:
|
|
|
+ self.update_notes += f"\n\nLatest channel is: _«{repr(default)}»_."
|
|
|
+
|
|
|
+ if update_available:
|
|
|
+ _LOGGER.info("UPDATE AVAILABLE: %s, newer: %s, current: %s, default: %s, notes: %s", update_available, newer, current, default, self.update_notes)
|
|
|
+
|
|
|
+ self.release_notes = "https://www.home-assistant.io/blog/categories/core/"
|
|
|
+
|
|
|
+async def async_setup(hass, config):
|
|
|
+
|
|
|
+ conf = config.get(DOMAIN, {})
|
|
|
+
|
|
|
+ _LOGGER.warning(_log_wmsg)
|
|
|
+
|
|
|
+ # Keeping for consistency (we're overriding)
|
|
|
+ for option in (CONF_COMPONENT_REPORTING, CONF_REPORTING):
|
|
|
+ if option in conf:
|
|
|
+ _LOGGER.warning(
|
|
|
+ "Analytics reporting with the option '%s' "
|
|
|
+ "is deprecated and you should remove that from your configuration. "
|
|
|
+ "The analytics part of this integration has moved to the new 'analytics' integration",
|
|
|
+ option,
|
|
|
+ )
|
|
|
+
|
|
|
+ async def check_new_version() -> Updater:
|
|
|
+
|
|
|
+ _LOGGER.warning(_log_wmsg)
|
|
|
+ snap_rev = os.getenv('SNAP_REVISION')
|
|
|
+
|
|
|
+ tracks, default_track = await get_versions(hass)
|
|
|
+
|
|
|
+ if snap_rev is None:
|
|
|
+ if AwesomeVersion(current_version).dev:
|
|
|
+ snap_rev = 295
|
|
|
+ else:
|
|
|
+ raise update_coordinator.UpdateFailed(Exception("Missing SNAP_REVISION environment variable."))
|
|
|
+
|
|
|
+ if f"{snap_rev[0] if type(snap_rev) is str else 'y'}" == 'x':
|
|
|
+ c_v = AwesomeVersion(current_version)
|
|
|
+ track = tracks.get_track(f"{c_v.section(0)}.{c_v.section(1)}")
|
|
|
+ if track is not None and len(track.get_channels()) != 0:
|
|
|
+ snap_rev = track.get_latest().get_revision()
|
|
|
+ else:
|
|
|
+ snap_rev = default_track.get_latest().get_revision()
|
|
|
+
|
|
|
+ snap_rev = int(snap_rev)
|
|
|
+ current_channel = tracks.find_for_revision(snap_rev)
|
|
|
+
|
|
|
+ if current_channel is not None:
|
|
|
+ newer_channel = current_channel.get_track().channel_with_higher_revision(current_channel)
|
|
|
+ if newer_channel is not None and newer_channel > current_channel:
|
|
|
+ return Updater(True, default_track.get_latest(), current_channel, newer_channel,
|
|
|
+ f"You're currently on _«{repr(current_channel)}»_ and can upgrade to _«{repr(newer_channel)}»_. "
|
|
|
+ f"Update with `sudo snap switch home-assistant-snap --channel={newer_channel}`."
|
|
|
+ )
|
|
|
+
|
|
|
+ newer_channel = tracks.channel_with_higher_revision(current_channel)
|
|
|
+ if newer_channel is not None and newer_channel > current_channel:
|
|
|
+ return Updater(True, default_track.get_latest(), current_channel, newer_channel,
|
|
|
+ f"You're currently on _«{repr(current_channel)}»_ and can upgrade to _«{repr(newer_channel)}»_. "
|
|
|
+ f"Update with `sudo snap switch home-assistant-snap --channel={newer_channel}`."
|
|
|
+ )
|
|
|
+
|
|
|
+ return Updater(False, default_track.get_latest(), None, current_channel, f"You're on _«{repr(current_channel)}»_!")
|
|
|
+ else:
|
|
|
+ c_v = AwesomeVersion(current_version)
|
|
|
+ track = tracks.get_track(f"{c_v.section(0)}.{c_v.section(1)}")
|
|
|
+
|
|
|
+ if track is not None and track.get_latest() is not None:
|
|
|
+ current_channel = track.get_latest()
|
|
|
+ newer_channel = tracks.channel_with_higher_revision(current_channel)
|
|
|
+
|
|
|
+ if newer_channel is not None and newer_channel > current_channel:
|
|
|
+ return Updater(True, default_track.get_latest(), current_channel, newer_channel,
|
|
|
+ f"Unknown revision «{snap_rev}», assuming on any channel for {current_channel.get_track()}. The snap package "
|
|
|
+ f"should automatically update, but you can also upgrade to _«{repr(newer_channel)}»_ with: "
|
|
|
+ f"`sudo snap switch home-assistant-snap --channel={newer_channel}`."
|
|
|
+ )
|
|
|
+
|
|
|
+ return Updater(True, default_track.get_latest(), current_channel, None,
|
|
|
+ f"Unknown revision «{snap_rev}», assuming on any channel for track {current_channel.get_track()}. The snap package "
|
|
|
+ f"should automatically update, but double check that the channel is not closed. You can force the update with: "
|
|
|
+ f"`sudo snap refresh home-assistant-snap` and find channels with: `sudo info home-assistant-snap`."
|
|
|
+ )
|
|
|
+
|
|
|
+ older_track = tracks.track_with_lower_revision(snap_rev)
|
|
|
+ if older_track is not None:
|
|
|
+ newer_channel = tracks.channel_with_higher_revision(older_track)
|
|
|
+ if newer_channel is not None:
|
|
|
+ newer_channel = newer_channel.get_track().get_latest()
|
|
|
+ return Updater(True, default_track.get_latest(), None, newer_channel,
|
|
|
+ f"No channel found for {c_v.section(0)}.{c_v.section(1)}, it might have been deleted - and you will not receive updates. "
|
|
|
+ f"A newer channel _«{repr(newer_channel)}»_ is available! "
|
|
|
+ f"You can switch with: `sudo snap refresh home-assistant-snap --channel={newer_channel}`."
|
|
|
+ )
|
|
|
+
|
|
|
+ return Updater(True, default_track.get_latest(), None, None,
|
|
|
+ f"No channel found for «{snap_rev}» ({c_v.section(0)}.{c_v.section(1)}). "
|
|
|
+ f"Please consult `snap info home-assistant-snap` to find a suitable track to upgrade to, "
|
|
|
+ f"and switch channel with: `snap switch home-assistant-snap --channel=<channel>`."
|
|
|
+ )
|
|
|
+
|
|
|
+ coordinator = hass.data[DOMAIN] = update_coordinator.DataUpdateCoordinator[Updater](
|
|
|
+ hass,
|
|
|
+ _LOGGER,
|
|
|
+ name="Home Assistant Snap update",
|
|
|
+ update_method=check_new_version,
|
|
|
+ update_interval=timedelta(seconds=45)
|
|
|
+ )
|
|
|
+
|
|
|
+ asyncio.create_task(coordinator.async_refresh())
|
|
|
+
|
|
|
+ hass.async_create_task(
|
|
|
+ discovery.async_load_platform(hass, 'binary_sensor', DOMAIN, {}, config)
|
|
|
+ )
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+from urllib.parse import urlparse
|
|
|
+
|
|
|
+async def get_versions(hass):
|
|
|
+
|
|
|
+ session = async_get_clientsession(hass)
|
|
|
+ snap_arch = os.getenv('SNAP_ARCH')
|
|
|
+
|
|
|
+ if snap_arch is None:
|
|
|
+ if AwesomeVersion(current_version).dev:
|
|
|
+ snap_arch = "amd64"
|
|
|
+ else:
|
|
|
+ raise update_coordinator.UpdateFailed(Exception("Missing SNAP_ARCH environment variable."))
|
|
|
+
|
|
|
+ with async_timeout.timeout(30):
|
|
|
+ req = await session.get(UPDATER_URL % snap_arch, headers={
|
|
|
+ 'Snap-Device-Series': '16'
|
|
|
+ })
|
|
|
+
|
|
|
+ try:
|
|
|
+ res = await req.json()
|
|
|
+ except ValueError as err:
|
|
|
+ raise update_coordinator.UpdateFailed(
|
|
|
+ f"Received invalid JSON from {urlparse(UPDATER_URL).netloc}"
|
|
|
+ ) from err
|
|
|
+
|
|
|
+ try:
|
|
|
+ res = RESPONSE_SCHEMA(res)
|
|
|
+
|
|
|
+ tracks = Tracks(res['channel-map'])
|
|
|
+ default_track = res['default-track'] if 'default-track' in res else None
|
|
|
+
|
|
|
+ if default_track is None:
|
|
|
+ default_track = tracks.get_latest()
|
|
|
+ else:
|
|
|
+ default_track = tracks.get_track(default_track)
|
|
|
+
|
|
|
+ return [tracks, default_track]
|
|
|
+
|
|
|
+ except vol.Invalid as err:
|
|
|
+ raise update_coordinator.UpdateFailed(
|
|
|
+ f"Got unexepected response: {err}"
|
|
|
+ ) from err
|
|
|
+
|