Skip to content

Commit

Permalink
Improved testing for mandatoryFilterSet
Browse files Browse the repository at this point in the history
* Test what happens when a scope is missing
* Fix BRP_R profile -> BRP_RNAME so these auth/profile roles don't overlap.
* Added brp_schema fixture, change how brp_dataset fixture is constructed.
  • Loading branch information
vdboor committed Aug 1, 2024
1 parent f2579bf commit 70cd027
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 85 deletions.
5 changes: 4 additions & 1 deletion src/schematools/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def name_from_schema(cls, schema: DatasetSchema) -> str:
return name

@classmethod
def create_for_schema(cls, schema: DatasetSchema, path: str | None = None) -> Dataset:
def create_for_schema(
cls, schema: DatasetSchema, path: str | None = None, enable_db: bool = True
) -> Dataset:
"""Create the schema based on the Amsterdam Schema JSON input"""
name = cls.name_from_schema(schema)
if path is None:
Expand All @@ -225,6 +227,7 @@ def create_for_schema(cls, schema: DatasetSchema, path: str | None = None) -> Da
version=schema.version,
is_default_version=schema.is_default_version,
enable_api=cls.has_api_enabled(schema),
enable_db=enable_db,
)
obj._dataset_collection = schema.loader # retain collection on saving
obj.save()
Expand Down
10 changes: 8 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ def brk2_simple_schema(schema_loader) -> DatasetSchema:
return schema_loader.get_dataset_from_file("brk2_simple.json")


@pytest.fixture
def brp_schema(schema_loader) -> DatasetSchema:
"""Fixture for the BRP dataset."""
return schema_loader.get_dataset_from_file("brp.json")


@pytest.fixture
def composite_key_schema(schema_loader) -> ProfileSchema:
return schema_loader.get_dataset_from_file("composite_key.json")
Expand Down Expand Up @@ -391,9 +397,9 @@ def profile_loader(here) -> FileSystemProfileLoader:


@pytest.fixture
def brp_r_profile_schema(profile_loader) -> ProfileSchema:
def brp_rname_profile_schema(profile_loader) -> ProfileSchema:
"""A downloaded profile schema definition"""
return profile_loader.get_profile("BRP_R")
return profile_loader.get_profile("BRP_RNAME")


@pytest.fixture
Expand Down
23 changes: 6 additions & 17 deletions tests/django/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations
"""Extra fixtures for Django-based tests."""

import json
from pathlib import Path
from typing import Any
from __future__ import annotations

import pytest

Expand Down Expand Up @@ -45,24 +43,15 @@ def kadastraleobjecten_dataset(kadastraleobjecten_schema: DatasetSchema) -> Data


@pytest.fixture
def brp_r_profile(brp_r_profile_schema: ProfileSchema) -> Profile:
def brp_rname_profile(brp_rname_profile_schema: ProfileSchema) -> Profile:
"""The persistent database profile object based on a downlaoded schema definition."""
return Profile.create_for_schema(brp_r_profile_schema)


@pytest.fixture
def brp_schema_json(here: Path) -> Any:
"""Fixture for the BRP dataset."""
path = here / "files/brp.json"
return json.loads(path.read_text())
return Profile.create_for_schema(brp_rname_profile_schema)


@pytest.fixture
def brp_dataset(brp_schema_json: dict) -> Dataset:
def brp_dataset(brp_schema: DatasetSchema) -> Dataset:
"""Create a remote dataset."""
return Dataset.objects.create(
name="brp", schema_data=brp_schema_json, enable_db=False, path="brp"
)
return Dataset.create_for_schema(brp_schema, path="brp", enable_db=False)


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions tests/django/test_models_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@


@pytest.mark.django_db
def test_profile(brp_r_profile_schema):
def test_profile(brp_rname_profile_schema):
"""Prove that the BRP data is properly stored in the DB"""
brp_r_profile = Profile.create_for_schema(brp_r_profile_schema)
brp_r_profile = Profile.create_for_schema(brp_rname_profile_schema)
assert brp_r_profile.name == "brp_medewerker"
assert brp_r_profile.get_scopes() == {"BRP/R"}
assert brp_r_profile.get_scopes() == {"BRP/RNAME"}

perm = Permission(PermissionLevel.READ)
assert brp_r_profile.schema.datasets["brp"].tables["ingeschrevenpersonen"].permissions == perm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "brp_medewerker",
"scopes": ["BRP/R"],
"scopes": ["BRP/RNAME"],
"datasets": {
"brp": {
"tables": {
Expand Down
169 changes: 111 additions & 58 deletions tests/test_permissions_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,117 @@
from schematools.types import PermissionLevel


def _active_profiles(
user_scopes: UserScopes, dataset_id: str, table_id: str | None = None
) -> set[str]:
"""Shorthand to get active profile names"""
if table_id is not None:
return {
p.dataset.profile.name
for p in user_scopes.get_active_profile_tables(dataset_id, table_id)
}
else:
return {p.profile.name for p in user_scopes.get_active_profile_datasets(dataset_id)}


def test_profile_scopes(profile_verkeer_medewerker_schema, profile_brk_encoded_schema):
"""Prove that profiles are only applied for the correct scopes."""
user_scopes = UserScopes(
query_params={},
request_scopes=["FP/MD"],
all_profiles=[profile_verkeer_medewerker_schema, profile_brk_encoded_schema],
)

assert _active_profiles(user_scopes, "brk") == set()
assert _active_profiles(user_scopes, "verkeer") == {"verkeer_medewerker"}


def test_mandatory_filters(brp_r_profile_schema):
"""Prove that having a scope + mandatory filters actives a profile."""
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/R"],
all_profiles=[brp_r_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == {"brp_medewerker"}

# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/R"],
all_profiles=[brp_r_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()
def _active_profiles(user_scopes: UserScopes, dataset_id: str, table_id: str) -> set[str]:
"""Tell which profiles are active for a table."""
return {
p.dataset.profile.name for p in user_scopes.get_active_profile_tables(dataset_id, table_id)
}


def _active_profiles_by_dataset(user_scopes: UserScopes, dataset_id: str) -> set[str]:
"""Tell which profiles are active for a dataset."""
return {p.profile.name for p in user_scopes.get_active_profile_datasets(dataset_id)}


class TestProfileActivation:
def test_profile_scopes(self, profile_verkeer_medewerker_schema, profile_brk_encoded_schema):
"""Prove that profiles are only applied for the correct scopes."""
user_scopes = UserScopes(
query_params={},
request_scopes=["FP/MD"],
all_profiles=[profile_verkeer_medewerker_schema, profile_brk_encoded_schema],
)

assert _active_profiles_by_dataset(user_scopes, "brk") == set()
assert _active_profiles_by_dataset(user_scopes, "verkeer") == {"verkeer_medewerker"}

def test_mandatory_filter_match(self, brp_rname_profile_schema):
"""Prove that having a scope + mandatory filters actives a profile."""
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == {"brp_medewerker"}

def test_mandatory_filter_operators(self, brp_rname_profile_schema):
"""Prove that operators can't be used to circumvent list-filter limitations."""
user_scopes = UserScopes(
query_params={
"postcode[like]": "*",
"lastname[like]": "*",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()

def test_mandatory_filter_missing_scope(self, brp_rname_profile_schema):
# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/R"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()

def test_mandatory_filter_missing_query(self, brp_rname_profile_schema):
# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()


class TestTableAccess:

def test_has_table_fields_access(self, id_auth_schema):
"""Prove that a table with one protected field cannot be accessed with OPENBAAR scope."""

user_scopes = UserScopes(
{},
request_scopes=["OPENBAAR"],
)
table = id_auth_schema.get_table_by_id("base")
assert not user_scopes.has_table_fields_access(table)

def test_table_access_via_mandatory_filters(self, brp_schema, brp_rname_profile_schema):
"""Prove that table access can be granted if the query_params matches a profile."""
table = brp_schema.get_table_by_id("ingeschrevenpersonen")

user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)
assert not user_scopes.has_table_access(table)

# Create the object again, as it caches many results
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)
assert user_scopes.has_table_access(table)


class TestFieldAccess:
Expand Down Expand Up @@ -155,16 +218,6 @@ def test_profile_field_inheritance_from_dataset(
assert user_scopes.has_field_access(table.get_field_by_id("identificatie")) == expect
assert user_scopes.has_field_access(table.get_field_by_id("registratiedatum")) == expect

def test_has_table_fields_access(self, id_auth_schema):
"""Prove that a table with one protected field cannot be accessed with OPENBAAR scope."""

user_scopes = UserScopes(
{},
request_scopes=["OPENBAAR"],
)
table = id_auth_schema.get_table_by_id("base")
assert not user_scopes.has_table_fields_access(table)

def test_subfields_have_protection(self, subfield_auth_schema):
"""Prove that the subfields of a protected field are also protected."""

Expand Down
6 changes: 3 additions & 3 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def test_datasetschema_from_file_not_a_dataset(schema_loader) -> None:
schema_loader.get_dataset_from_file("not_a_json_file.txt")


def test_profile_schema(brp_r_profile_schema: ProfileSchema) -> None:
def test_profile_schema(brp_rname_profile_schema: ProfileSchema) -> None:
"""Prove that the profile files are properly read,
and have their fields access the JSON data.
"""
assert brp_r_profile_schema.scopes == {"BRP/R"}
assert brp_rname_profile_schema.scopes == {"BRP/RNAME"}

brp = brp_r_profile_schema.datasets["brp"]
brp = brp_rname_profile_schema.datasets["brp"]
table = brp.tables["ingeschrevenpersonen"]

assert table.permissions.level is PermissionLevel.READ
Expand Down

0 comments on commit 70cd027

Please sign in to comment.