diff --git a/openwisp_firmware_upgrader/api/serializers.py b/openwisp_firmware_upgrader/api/serializers.py index 5e02ee0..112648d 100644 --- a/openwisp_firmware_upgrader/api/serializers.py +++ b/openwisp_firmware_upgrader/api/serializers.py @@ -1,5 +1,6 @@ import swapper from django.core.exceptions import ValidationError +from django.urls import reverse from django.utils.translation import gettext_lazy as _ from rest_framework import serializers @@ -49,6 +50,23 @@ def validate(self, data): data["build"] = self.context["view"].get_parent_queryset().get() return super().validate(data) + def to_representation(self, instance): + ret = super().to_representation(instance) + request = self.context.get("request") + if request and getattr(instance, "pk", None): + ret["file"] = request.build_absolute_uri( + reverse( + "upgrader:api_firmware_download", + args=[instance.build.pk, instance.pk], + ) + ) + elif hasattr(instance, "file"): + ret["file"] = reverse( + "upgrader:api_firmware_download", + args=[instance.build.pk, instance.pk], + ) + return ret + class Meta(BaseMeta): model = FirmwareImage fields = "__all__" diff --git a/openwisp_firmware_upgrader/api/views.py b/openwisp_firmware_upgrader/api/views.py index fe270ec..2f22160 100644 --- a/openwisp_firmware_upgrader/api/views.py +++ b/openwisp_firmware_upgrader/api/views.py @@ -186,6 +186,7 @@ class FirmwareImageDownloadView(FirmwareImageMixin, generics.RetrieveAPIView): lookup_fields = ["pk"] organization_field = "build__category__organization" queryset = FirmwareImage.objects.none() + permission_classes = [] # Permissions are checked in the private storage view def retrieve(self, request, *args, **kwargs): return private_storage.views.firmware_image_download( diff --git a/openwisp_firmware_upgrader/private_storage/views.py b/openwisp_firmware_upgrader/private_storage/views.py index c09710b..7deb49d 100644 --- a/openwisp_firmware_upgrader/private_storage/views.py +++ b/openwisp_firmware_upgrader/private_storage/views.py @@ -1,15 +1,32 @@ +from django.contrib.auth import get_permission_codename +from django.contrib.auth.mixins import PermissionRequiredMixin +from django.http import HttpResponse from private_storage.views import PrivateStorageDetailView from ..swapper import load_model -class FirmwareImageDownloadView(PrivateStorageDetailView): +class FirmwareImageDownloadView(PermissionRequiredMixin, PrivateStorageDetailView): model = load_model("FirmwareImage") model_file_field = "file" + raise_exception = True slug_field = "file" slug_url_kwarg = "path" + def dispatch(self, request, *args, **kwargs): + if not request.user.is_authenticated: + return HttpResponse(status=401) + return super().dispatch(request, *args, **kwargs) + + def get_permission_required(self): + """ + Return the list of permissions that the user should have. + """ + return [ + f'{self.model._meta.app_label}.{get_permission_codename("view", self.model._meta)}' + ] + def can_access_file(self, private_file): user = private_file.request.user return user.is_superuser or ( diff --git a/openwisp_firmware_upgrader/tests/test_api.py b/openwisp_firmware_upgrader/tests/test_api.py index 68901cf..9758a98 100644 --- a/openwisp_firmware_upgrader/tests/test_api.py +++ b/openwisp_firmware_upgrader/tests/test_api.py @@ -796,7 +796,11 @@ class TestFirmwareImageViews(TestAPIUpgraderMixin, TestCase): def _serialize_image(self, firmware): serializer = FirmwareImageSerializer() data = dict(serializer.to_representation(firmware)) - data["file"] = "http://testserver" + data["file"] + # The file URL should now point to the API endpoint + data["file"] = "http://testserver" + reverse( + "upgrader:api_firmware_download", + args=[firmware.build.pk, firmware.pk], + ) return data def test_firmware_unauthorized(self): @@ -817,6 +821,7 @@ def test_firmware_unauthorized(self): r = client.get(url) self.assertEqual(r.status_code, 401) + # The download URL is handled by private_storage view which returns 401 for unauthenticated users url = reverse("upgrader:api_firmware_download", args=[image.build.pk, image.pk]) with self.subTest(url=url): with self.assertNumQueries(1): @@ -827,13 +832,24 @@ def test_firmware_list(self): image = self._create_firmware_image() self._create_firmware_image(type=self.TPLINK_4300_IL_IMAGE) + url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) + with self.assertNumQueries(8): + r = self.client.get(url) + + # Verify file URLs point to API endpoint + for result in r.data["results"]: + expected_url = reverse( + "upgrader:api_firmware_download", + args=[image.build.pk, result["id"]], + ) + self.assertTrue(result["file"].endswith(expected_url)) + self.assertNotIn("private-media", result["file"]) + + # Verify rest of the serialized data serialized_list = [ self._serialize_image(image) for image in FirmwareImage.objects.all().order_by("-created") ] - url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) - with self.assertNumQueries(6): - r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) def test_firmware_list_404(self): @@ -850,14 +866,14 @@ def test_firmware_list_django_filters(self): url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) filter_params = dict(type=self.TPLINK_4300_IMAGE) - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url, filter_params) self.assertEqual(r.data["results"], [self._serialize_image(image)]) url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) filter_params = dict(type=self.TPLINK_4300_IL_IMAGE) - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url, filter_params) self.assertEqual(r.data["results"], [self._serialize_image(image2)]) @@ -876,14 +892,14 @@ def test_firmware_list_filter_org(self): self._login("operator", "tester") serialized_list = [self._serialize_image(image)] - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) url = reverse("upgrader:api_firmware_list", args=[image2.build.pk]) self._login("operator2", "tester") serialized_list = [self._serialize_image(image2)] - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) @@ -905,7 +921,7 @@ def test_firmware_list_filter_org_admin(self): self._serialize_image(image), ] - with self.assertNumQueries(4): + with self.assertNumQueries(5): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) @@ -913,7 +929,7 @@ def test_firmware_list_filter_org_admin(self): data_filter = {"org": "New org"} serialized_list = [self._serialize_image(image2)] - with self.assertNumQueries(4): + with self.assertNumQueries(5): r = self.client.get(url, data_filter) self.assertEqual(r.data["results"], serialized_list) diff --git a/openwisp_firmware_upgrader/tests/test_private_storage.py b/openwisp_firmware_upgrader/tests/test_private_storage.py index 055cfe9..32f96b6 100644 --- a/openwisp_firmware_upgrader/tests/test_private_storage.py +++ b/openwisp_firmware_upgrader/tests/test_private_storage.py @@ -1,12 +1,18 @@ import swapper +from django.contrib.auth import get_permission_codename +from django.contrib.auth.models import Permission +from django.contrib.contenttypes.models import ContentType from django.test import TestCase from django.urls import reverse from openwisp_users.tests.utils import TestMultitenantAdminMixin +from ..swapper import load_model from .base import TestUpgraderMixin OrganizationUser = swapper.load_model("openwisp_users", "OrganizationUser") +FirmwareImage = load_model("FirmwareImage") +Group = swapper.load_model("openwisp_users", "Group") class TestPrivateStorage(TestUpgraderMixin, TestMultitenantAdminMixin, TestCase): @@ -23,7 +29,7 @@ def _download_firmware_assert_status(self, status_code): self.assertEqual(response.status_code, status_code) def test_unauthenticated_user(self): - self._download_firmware_assert_status(403) + self._download_firmware_assert_status(401) def test_authenticated_user(self): user = self._get_user() @@ -74,3 +80,47 @@ def test_superuser(self): user = self._get_admin() self.client.force_login(user) self._download_firmware_assert_status(200) + + def test_view_permission_check(self): + staff_user = self._get_operator() + self.client.force_login(staff_user) + org = self.image.build.category.organization + + with self.subTest("Test initial access without permissions"): + self._download_firmware_assert_status(403) + + # Add view permission first + content_type = ContentType.objects.get_for_model(FirmwareImage) + perm_codename = get_permission_codename("view", FirmwareImage._meta) + view_perm = Permission.objects.get( + content_type=content_type, codename=perm_codename + ) + staff_user.user_permissions.add(view_perm) + + with self.subTest("Test access with view permission and org admin status"): + self._create_org_user(user=staff_user, organization=org, is_admin=True) + self._download_firmware_assert_status(200) + + # Remove org manager status + org_user = staff_user.openwisp_users_organization.get( + organization_users__organization=org + ) + org_user.is_admin = False + org_user.save() + + # Remove staff status + staff_user.is_staff = False + staff_user.save() + + # Restore org manager status + org_user.is_admin = True + org_user.save() + + with self.subTest("Test access without staff status"): + self._download_firmware_assert_status(403) + + # Remove view permission + staff_user.user_permissions.remove(view_perm) + + with self.subTest("Test access without view permission"): + self._download_firmware_assert_status(403)