diff --git a/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py b/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py
deleted file mode 100644
index 0de0ad66a..000000000
--- a/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import json
-import logging
-import os
-import sys
-
-from sqlalchemy import and_
-
-from dataall.base.db import get_engine
-from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository
-from dataall.modules.datasets.aws.s3_dataset_bucket_policy_client import S3DatasetBucketPolicyClient
-from dataall.modules.datasets_base.db.dataset_models import Dataset
-
-root = logging.getLogger()
-root.setLevel(logging.INFO)
-if not root.hasHandlers():
-    root.addHandler(logging.StreamHandler(sys.stdout))
-log = logging.getLogger(__name__)
-
-
-class BucketPoliciesUpdater:
-    def __init__(self, engine, event=None):
-        self.engine = engine
-        self.event = event
-        self.reports = []
-
-    def sync_imported_datasets_bucket_policies(self):
-        with self.engine.scoped_session() as session:
-            imported_datasets = (
-                session.query(Dataset)
-                .filter(
-                    and_(
-                        Dataset.imported,
-                        Dataset.deleted.is_(None),
-                    )
-                )
-                .all()
-            )
-            log.info(f'Found {len(imported_datasets)} imported datasets')
-
-            for dataset in imported_datasets:
-                account_prefixes = {}
-
-                shared_tables = ShareObjectRepository.get_shared_tables(session, dataset)
-                log.info(
-                    f'Found {len(shared_tables)} shared tables with dataset {dataset.S3BucketName}'
-                )
-
-                shared_folders = ShareObjectRepository.get_shared_folders(session, dataset)
-                log.info(
-                    f'Found {len(shared_folders)} shared folders with dataset {dataset.S3BucketName}'
-                )
-
-                for table in shared_tables:
-                    data_prefix = self.clear_table_location_from_delta_path(table)
-                    prefix = data_prefix.rstrip('/') + '/*'
-                    accountid = table.TargetAwsAccountId
-
-                    prefix = f"arn:aws:s3:::{prefix.split('s3://')[1]}"
-                    self.group_prefixes_by_accountid(
-                        accountid, prefix, account_prefixes
-                    )
-
-                    bucket = (
-                        f"arn:aws:s3:::{prefix.split('arn:aws:s3:::')[1].split('/')[0]}"
-                    )
-                    self.group_prefixes_by_accountid(
-                        accountid, bucket, account_prefixes
-                    )
-
-                for folder in shared_folders:
-                    prefix = f'arn:aws:s3:::{folder.S3Prefix}' + '/*'
-                    accountid = folder.AwsAccountId
-                    self.group_prefixes_by_accountid(
-                        accountid, prefix, account_prefixes
-                    )
-                    bucket = (
-                        f"arn:aws:s3:::{prefix.split('arn:aws:s3:::')[1].split('/')[0]}"
-                    )
-                    self.group_prefixes_by_accountid(
-                        accountid, bucket, account_prefixes
-                    )
-
-                client = S3DatasetBucketPolicyClient(dataset)
-
-                policy = client.get_bucket_policy()
-
-                BucketPoliciesUpdater.update_policy(account_prefixes, policy)
-
-                report = client.put_bucket_policy(policy)
-
-                self.reports.append(report)
-
-            if any(r['status'] == 'FAILED' for r in self.reports):
-                raise Exception(
-                    'Failed to update one or more bucket policies'
-                    f'Check the reports: {self.reports}'
-                )
-            return self.reports
-
-    @staticmethod
-    def clear_table_location_from_delta_path(table):
-        data_prefix = (
-            table.S3Prefix
-            if '/packages.delta' not in table.S3Prefix
-            else table.S3Prefix.replace('/packages.delta', '')
-        )
-        data_prefix = (
-            data_prefix
-            if '/_symlink_format_manifest' not in data_prefix
-            else data_prefix.replace('/_symlink_format_manifest', '')
-        )
-        return data_prefix
-
-    @staticmethod
-    def update_policy(account_prefixes, policy):
-        log.info('Updating Policy')
-        statements = policy['Statement']
-        for key, value in account_prefixes.items():
-            added = False
-            for s in statements:
-                if key in s.get('Principal').get('AWS') and 'DA' in s.get('Sid'):
-                    log.info(f'Principal already on the policy {key}')
-                    added = True
-                    for v in value:
-                        if v not in s.get('Resource'):
-                            existing_resources = (
-                                list(s.get('Resource'))
-                                if not isinstance(s.get('Resource'), list)
-                                else s.get('Resource')
-                            )
-                            existing_resources.append(v)
-                            s['Resource'] = existing_resources
-                    break
-            if not added:
-                log.info(
-                    f'Principal {key} with permissions {value} '
-                    f'Not on the policy adding it'
-                )
-                statements.append(
-                    {
-                        'Sid': f'DA{key}',
-                        'Effect': 'Allow',
-                        'Action': ['s3:Get*', 's3:List*'],
-                        'Resource': value
-                        if isinstance(value, list) and len(value) > 1
-                        else value,
-                        'Principal': {'AWS': key},
-                    }
-                )
-        policy.update({'Statement': statements})
-        log.info(f'Final Policy --> {policy}')
-        return policy
-
-    @classmethod
-    def group_prefixes_by_accountid(cls, accountid, prefix, account_prefixes):
-        if account_prefixes.get(accountid):
-            prefixes = account_prefixes[accountid]
-            if prefix not in prefixes:
-                prefixes.append(prefix)
-            account_prefixes[accountid] = prefixes
-        else:
-            account_prefixes[accountid] = [prefix]
-        return account_prefixes
-
-
-if __name__ == '__main__':
-    ENVNAME = os.environ.get('envname', 'local')
-    ENGINE = get_engine(envname=ENVNAME)
-    log.info('Updating bucket policies for shared datasets...')
-    service = BucketPoliciesUpdater(engine=ENGINE)
-    service.sync_imported_datasets_bucket_policies()
-    log.info('Bucket policies for shared datasets update successfully...')
diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py
index 1c0c6a85e..277c1d08b 100644
--- a/deploy/stacks/container.py
+++ b/deploy/stacks/container.py
@@ -205,7 +205,6 @@ def __init__(
 
         self.add_catalog_indexer_task()
         self.add_sync_dataset_table_task()
-        self.add_bucket_policy_updater_task()
         self.add_subscription_task()
         self.add_share_management_task()
 
@@ -302,28 +301,6 @@ def add_subscription_task(self):
         )
         self.ecs_task_definitions_families.append(subscriptions_task.task_definition.family)
 
-    @run_if(["modules.datasets.active"])
-    def add_bucket_policy_updater_task(self):
-        update_bucket_policies_task, update_bucket_task_def = self.set_scheduled_task(
-            cluster=self.ecs_cluster,
-            command=['python3.9', '-m', 'dataall.modules.datasets.tasks.bucket_policy_updater'],
-            container_id=f'container',
-            ecr_repository=self._ecr_repository,
-            environment=self._create_env('DEBUG'),
-            image_tag=self._cdkproxy_image_tag,
-            log_group=self.create_log_group(
-                self._envname, self._resource_prefix, log_group_name='policies-updater'
-            ),
-            schedule_expression=Schedule.expression('rate(15 minutes)'),
-            scheduled_task_id=f'{self._resource_prefix}-{self._envname}-policies-updater-schedule',
-            task_id=f'{self._resource_prefix}-{self._envname}-policies-updater',
-            task_role=self.task_role,
-            vpc=self._vpc,
-            security_group=self.scheduled_tasks_sg,
-            prod_sizing=self._prod_sizing,
-        )
-        self.ecs_task_definitions_families.append(update_bucket_policies_task.task_definition.family)
-
     @run_if(["modules.datasets.active"])
     def add_sync_dataset_table_task(self):
         sync_tables_task, sync_tables_task_def = self.set_scheduled_task(
diff --git a/tests/modules/datasets/tasks/test_dataset_policies.py b/tests/modules/datasets/tasks/test_dataset_policies.py
deleted file mode 100644
index 83f36b2c0..000000000
--- a/tests/modules/datasets/tasks/test_dataset_policies.py
+++ /dev/null
@@ -1,111 +0,0 @@
-from unittest.mock import MagicMock
-
-from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset
-from dataall.modules.datasets.tasks.bucket_policy_updater import BucketPoliciesUpdater
-import pytest
-
-
-@pytest.fixture(scope='module', autouse=True)
-def sync_dataset(org_fixture, env_fixture, db):
-    with db.scoped_session() as session:
-        dataset = Dataset(
-            organizationUri=org_fixture.organizationUri,
-            environmentUri=env_fixture.environmentUri,
-            label='label',
-            owner='foo',
-            SamlAdminGroupName='foo',
-            businessOwnerDelegationEmails=['foo@amazon.com'],
-            businessOwnerEmail=['bar@amazon.com'],
-            name='name',
-            S3BucketName='S3BucketName',
-            GlueDatabaseName='GlueDatabaseName',
-            KmsAlias='kmsalias',
-            AwsAccountId='123456789012',
-            region='eu-west-1',
-            IAMDatasetAdminUserArn=f'arn:aws:iam::123456789012:user/dataset',
-            IAMDatasetAdminRoleArn=f'arn:aws:iam::123456789012:role/dataset',
-            imported=True,
-        )
-        session.add(dataset)
-    yield dataset
-
-
-@pytest.fixture(scope='module', autouse=True)
-def table(org, env, db, sync_dataset):
-    with db.scoped_session() as session:
-        table = DatasetTable(
-            datasetUri=sync_dataset.datasetUri,
-            AWSAccountId='12345678901',
-            S3Prefix='S3prefix',
-            label='label',
-            owner='foo',
-            name='name',
-            GlueTableName='table1',
-            S3BucketName='S3BucketName',
-            GlueDatabaseName='GlueDatabaseName',
-            region='eu-west-1',
-        )
-        session.add(table)
-    yield table
-
-
-def test_prefix_delta():
-    s = 's3://insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2/ship_plan.delta/_symlink_format_manifest/*'
-    delta_path = s.split('/_symlink_format_manifest')[0].split('/')[-1]
-    prefix = s.split(f'/{delta_path}')[0]
-    assert (
-        prefix
-        == 's3://insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2'
-    )
-    prefix = 'arn:aws:s3:::insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2'
-    bucket = prefix.split('arn:aws:s3:::')[1].split('/')[0]
-    assert bucket == 'insite-data-lake-core-alpha-eu-west-1'
-
-
-def test_group_prefixes_by_accountid(db, mocker):
-    statements = {}
-    updater = BucketPoliciesUpdater(db)
-    updater.group_prefixes_by_accountid('675534', 'prefix1', statements)
-    updater.group_prefixes_by_accountid('675534', 'prefix2', statements)
-    updater.group_prefixes_by_accountid('675534', 'prefix3', statements)
-    updater.group_prefixes_by_accountid('675534', 'prefix3', statements)
-    updater.group_prefixes_by_accountid('3455', 'prefix4', statements)
-    assert len(set(statements['675534'])) == 3
-    policy = {
-        'Version': '2012-10-17',
-        'Statement': [
-            {
-                'Sid': f'OwnerAccount',
-                'Effect': 'Allow',
-                'Action': ['s3:*'],
-                'Resource': [
-                    f'arn:aws:s3:::',
-                    f'arn:aws:s3:::',
-                ],
-                'Principal': {'AWS': f'arn:aws:iam::root'},
-            },
-            {
-                'Sid': f'DH675534',
-                'Effect': 'Allow',
-                'Action': ['s3:*'],
-                'Resource': [
-                    f'prefix3',
-                    f'prefix2',
-                ],
-                'Principal': {'AWS': '675534'},
-            },
-        ]
-    }
-    BucketPoliciesUpdater.update_policy(statements, policy)
-    assert policy
-
-
-def test_handler(org, env, db, sync_dataset, mocker):
-    s3_client = MagicMock()
-    mocker.patch('dataall.modules.datasets.tasks.bucket_policy_updater.S3DatasetBucketPolicyClient', s3_client)
-    s3_client().get_bucket_policy.return_value = {'Version': '2012-10-17', 'Statement': []}
-    s3_client().put_bucket_policy.return_value = {'status': 'SUCCEEDED'}
-
-    updater = BucketPoliciesUpdater(db)
-    assert len(updater.sync_imported_datasets_bucket_policies()) == 1
-    assert updater.sync_imported_datasets_bucket_policies()[0]['status'] == 'SUCCEEDED'
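
For reference, a minimal sketch of the behaviour this change removes, distilled from the deleted `bucket_policy_updater.py` above; it is not part of the diff, and the account ID and bucket ARNs are placeholder values. The scheduled task grouped the shared S3 prefixes of each imported dataset per consumer account, then appended a read-only `DA<account>` statement to the imported bucket's policy for any account not already present.

```python
# Illustrative sketch only, based on the deleted module above.
# Account ID and ARNs below are placeholders.

def group_prefixes_by_accountid(accountid, prefix, account_prefixes):
    """Collect unique S3 ARNs (object prefixes and buckets) per consumer account."""
    account_prefixes.setdefault(accountid, [])
    if prefix not in account_prefixes[accountid]:
        account_prefixes[accountid].append(prefix)
    return account_prefixes


def build_statement(accountid, resources):
    """Shape of the read-only statement the updater appended for a new consumer account."""
    return {
        'Sid': f'DA{accountid}',
        'Effect': 'Allow',
        'Action': ['s3:Get*', 's3:List*'],
        'Resource': resources,
        'Principal': {'AWS': accountid},
    }


if __name__ == '__main__':
    prefixes = {}
    group_prefixes_by_accountid('111122223333', 'arn:aws:s3:::example-bucket/shared/folder/*', prefixes)
    group_prefixes_by_accountid('111122223333', 'arn:aws:s3:::example-bucket', prefixes)
    print(build_statement('111122223333', prefixes['111122223333']))
```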