diff --git a/tests/conftest.py b/tests/conftest.py index 990d3e1ba..8e0cbf572 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,6 +43,8 @@ class _FakeApp(ControllerApplication): + _ezsp = None + async def add_endpoint(self, descriptor: zdo_t.SimpleDescriptor): pass diff --git a/tests/test_coordinator_light.py b/tests/test_coordinator_light.py new file mode 100644 index 000000000..7d39d0be9 --- /dev/null +++ b/tests/test_coordinator_light.py @@ -0,0 +1,184 @@ +"""Test coordinator LED light support.""" + +from collections.abc import AsyncGenerator +from types import SimpleNamespace +from unittest.mock import AsyncMock + +from bellows.ezsp.xncp import FirmwareFeatures +import pytest + +from tests.common import get_entity +import tests.conftest as test_conftest +from zha.application import Platform +from zha.application.gateway import Gateway +from zha.application.platforms.light import ( + DEFAULT_COORDINATOR_LED_BRIGHTNESS, + DEFAULT_COORDINATOR_LED_XY_COLOR, + CoordinatorLED, + _xy_brightness_to_rgb, +) +from zha.application.platforms.light.const import ColorMode + + +@pytest.fixture +async def zbt2_gateway( + zha_data, + zigpy_app_controller, + caplog, # pylint: disable=unused-argument +) -> AsyncGenerator[Gateway, None]: + """Set up a ZBT-2-capable gateway.""" + zigpy_app_controller.state.node_info.manufacturer = "Nabu Casa" + zigpy_app_controller.state.node_info.model = "Home Assistant Connect ZBT-2" + zigpy_app_controller._ezsp = SimpleNamespace( + _xncp_features=FirmwareFeatures.LED_CONTROL, + xncp_set_led_state=AsyncMock(), + ) + + async with test_conftest.TestGateway(zha_data, zigpy_app_controller) as gateway: + yield gateway + + +async def test_coordinator_led_not_exposed_without_feature( + zha_gateway: Gateway, +) -> None: + """Test that the coordinator LED entity is not exposed without support.""" + with pytest.raises(KeyError): + get_entity( + zha_gateway.coordinator_zha_device, + platform=Platform.LIGHT, + exact_entity_type=CoordinatorLED, + ) + + +async def test_coordinator_led_entity(zbt2_gateway: Gateway) -> None: + """Test coordinator LED light discovery and control.""" + entity = get_entity( + zbt2_gateway.coordinator_zha_device, + platform=Platform.LIGHT, + exact_entity_type=CoordinatorLED, + ) + ezsp = zbt2_gateway.application_controller._ezsp + + assert entity.state["on"] is False + assert entity.state["color_mode"] == ColorMode.XY + assert entity.state["supported_color_modes"] == {ColorMode.XY} + + await entity.async_turn_on(brightness=128) + await zbt2_gateway.async_block_till_done() + + expected_red, expected_green, expected_blue = _xy_brightness_to_rgb( + DEFAULT_COORDINATOR_LED_XY_COLOR[0], + DEFAULT_COORDINATOR_LED_XY_COLOR[1], + 128, + ) + assert ezsp.xncp_set_led_state.await_args_list[0].kwargs == { + "red": expected_red, + "green": expected_green, + "blue": expected_blue, + } + assert entity.state["on"] is True + assert entity.state["brightness"] == 128 + assert entity.state["color_mode"] == ColorMode.XY + + await entity.async_turn_off() + await zbt2_gateway.async_block_till_done() + + assert ezsp.xncp_set_led_state.await_args_list[1].kwargs == { + "red": 0, + "green": 0, + "blue": 0, + } + assert entity.state["on"] is False + + +async def test_coordinator_led_first_turn_on_uses_firmware_dim_white_default( + zbt2_gateway: Gateway, +) -> None: + """Test that the first turn_on without a brightness uses the firmware dim white level.""" + entity = get_entity( + zbt2_gateway.coordinator_zha_device, + platform=Platform.LIGHT, + exact_entity_type=CoordinatorLED, + ) + ezsp = zbt2_gateway.application_controller._ezsp + + await entity.async_turn_on() + await zbt2_gateway.async_block_till_done() + + expected_red, expected_green, expected_blue = _xy_brightness_to_rgb( + DEFAULT_COORDINATOR_LED_XY_COLOR[0], + DEFAULT_COORDINATOR_LED_XY_COLOR[1], + DEFAULT_COORDINATOR_LED_BRIGHTNESS, + ) + assert ezsp.xncp_set_led_state.await_args_list[0].kwargs == { + "red": expected_red, + "green": expected_green, + "blue": expected_blue, + } + assert entity.state["brightness"] == DEFAULT_COORDINATOR_LED_BRIGHTNESS + + +async def test_coordinator_led_brightness_uses_current_xy_color( + zbt2_gateway: Gateway, +) -> None: + """Test that brightness-only updates preserve the current XY color.""" + entity = get_entity( + zbt2_gateway.coordinator_zha_device, + platform=Platform.LIGHT, + exact_entity_type=CoordinatorLED, + ) + ezsp = zbt2_gateway.application_controller._ezsp + + red_xy = (0.64, 0.33) + await entity.async_turn_on(xy_color=red_xy, brightness=255) + await zbt2_gateway.async_block_till_done() + + await entity.async_turn_on(brightness=64) + await zbt2_gateway.async_block_till_done() + + assert ezsp.xncp_set_led_state.await_args_list[1].kwargs == { + "red": 64, + "green": 22, + "blue": 11, + } + assert entity.state["xy_color"] == red_xy + assert entity.state["brightness"] == 64 + + +async def test_coordinator_led_restores_attributes_and_replays_on_state( + zbt2_gateway: Gateway, +) -> None: + """Test that coordinator LED restoration replays the previous on-state.""" + entity = get_entity( + zbt2_gateway.coordinator_zha_device, + platform=Platform.LIGHT, + exact_entity_type=CoordinatorLED, + ) + ezsp = zbt2_gateway.application_controller._ezsp + + entity.restore_external_state_attributes( + state=True, + off_with_transition=False, + off_brightness=None, + brightness=80, + color_temp=None, + xy_color=(0.64, 0.33), + color_mode=ColorMode.XY, + effect=None, + ) + await zbt2_gateway.async_block_till_done() + + expected_red, expected_green, expected_blue = _xy_brightness_to_rgb( + 0.64, + 0.33, + 80, + ) + assert ezsp.xncp_set_led_state.await_args_list[0].kwargs == { + "red": expected_red, + "green": expected_green, + "blue": expected_blue, + } + assert entity.state["on"] is True + assert entity.state["brightness"] == 80 + assert entity.state["xy_color"] == (0.64, 0.33) + assert entity.state["color_mode"] == ColorMode.XY diff --git a/zha/application/discovery.py b/zha/application/discovery.py index 493b48be4..b146807f5 100644 --- a/zha/application/discovery.py +++ b/zha/application/discovery.py @@ -8,8 +8,10 @@ import functools import itertools import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast +from bellows.ezsp.xncp import FirmwareFeatures +import bellows.zigbee.application from zigpy.quirks.v2 import ( BinarySensorMetadata, CustomDeviceV2, @@ -159,7 +161,7 @@ def discover_device_entities(device: Device) -> Iterator[BaseEntity]: @ignore_exceptions_during_iteration def discover_coordinator_device_entities( device: Device, -) -> Iterator[sensor.DeviceCounterSensor]: +) -> Iterator[BaseEntity]: """Discover entities for the coordinator device.""" _LOGGER.debug( "Discovering entities for coordinator device: %s-%s", @@ -168,6 +170,23 @@ def discover_coordinator_device_entities( ) state: State = device.gateway.application_controller.state + if device.gateway.radio_type is zha_const.RadioType.ezsp: + app_controller = cast( + bellows.zigbee.application.ControllerApplication, + device.gateway.application_controller, + ) + ezsp = app_controller._ezsp + + if ezsp is not None and FirmwareFeatures.LED_CONTROL in ezsp._xncp_features: + yield light.CoordinatorLED(device) + + _LOGGER.debug( + "'%s' platform -> '%s' using %s", + Platform.LIGHT, + light.CoordinatorLED.__name__, + "bellows XNCP LED control", + ) + for counter_groups in ( "counters", "broadcast_counters", diff --git a/zha/application/platforms/light/__init__.py b/zha/application/platforms/light/__init__.py index aa41c148c..66aaa9497 100644 --- a/zha/application/platforms/light/__init__.py +++ b/zha/application/platforms/light/__init__.py @@ -25,6 +25,7 @@ BaseEntity, BaseEntityInfo, ClusterHandlerMatch, + EntityCategory, GroupEntity, PlatformEntity, PlatformFeatureGroup, @@ -265,6 +266,223 @@ def restore_external_state_attributes( self._effect = effect +DEFAULT_COORDINATOR_LED_XY_COLOR = (0.3127, 0.3290) +DEFAULT_COORDINATOR_LED_BRIGHTNESS = 75 + + +def _clamp_rgb_channel(value: float) -> int: + """Clamp an RGB channel to an 8-bit integer.""" + return max(0, min(255, int(round(value)))) + + +def _xy_brightness_to_rgb(x: float, y: float, brightness: int) -> tuple[int, int, int]: + """Convert XY + brightness into an RGB tuple.""" + if y <= 0 or brightness <= 0: + return (0, 0, 0) + + y_luma = 1.0 + x_tristimulus = (y_luma / y) * x + z_tristimulus = (y_luma / y) * (1 - x - y) + + red = (x_tristimulus * 1.656492) - (y_luma * 0.354851) - (z_tristimulus * 0.255038) + green = ( + -(x_tristimulus * 0.707196) + (y_luma * 1.655397) + (z_tristimulus * 0.036152) + ) + blue = (x_tristimulus * 0.051713) - (y_luma * 0.121364) + (z_tristimulus * 1.01153) + + if red > blue and red > green and red > 1.0: + green /= red + blue /= red + red = 1.0 + elif green > blue and green > red and green > 1.0: + red /= green + blue /= green + green = 1.0 + elif blue > red and blue > green and blue > 1.0: + red /= blue + green /= blue + blue = 1.0 + + def gamma_correct(value: float) -> float: + value = max(0.0, value) + if value <= 0.0031308: + return 12.92 * value + return (1.055 * pow(value, 1 / 2.4)) - 0.055 + + scale = min(255, brightness) / 255 + return ( + _clamp_rgb_channel(gamma_correct(red) * 255 * scale), + _clamp_rgb_channel(gamma_correct(green) * 255 * scale), + _clamp_rgb_channel(gamma_correct(blue) * 255 * scale), + ) + + +class CoordinatorLED(BaseLight): + """Coordinator LED control exposed through bellows XNCP.""" + + _attr_entity_category = EntityCategory.CONFIG + _attr_fallback_name = "Coordinator LED" + _attr_primary = False + + def __init__(self, zha_device: Device) -> None: + """Init the coordinator LED entity.""" + slugified_device_id = zha_device.unique_id.replace(":", "-") + super().__init__(unique_id=f"{slugified_device_id}_coordinator_led") + self._device = zha_device + self._state = False + self._brightness = DEFAULT_COORDINATOR_LED_BRIGHTNESS + self._color_mode = ColorMode.XY + self._supported_color_modes = {ColorMode.XY} + self._xy_color = DEFAULT_COORDINATOR_LED_XY_COLOR + self._restore_on_startup = False + + @functools.cached_property + def info_object(self) -> LightEntityInfo: + """Return a representation of the coordinator LED.""" + return dataclasses.replace( + super().info_object, + device_ieee=self._device.ieee, + available=self.available, + ) + + @property + def available(self) -> bool: + """Return entity availability.""" + return self._device.available + + @property + def state(self) -> dict[str, Any]: + """Return the state of the coordinator LED.""" + response = super().state + response["available"] = self.available + return response + + def on_add(self) -> None: + """Run when entity is added.""" + super().on_add() + self._schedule_restore_turn_on() + + def restore_external_state_attributes( + self, + *, + state: bool | None, + off_with_transition: bool | None, + off_brightness: int | None, + brightness: int | None, + color_temp: int | None, + xy_color: tuple[float, float] | None, + color_mode: ColorMode | None, + effect: str | None, + ) -> None: + """Restore coordinator LED state and replay the last on-state optimistically.""" + super().restore_external_state_attributes( + state=False, + off_with_transition=off_with_transition, + off_brightness=off_brightness, + brightness=brightness, + color_temp=color_temp, + xy_color=xy_color, + color_mode=color_mode, + effect=effect, + ) + self._restore_on_startup = bool(state) + self._schedule_restore_turn_on() + + def _schedule_restore_turn_on(self) -> None: + """Replay the restored LED state after startup.""" + if not self._restore_on_startup or not self.available: + return + + self._restore_on_startup = False + task = self._device.gateway.async_create_task( + self._async_restore_turn_on(), + name=f"{self.unique_id}_restore_turn_on", + ) + self._tracked_tasks.append(task) + + def remove_task(done_task: asyncio.Task) -> None: + with contextlib.suppress(ValueError): + self._tracked_tasks.remove(done_task) + + task.add_done_callback(remove_task) + + async def _async_restore_turn_on(self) -> None: + """Restore the last known LED state after startup.""" + try: + await self.async_turn_on( + brightness=self._brightness, + xy_color=self._xy_color, + ) + except Exception: # pylint: disable=broad-except + self.error("Failed to restore coordinator LED state", exc_info=True) + + def _get_rgb_color( + self, + brightness: int | None, + xy_color: tuple[float, float] | None, + ) -> tuple[int, int, int]: + """Resolve the RGB color to send to the adapter.""" + if brightness is not None: + target_brightness = max(DEFAULT_MIN_BRIGHTNESS, min(255, brightness)) + self._brightness = target_brightness + elif self._brightness is None: + target_brightness = DEFAULT_COORDINATOR_LED_BRIGHTNESS + self._brightness = target_brightness + else: + target_brightness = self._brightness + + if xy_color is not None: + self._xy_color = xy_color + return _xy_brightness_to_rgb(xy_color[0], xy_color[1], target_brightness) + + if self._xy_color is not None: + return _xy_brightness_to_rgb( + self._xy_color[0], self._xy_color[1], target_brightness + ) + + self._xy_color = DEFAULT_COORDINATOR_LED_XY_COLOR + return _xy_brightness_to_rgb( + self._xy_color[0], self._xy_color[1], target_brightness + ) + + async def _async_set_led_state(self, red: int, green: int, blue: int) -> None: + """Send the LED state to the coordinator.""" + ezsp = getattr(self._device.gateway.application_controller, "_ezsp", None) + if ezsp is None or not hasattr(ezsp, "xncp_set_led_state"): + raise RuntimeError("Coordinator does not expose XNCP LED control") + + await ezsp.xncp_set_led_state(red=red, green=green, blue=blue) + + async def async_turn_on( + self, + *, + transition: float | None = None, + brightness: int | None = None, + effect: str | None = None, + flash: FlashMode | None = None, + color_temp: int | None = None, + xy_color: tuple[int, int] | None = None, + ) -> None: + """Turn the entity on.""" + + resolved_xy_color: tuple[float, float] | None = None + if xy_color is not None: + resolved_xy_color = (float(xy_color[0]), float(xy_color[1])) + elif self._xy_color is None and self._color_mode == ColorMode.XY: + resolved_xy_color = DEFAULT_COORDINATOR_LED_XY_COLOR + + red, green, blue = self._get_rgb_color(brightness, resolved_xy_color) + await self._async_set_led_state(red=red, green=green, blue=blue) + self._state = True + self.maybe_emit_state_changed_event() + + async def async_turn_off(self, *, transition: float | None = None) -> None: + """Turn the entity off.""" + await self._async_set_led_state(red=0, green=0, blue=0) + self._state = False + self.maybe_emit_state_changed_event() + + class BaseClusterHandlerLight(BaseLight): """Operations common to all light entities."""