diff --git a/g3w-admin/caching/tests/test_api.py b/g3w-admin/caching/tests/test_api.py
index a12af58cd..e54a9a6ed 100644
--- a/g3w-admin/caching/tests/test_api.py
+++ b/g3w-admin/caching/tests/test_api.py
@@ -80,6 +80,8 @@ def test_tilestache_api(self):
client = Client()
layer = Layer.objects.get(project=self.project.instance, qgs_layer_id='spatialite_points20190604101052075')
assign_perm('view_project', self.anonymoususer, self.project.instance)
+ for l in self.project.instance.layer_set.all():
+ assign_perm("view_layer", self.anonymoususer, l)
# active caching for layer
cachinglayer = G3WCachingLayer.objects.create(app_name='qdjango', layer_id=layer.pk)
diff --git a/g3w-admin/qdjango/auth.py b/g3w-admin/qdjango/auth.py
index 6ac45feee..0ab70f6b2 100644
--- a/g3w-admin/qdjango/auth.py
+++ b/g3w-admin/qdjango/auth.py
@@ -25,6 +25,7 @@ def auth_request(self, **kwargs):
try:
ba = BasicAuthentication()
user, other = ba.authenticate(self.request)
+ self.request.user = user
return user.has_perm('qdjango.view_project', self.project)
except Exception as e:
print(e)
diff --git a/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py b/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py
new file mode 100644
index 000000000..0aa90bbf6
--- /dev/null
+++ b/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py
@@ -0,0 +1,50 @@
+# coding=utf-8
+"""" Che layer acl
+.. note:: This program is free software; you can redistribute it and/or modify
+ it under the terms of the Mozilla Public License 2.0.
+
+"""
+
+__author__ = "lorenzetti@gis3w.it"
+__date__ = "2023-09-25"
+__copyright__ = "Copyright 2015 - 2023, Gis3w"
+__license__ = "MPL 2.0"
+
+from guardian.shortcuts import get_perms
+from qgis.server import QgsAccessControlFilter
+from qgis.core import QgsMessageLog, Qgis
+from qdjango.apps import QGS_SERVER
+from qdjango.models import Layer
+
+
+class LayerAclAccessControlFilter(QgsAccessControlFilter):
+ """Filter layer by ACL properties"""
+
+ def __init__(self, server_iface):
+ super().__init__(server_iface)
+
+ def layerPermissions(self, layer):
+
+ rights = QgsAccessControlFilter.LayerPermissions()
+
+ try:
+ qdjango_layer = Layer.objects.get(
+ project=QGS_SERVER.project, qgs_layer_id=layer.id())
+
+ # Check permission
+ perms = get_perms(QGS_SERVER.user, qdjango_layer)
+ rights.canRead = "view_layer" in perms
+ rights.canInsert = "add_layer" in perms
+ rights.canUpdate = "change_layer" in perms
+ rights.canDelete = "delete_layer" in perms
+
+ except Layer.DoesNotExist:
+ pass
+
+ return rights
+
+
+# Register the filter, keep a reference because of the garbage collector
+layeracl_filter = LayerAclAccessControlFilter(QGS_SERVER.serverInterface())
+# Note: this should be the last filter, set the priority to 10000
+QGS_SERVER.serverInterface().registerAccessControl(layeracl_filter, 10010)
\ No newline at end of file
diff --git a/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg b/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg
index 0dac454a1..69b5fbbd8 100644
Binary files a/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg and b/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg differ
diff --git a/g3w-admin/qdjango/tests/test_constraints.py b/g3w-admin/qdjango/tests/test_constraints.py
index 5d6deacf7..f215193ca 100644
--- a/g3w-admin/qdjango/tests/test_constraints.py
+++ b/g3w-admin/qdjango/tests/test_constraints.py
@@ -164,6 +164,31 @@ def _check_subset_string(self, login=True):
return is_rome
+ def _check_wfs_getfeature(self, login=True):
+ """Check for ROME in the returned content"""
+
+ ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug,
+ 'project_type': 'qdjango', 'project_id': self.qdjango_project.id})
+
+ c = Client()
+ if login:
+ self.assertTrue(c.login(username='admin01', password='admin01'))
+ response = c.get(ows_url, {
+ 'REQUEST': 'GetFeature',
+ 'SERVICE': 'WFS',
+ 'VERSION': '1.1.0',
+ 'TYPENAME': 'world'
+ })
+
+ is_rome = b"ROME" in response.content
+ # Now query another location to make sure the whole layer was not invalidated
+ assert b"BERLIN" in response.content
+
+ if login:
+ c.logout()
+
+ return is_rome
+
class SingleLayerSubsetStringConstraints(TestSingleLayerConstraintsBase):
"""Test single layer subset string constraints"""
@@ -172,6 +197,7 @@ def test_user_constraint(self):
"""Test model with user constraint"""
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
admin01 = self.test_user1
constraint = SingleLayerConstraint(layer=self.world, active=True)
@@ -196,6 +222,7 @@ def test_user_constraint(self):
admin01, self.world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
@@ -222,6 +249,7 @@ def test_user_constraint(self):
admin01, self.world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
@@ -252,12 +280,16 @@ def test_user_constraint(self):
# for OGC service only in v an ve context
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
def test_anonymoususer_constraint(self):
"""Test for anonymous user"""
# For AnonymousUser
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm('view_layer', get_anonymous_user(), l)
+
self.assertTrue(self._check_subset_string(login=False))
constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True)
@@ -267,11 +299,13 @@ def test_anonymoususer_constraint(self):
rule_anonymous.save()
self.assertFalse(self._check_subset_string(login=False))
+ self.assertFalse(self._check_wfs_getfeature(login=False))
def test_group_constraint(self):
"""Test model with group constraint"""
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
admin01 = self.test_user1
group1 = admin01.groups.all()[0]
@@ -298,6 +332,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
@skipIf(IS_QGIS_3_10, "In QGIS 3.10 setSubsetString() always returns True")
def test_validate_sql(self):
@@ -395,6 +430,7 @@ def test_user_constraint(self):
"""Test model with user constraint"""
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
admin01 = self.test_user1
world = self.world
@@ -420,6 +456,7 @@ def test_user_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
@@ -438,6 +475,7 @@ def test_user_constraint(self):
admin01, world.pk, context='e'), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
@@ -458,6 +496,7 @@ def test_user_constraint(self):
admin01, world.pk, context='e'), "(NAME != 'ITALY')")
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
@@ -468,7 +507,11 @@ def test_anonymoususer_constraint(self):
# For AnonymousUser
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm('view_layer', get_anonymous_user(), l)
+
self.assertTrue(self._check_subset_string(login=False))
+ self.assertTrue(self._check_wfs_getfeature(login=False))
constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True)
constraint_anonymous.save()
@@ -477,11 +520,13 @@ def test_anonymoususer_constraint(self):
rule_anonymous.save()
self.assertFalse(self._check_subset_string(login=False))
+ self.assertFalse(self._check_wfs_getfeature(login=False))
def test_group_constraint(self):
"""Test model with group constraint"""
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
admin01 = self.test_user1
group1 = admin01.groups.all()[0]
@@ -508,6 +553,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
# context view + editing ve
# =========================
@@ -529,6 +575,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
# context editing e
# =========================
@@ -554,6 +601,7 @@ def test_group_constraint(self):
# for OWS service only for context v and ve
self.assertTrue(self._check_subset_string())
+ self.assertTrue(self._check_wfs_getfeature())
def test_validate_sql(self):
@@ -768,6 +816,8 @@ def test_shp_api(self):
# =============================
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm('view_layer', get_anonymous_user(), l)
rule = ConstraintExpressionRule(
constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True)
@@ -982,6 +1032,8 @@ def test_xls_api(self):
# -----------------
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm('view_layer', get_anonymous_user(), l)
rule = ConstraintExpressionRule(
constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True)
@@ -1614,6 +1666,7 @@ def test_bbox_filter(self):
rule="intersects_bbox( $geometry, geom_from_wkt( 'POLYGON((8 51, 11 51, 11 52, 11 52, 8 51))') )")
rule.save()
self.assertFalse(self._check_subset_string())
+ self.assertFalse(self._check_wfs_getfeature())
rule.delete()
@@ -1773,12 +1826,12 @@ def test_geoconstraint_filter(self):
constraint.save()
# assign permissions
- assign_perm('view_project', self.test_viewer1, self.qdjango_project)
- assign_perm('view_project', self.test_viewer1_3, self.qdjango_project)
- assign_perm('view_project', self.test_gu_viewer1, self.qdjango_project)
-
# also to Anonymous user
- assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
+ for u in (self.test_viewer1, self.test_viewer1_3, self.test_gu_viewer1, get_anonymous_user()):
+ assign_perm('view_project', u, self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm("view_layer", u, l)
+
ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug,
'project_type': 'qdjango', 'project_id': self.qdjango_project.id})
diff --git a/g3w-admin/qdjango/tests/test_ows.py b/g3w-admin/qdjango/tests/test_ows.py
index a0071dd11..b3ee298ed 100644
--- a/g3w-admin/qdjango/tests/test_ows.py
+++ b/g3w-admin/qdjango/tests/test_ows.py
@@ -50,12 +50,19 @@ class OwsTest(QdjangoTestBase):
def setUpTestData(cls):
super().setUpTestData()
- cls.qdjango_project = Project(
- qgis_file=cls.project.qgisProjectFile,
- title='Test qdjango project',
- group=cls.project_group,
- )
- cls.qdjango_project.save()
+ #cls.qdjango_project = Project(
+ # qgis_file=cls.project.qgisProjectFile,
+ # title='Test qdjango project',
+ # group=cls.project_group,
+ #)
+ #cls.qdjango_project.save()
+
+ cls.project2 = QgisProject(cls.project.qgisProjectFile)
+ cls.project2.title = "Test qdjango project"
+ cls.project2.group = cls.project_group
+ cls.project2.save()
+
+ cls.qdjango_project = cls.project2.instance
qgis_project_file_widget = File(open('{}{}{}'.format(
CURRENT_PATH, TEST_BASE_PATH, QGS310_WIDGET_FILE), 'r'))
@@ -174,6 +181,8 @@ def test_authorizzer(self):
# give permission to user
assign_perm('view_project', self.test_viewer1, self.qdjango_project)
+ for l in self.qdjango_project.layer_set.all():
+ assign_perm("view_layer", self.test_viewer1, l)
response = c.get(ows_url, {
'REQUEST': 'GetCapabilities',
@@ -188,7 +197,7 @@ def test_authorizzer(self):
# try basic authentication
# for viewer1
c = Client(HTTP_AUTHORIZATION='Basic dmlld2VyMTp2aWV3ZXIx')
- esponse = c.get(ows_url, {
+ response = c.get(ows_url, {
'REQUEST': 'GetCapabilities',
'SERVICE': 'WMS'
})
@@ -196,6 +205,51 @@ def test_authorizzer(self):
self.assertEqual(response.status_code, 200)
self.assertTrue(b'bluemarble' in response.content)
+ # Filter layer by user
+ for l in self.qdjango_project.layer_set.filter(name__in=['bluemarble', 'world']):
+ remove_perm("view_layer", self.test_viewer1, l)
+
+ response = c.get(ows_url, {
+ "REQUEST": "GetCapabilities",
+ "SERVICE": "WMS"
+ })
+
+ self.assertEqual(response.status_code, 200)
+ self.assertFalse(b'bluemarble' in response.content)
+ self.assertFalse(b"world" in response.content)
+ self.assertTrue(b"spatialite_points" in response.content)
+
+ # For WFS
+ response = c.get(ows_url, {
+ "REQUEST": "GetCapabilities",
+ "SERVICE": "WFS",
+ "VERSION": "1.1.0",
+ "TYPENAME": "world"
+ })
+
+ self.assertEqual(response.status_code, 200)
+ self.assertFalse(b"world" in response.content)
+ self.assertTrue(b"spatialite_points" in response.content)
+
+ response = c.get(ows_url, {
+ "REQUEST": "GetCapabilities",
+ "SERVICE": "WFS"
+ })
+
+ self.assertEqual(response.status_code, 200)
+
+ for l in self.qdjango_project.layer_set.filter(name='world'):
+ assign_perm("view_layer", self.test_viewer1, l)
+
+ response = c.get(ows_url, {
+ "REQUEST": "GetCapabilities",
+ "SERVICE": "WFS"
+ })
+
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(b"world" in response.content)
+
+
def test_get_getfeatureinfo(self):
"""Test GetFeatureInfo for QGIS widget"""