diff --git a/openwisp_firmware_upgrader/api/serializers.py b/openwisp_firmware_upgrader/api/serializers.py index 5e02ee0..112648d 100644 --- a/openwisp_firmware_upgrader/api/serializers.py +++ b/openwisp_firmware_upgrader/api/serializers.py @@ -1,5 +1,6 @@ import swapper from django.core.exceptions import ValidationError +from django.urls import reverse from django.utils.translation import gettext_lazy as _ from rest_framework import serializers @@ -49,6 +50,23 @@ def validate(self, data): data["build"] = self.context["view"].get_parent_queryset().get() return super().validate(data) + def to_representation(self, instance): + ret = super().to_representation(instance) + request = self.context.get("request") + if request and getattr(instance, "pk", None): + ret["file"] = request.build_absolute_uri( + reverse( + "upgrader:api_firmware_download", + args=[instance.build.pk, instance.pk], + ) + ) + elif hasattr(instance, "file"): + ret["file"] = reverse( + "upgrader:api_firmware_download", + args=[instance.build.pk, instance.pk], + ) + return ret + class Meta(BaseMeta): model = FirmwareImage fields = "__all__" diff --git a/openwisp_firmware_upgrader/api/views.py b/openwisp_firmware_upgrader/api/views.py index fe270ec..2f22160 100644 --- a/openwisp_firmware_upgrader/api/views.py +++ b/openwisp_firmware_upgrader/api/views.py @@ -186,6 +186,7 @@ class FirmwareImageDownloadView(FirmwareImageMixin, generics.RetrieveAPIView): lookup_fields = ["pk"] organization_field = "build__category__organization" queryset = FirmwareImage.objects.none() + permission_classes = [] # Permissions are checked in the private storage view def retrieve(self, request, *args, **kwargs): return private_storage.views.firmware_image_download( diff --git a/openwisp_firmware_upgrader/private_storage/views.py b/openwisp_firmware_upgrader/private_storage/views.py index c09710b..7deb49d 100644 --- a/openwisp_firmware_upgrader/private_storage/views.py +++ b/openwisp_firmware_upgrader/private_storage/views.py @@ -1,15 +1,32 @@ +from django.contrib.auth import get_permission_codename +from django.contrib.auth.mixins import PermissionRequiredMixin +from django.http import HttpResponse from private_storage.views import PrivateStorageDetailView from ..swapper import load_model -class FirmwareImageDownloadView(PrivateStorageDetailView): +class FirmwareImageDownloadView(PermissionRequiredMixin, PrivateStorageDetailView): model = load_model("FirmwareImage") model_file_field = "file" + raise_exception = True slug_field = "file" slug_url_kwarg = "path" + def dispatch(self, request, *args, **kwargs): + if not request.user.is_authenticated: + return HttpResponse(status=401) + return super().dispatch(request, *args, **kwargs) + + def get_permission_required(self): + """ + Return the list of permissions that the user should have. + """ + return [ + f'{self.model._meta.app_label}.{get_permission_codename("view", self.model._meta)}' + ] + def can_access_file(self, private_file): user = private_file.request.user return user.is_superuser or ( diff --git a/openwisp_firmware_upgrader/tests/test_api.py b/openwisp_firmware_upgrader/tests/test_api.py index 68901cf..a7dce49 100644 --- a/openwisp_firmware_upgrader/tests/test_api.py +++ b/openwisp_firmware_upgrader/tests/test_api.py @@ -1,7 +1,9 @@ import uuid import swapper -from django.contrib.auth import get_user_model +from django.contrib.auth import get_permission_codename, get_user_model +from django.contrib.auth.models import Group, Permission +from django.contrib.contenttypes.models import ContentType from django.test import Client, TestCase from django.urls import reverse from packaging.version import parse as parse_version @@ -796,7 +798,11 @@ class TestFirmwareImageViews(TestAPIUpgraderMixin, TestCase): def _serialize_image(self, firmware): serializer = FirmwareImageSerializer() data = dict(serializer.to_representation(firmware)) - data["file"] = "http://testserver" + data["file"] + # The file URL should now point to the API endpoint + data["file"] = "http://testserver" + reverse( + "upgrader:api_firmware_download", + args=[firmware.build.pk, firmware.pk], + ) return data def test_firmware_unauthorized(self): @@ -817,6 +823,7 @@ def test_firmware_unauthorized(self): r = client.get(url) self.assertEqual(r.status_code, 401) + # The download URL is handled by private_storage view which returns 401 for unauthenticated users url = reverse("upgrader:api_firmware_download", args=[image.build.pk, image.pk]) with self.subTest(url=url): with self.assertNumQueries(1): @@ -827,13 +834,24 @@ def test_firmware_list(self): image = self._create_firmware_image() self._create_firmware_image(type=self.TPLINK_4300_IL_IMAGE) + url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) + with self.assertNumQueries(8): + r = self.client.get(url) + + # Verify file URLs point to API endpoint + for result in r.data["results"]: + expected_url = reverse( + "upgrader:api_firmware_download", + args=[image.build.pk, result["id"]], + ) + self.assertTrue(result["file"].endswith(expected_url)) + self.assertNotIn("private-media", result["file"]) + + # Verify rest of the serialized data serialized_list = [ self._serialize_image(image) for image in FirmwareImage.objects.all().order_by("-created") ] - url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) - with self.assertNumQueries(6): - r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) def test_firmware_list_404(self): @@ -850,14 +868,14 @@ def test_firmware_list_django_filters(self): url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) filter_params = dict(type=self.TPLINK_4300_IMAGE) - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url, filter_params) self.assertEqual(r.data["results"], [self._serialize_image(image)]) url = reverse("upgrader:api_firmware_list", args=[image.build.pk]) filter_params = dict(type=self.TPLINK_4300_IL_IMAGE) - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url, filter_params) self.assertEqual(r.data["results"], [self._serialize_image(image2)]) @@ -876,14 +894,14 @@ def test_firmware_list_filter_org(self): self._login("operator", "tester") serialized_list = [self._serialize_image(image)] - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) url = reverse("upgrader:api_firmware_list", args=[image2.build.pk]) self._login("operator2", "tester") serialized_list = [self._serialize_image(image2)] - with self.assertNumQueries(6): + with self.assertNumQueries(7): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) @@ -905,7 +923,7 @@ def test_firmware_list_filter_org_admin(self): self._serialize_image(image), ] - with self.assertNumQueries(4): + with self.assertNumQueries(5): r = self.client.get(url) self.assertEqual(r.data["results"], serialized_list) @@ -913,7 +931,7 @@ def test_firmware_list_filter_org_admin(self): data_filter = {"org": "New org"} serialized_list = [self._serialize_image(image2)] - with self.assertNumQueries(4): + with self.assertNumQueries(5): r = self.client.get(url, data_filter) self.assertEqual(r.data["results"], serialized_list) @@ -1813,3 +1831,141 @@ def test_api_docs(self): self._login("admin", "tester") response = self.client.get(url) self.assertEqual(response.status_code, 403) + + +class TestFirmwareDownloadPermissions(TestAPIUpgraderMixin, TestCase): + def setUp(self): + super().setUp() + self.image = self._create_firmware_image() + self.default_org = self._get_org("default") + self.test_org = self._get_org() + self.other_org = self._create_org(name="other", slug="other") + + # Get view permission + content_type = ContentType.objects.get_for_model(FirmwareImage) + perm_codename = get_permission_codename("view", FirmwareImage._meta) + self.view_perm = Permission.objects.get( + content_type=content_type, codename=perm_codename + ) + + # Get Operator group + self.operator_group = Group.objects.get(name="Operator") + + def _setup_user( + self, + is_staff=False, + is_org_admin=False, + org=None, + has_view_perm=False, + is_operator=False, + ): + """Helper method to setup user with specific permissions""" + user = self._get_operator() + user.is_staff = is_staff + user.save() + + # Clear existing permissions and relationships + user.user_permissions.clear() + user.groups.clear() + OrganizationUser.objects.filter(user=user).delete() + + if org: + self._create_org_user(user=user, organization=org, is_admin=is_org_admin) + + if has_view_perm: + user.user_permissions.add(self.view_perm) + + if is_operator: + user.groups.add(self.operator_group) + + self._login(user.username, "tester") + return user + + def test_firmware_download_permissions(self): + """ + Test firmware download permissions for different user scenarios. + """ + with self.subTest("User without any permissions"): + user = self._get_user() + self._login(user.username, "tester") + + url = reverse( + "upgrader:api_firmware_download", + args=[self.image.build.pk, self.image.pk], + ) + with self.assertNumQueries(4): + response = self.client.get(url) + self.assertEqual(response.status_code, 403) + + with self.subTest("Staff user without org admin or view permission"): + self._setup_user( + is_staff=True, + is_org_admin=False, + org=None, + ) + + url = reverse( + "upgrader:api_firmware_download", + args=[self.image.build.pk, self.image.pk], + ) + with self.assertNumQueries(4): + response = self.client.get(url) + self.assertEqual(response.status_code, 403) + + with self.subTest("Staff user who is org admin of a different organization"): + self._setup_user( + is_staff=True, + is_org_admin=True, + org=self.other_org, + ) + + url = reverse( + "upgrader:api_firmware_download", + args=[self.image.build.pk, self.image.pk], + ) + with self.assertNumQueries(4): + response = self.client.get(url) + self.assertEqual(response.status_code, 403) + + with self.subTest("Staff user who is org admin of same organization"): + # Create a staff user who is org admin of the same organization + user = self._get_operator() + user.is_staff = True + user.save() + + # Clear existing permissions and relationships + user.user_permissions.clear() + user.groups.clear() + OrganizationUser.objects.filter(user=user).delete() + + # Add to Operator group + user.groups.add(self.operator_group) + + # Create org admin relationship + self._create_org_user( + user=user, + organization=self.image.build.category.organization, + is_admin=True, + ) + + self._login(user.username, "tester") + + url = reverse( + "upgrader:api_firmware_download", + args=[self.image.build.pk, self.image.pk], + ) + with self.assertNumQueries(8): + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + + with self.subTest("Superuser access"): + user = self._get_admin() + self._login(user.username, "tester") + + url = reverse( + "upgrader:api_firmware_download", + args=[self.image.build.pk, self.image.pk], + ) + with self.assertNumQueries(3): + response = self.client.get(url) + self.assertEqual(response.status_code, 200) diff --git a/openwisp_firmware_upgrader/tests/test_private_storage.py b/openwisp_firmware_upgrader/tests/test_private_storage.py index 055cfe9..48b496d 100644 --- a/openwisp_firmware_upgrader/tests/test_private_storage.py +++ b/openwisp_firmware_upgrader/tests/test_private_storage.py @@ -1,12 +1,18 @@ import swapper +from django.contrib.auth import get_permission_codename +from django.contrib.auth.models import Permission +from django.contrib.contenttypes.models import ContentType from django.test import TestCase from django.urls import reverse from openwisp_users.tests.utils import TestMultitenantAdminMixin +from ..swapper import load_model from .base import TestUpgraderMixin OrganizationUser = swapper.load_model("openwisp_users", "OrganizationUser") +FirmwareImage = load_model("FirmwareImage") +Group = swapper.load_model("openwisp_users", "Group") class TestPrivateStorage(TestUpgraderMixin, TestMultitenantAdminMixin, TestCase): @@ -15,62 +21,83 @@ def setUp(self): self.image = self._create_firmware_image() self.default_org = self._get_org("default") self.test_org = self._get_org() + self.other_org = self._create_org(name="other", slug="other") + + # Get view permission + content_type = ContentType.objects.get_for_model(FirmwareImage) + perm_codename = get_permission_codename("view", FirmwareImage._meta) + self.view_perm = Permission.objects.get( + content_type=content_type, codename=perm_codename + ) + + # Get Operator group + self.operator_group = Group.objects.get(name="Operator") def _download_firmware_assert_status(self, status_code): + """Helper method to test firmware download via private storage view""" response = self.client.get( reverse("serve_private_file", args=[self.image.file]) ) self.assertEqual(response.status_code, status_code) - def test_unauthenticated_user(self): - self._download_firmware_assert_status(403) + def _setup_user( + self, + is_staff=False, + is_org_admin=False, + org=None, + has_view_perm=False, + is_operator=False, + ): + """Helper method to setup user with specific permissions""" + user = self._get_operator() + user.is_staff = is_staff + user.save() - def test_authenticated_user(self): - user = self._get_user() - self.client.force_login(user) - self._download_firmware_assert_status(403) + if org: + self._create_org_user(user=user, organization=org, is_admin=is_org_admin) - def test_authenticated_user_with_different_organization(self): - self._create_org_user() - user = self._get_user() - self.client.force_login(user) - self._download_firmware_assert_status(403) + if has_view_perm: + user.user_permissions.add(self.view_perm) + + if is_operator: + user.groups.add(self.operator_group) - def test_authenticated_user_with_same_organization(self): - self._create_org_user(organization=self.test_org) - user = self._get_user() self.client.force_login(user) - self._download_firmware_assert_status(403) - - def test_staff_user_with_different_organization(self): - staff_user = self._get_operator() - self._create_org_user(user=staff_user, organization=self.default_org) - self.client.force_login(staff_user) - self._download_firmware_assert_status(403) - - def test_staff_user_with_same_organization(self): - staff_user = self._get_operator() - self._create_org_user(user=staff_user, organization=self.test_org) - self.client.force_login(staff_user) - self._download_firmware_assert_status(403) - - def test_staff_operator_with_different_organization(self): - staff_user = self._get_operator() - self._create_org_user( - user=staff_user, organization=self.default_org, is_admin=True - ) - self.client.force_login(staff_user) - self._download_firmware_assert_status(403) + return user - def test_staff_operator_with_same_organization(self): - staff_user = self._get_operator() - self._create_org_user( - user=staff_user, organization=self.test_org, is_admin=True - ) - self.client.force_login(staff_user) - self._download_firmware_assert_status(200) + def test_firmware_download_permissions(self): + """ + Test firmware download permissions for different user scenarios. + """ + with self.subTest("User without any permissions"): + user = self._get_user() + self.client.force_login(user) + self._download_firmware_assert_status(403) - def test_superuser(self): - user = self._get_admin() - self.client.force_login(user) - self._download_firmware_assert_status(200) + with self.subTest("Staff user without org admin or view permission"): + user = self._get_operator() + user.is_staff = True + user.save() + self.client.force_login(user) + self._download_firmware_assert_status(403) + + with self.subTest("Staff user who is org admin of a different organization"): + self._setup_user( + is_staff=True, + is_org_admin=True, + org=self.other_org, + ) + self._download_firmware_assert_status(403) + + with self.subTest("Staff user who is org admin of same organization"): + self._setup_user( + is_staff=True, + is_org_admin=True, + org=self.image.build.category.organization, + ) + self._download_firmware_assert_status(200) + + with self.subTest("Superuser access"): + user = self._get_admin() + self.client.force_login(user) + self._download_firmware_assert_status(200)