From 752e824130bf53ddeb4395cd17c06de201210628 Mon Sep 17 00:00:00 2001 From: dlpzx <71252798+dlpzx@users.noreply.github.com> Date: Thu, 8 Feb 2024 16:47:27 +0100 Subject: [PATCH] Remove policies-updater ECS task (#1046) ### Feature or Bugfix - Bugfix ### Detail In data.all there are 7 ECS Tasks: Tasks currently being used: - cdkproxy -- on demand - it deploys CDK stacks in Environment accounts - share-manager -- on demand - it executes sharing actions - stacks-updater -- on schedule every 1day - Every day environment and dataset stacks are updated Tasks that need to be reviewed: - subscriptions -- on schedule every 15mins - it tries to poll message from subscriptions queue. The queue is empty and we are not posting any messages. We could consider subscriptions to be legacy at the moment. - catalog-indexer -- on schedule every 6hours - it reads all active items from RDS and indexes them in the Catalog. It does not look for deleted items. - tables-syncer -- on schedule every 15mins - it reads all active datasets. With boto3 it reads the Glue tables in that database and syncs the Glue tables with the registered tables in data.all. It upserts in OpenSearch and grant LF permissions. Tasks that currently are not used and need to be removed: - policies-updater -- on schedule every 15mins - it reapplies shares on imported buckets. It is legacy from folder sharing based on bucket policies. It uses the generic ecs-tasks-role In this PR the task policies-updater is removed Tested in AWS - [X] CICD pipeline succeeds - [X] ECS CFN stack is updated and deleted policies-updater task. It also deletes log-group. All other tasks remain untouched ### Relates - ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? - Do you use a standard proven implementations? - Are the used keys controlled by the customer? Where are they stored? - Are you introducing any new policies/roles/users? - Have you used the least-privilege principle? How? By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- .../datasets/tasks/bucket_policy_updater.py | 172 ------------------ deploy/stacks/container.py | 23 --- .../datasets/tasks/test_dataset_policies.py | 111 ----------- 3 files changed, 306 deletions(-) delete mode 100644 backend/dataall/modules/datasets/tasks/bucket_policy_updater.py delete mode 100644 tests/modules/datasets/tasks/test_dataset_policies.py diff --git a/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py b/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py deleted file mode 100644 index 0de0ad66a..000000000 --- a/backend/dataall/modules/datasets/tasks/bucket_policy_updater.py +++ /dev/null @@ -1,172 +0,0 @@ -import json -import logging -import os -import sys - -from sqlalchemy import and_ - -from dataall.base.db import get_engine -from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository -from dataall.modules.datasets.aws.s3_dataset_bucket_policy_client import S3DatasetBucketPolicyClient -from dataall.modules.datasets_base.db.dataset_models import Dataset - -root = logging.getLogger() -root.setLevel(logging.INFO) -if not root.hasHandlers(): - root.addHandler(logging.StreamHandler(sys.stdout)) -log = logging.getLogger(__name__) - - -class BucketPoliciesUpdater: - def __init__(self, engine, event=None): - self.engine = engine - self.event = event - self.reports = [] - - def sync_imported_datasets_bucket_policies(self): - with self.engine.scoped_session() as session: - imported_datasets = ( - session.query(Dataset) - .filter( - and_( - Dataset.imported, - Dataset.deleted.is_(None), - ) - ) - .all() - ) - log.info(f'Found {len(imported_datasets)} imported datasets') - - for dataset in imported_datasets: - account_prefixes = {} - - shared_tables = ShareObjectRepository.get_shared_tables(session, dataset) - log.info( - f'Found {len(shared_tables)} shared tables with dataset {dataset.S3BucketName}' - ) - - shared_folders = ShareObjectRepository.get_shared_folders(session, dataset) - log.info( - f'Found {len(shared_folders)} shared folders with dataset {dataset.S3BucketName}' - ) - - for table in shared_tables: - data_prefix = self.clear_table_location_from_delta_path(table) - prefix = data_prefix.rstrip('/') + '/*' - accountid = table.TargetAwsAccountId - - prefix = f"arn:aws:s3:::{prefix.split('s3://')[1]}" - self.group_prefixes_by_accountid( - accountid, prefix, account_prefixes - ) - - bucket = ( - f"arn:aws:s3:::{prefix.split('arn:aws:s3:::')[1].split('/')[0]}" - ) - self.group_prefixes_by_accountid( - accountid, bucket, account_prefixes - ) - - for folder in shared_folders: - prefix = f'arn:aws:s3:::{folder.S3Prefix}' + '/*' - accountid = folder.AwsAccountId - self.group_prefixes_by_accountid( - accountid, prefix, account_prefixes - ) - bucket = ( - f"arn:aws:s3:::{prefix.split('arn:aws:s3:::')[1].split('/')[0]}" - ) - self.group_prefixes_by_accountid( - accountid, bucket, account_prefixes - ) - - client = S3DatasetBucketPolicyClient(dataset) - - policy = client.get_bucket_policy() - - BucketPoliciesUpdater.update_policy(account_prefixes, policy) - - report = client.put_bucket_policy(policy) - - self.reports.append(report) - - if any(r['status'] == 'FAILED' for r in self.reports): - raise Exception( - 'Failed to update one or more bucket policies' - f'Check the reports: {self.reports}' - ) - return self.reports - - @staticmethod - def clear_table_location_from_delta_path(table): - data_prefix = ( - table.S3Prefix - if '/packages.delta' not in table.S3Prefix - else table.S3Prefix.replace('/packages.delta', '') - ) - data_prefix = ( - data_prefix - if '/_symlink_format_manifest' not in data_prefix - else data_prefix.replace('/_symlink_format_manifest', '') - ) - return data_prefix - - @staticmethod - def update_policy(account_prefixes, policy): - log.info('Updating Policy') - statements = policy['Statement'] - for key, value in account_prefixes.items(): - added = False - for s in statements: - if key in s.get('Principal').get('AWS') and 'DA' in s.get('Sid'): - log.info(f'Principal already on the policy {key}') - added = True - for v in value: - if v not in s.get('Resource'): - existing_resources = ( - list(s.get('Resource')) - if not isinstance(s.get('Resource'), list) - else s.get('Resource') - ) - existing_resources.append(v) - s['Resource'] = existing_resources - break - if not added: - log.info( - f'Principal {key} with permissions {value} ' - f'Not on the policy adding it' - ) - statements.append( - { - 'Sid': f'DA{key}', - 'Effect': 'Allow', - 'Action': ['s3:Get*', 's3:List*'], - 'Resource': value - if isinstance(value, list) and len(value) > 1 - else value, - 'Principal': {'AWS': key}, - } - ) - policy.update({'Statement': statements}) - log.info(f'Final Policy --> {policy}') - return policy - - @classmethod - def group_prefixes_by_accountid(cls, accountid, prefix, account_prefixes): - if account_prefixes.get(accountid): - prefixes = account_prefixes[accountid] - if prefix not in prefixes: - prefixes.append(prefix) - account_prefixes[accountid] = prefixes - else: - account_prefixes[accountid] = [prefix] - return account_prefixes - - -if __name__ == '__main__': - ENVNAME = os.environ.get('envname', 'local') - ENGINE = get_engine(envname=ENVNAME) - log.info('Updating bucket policies for shared datasets...') - service = BucketPoliciesUpdater(engine=ENGINE) - service.sync_imported_datasets_bucket_policies() - log.info('Bucket policies for shared datasets update successfully...') diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index 1c0c6a85e..277c1d08b 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -205,7 +205,6 @@ def __init__( self.add_catalog_indexer_task() self.add_sync_dataset_table_task() - self.add_bucket_policy_updater_task() self.add_subscription_task() self.add_share_management_task() @@ -302,28 +301,6 @@ def add_subscription_task(self): ) self.ecs_task_definitions_families.append(subscriptions_task.task_definition.family) - @run_if(["modules.datasets.active"]) - def add_bucket_policy_updater_task(self): - update_bucket_policies_task, update_bucket_task_def = self.set_scheduled_task( - cluster=self.ecs_cluster, - command=['python3.9', '-m', 'dataall.modules.datasets.tasks.bucket_policy_updater'], - container_id=f'container', - ecr_repository=self._ecr_repository, - environment=self._create_env('DEBUG'), - image_tag=self._cdkproxy_image_tag, - log_group=self.create_log_group( - self._envname, self._resource_prefix, log_group_name='policies-updater' - ), - schedule_expression=Schedule.expression('rate(15 minutes)'), - scheduled_task_id=f'{self._resource_prefix}-{self._envname}-policies-updater-schedule', - task_id=f'{self._resource_prefix}-{self._envname}-policies-updater', - task_role=self.task_role, - vpc=self._vpc, - security_group=self.scheduled_tasks_sg, - prod_sizing=self._prod_sizing, - ) - self.ecs_task_definitions_families.append(update_bucket_policies_task.task_definition.family) - @run_if(["modules.datasets.active"]) def add_sync_dataset_table_task(self): sync_tables_task, sync_tables_task_def = self.set_scheduled_task( diff --git a/tests/modules/datasets/tasks/test_dataset_policies.py b/tests/modules/datasets/tasks/test_dataset_policies.py deleted file mode 100644 index 83f36b2c0..000000000 --- a/tests/modules/datasets/tasks/test_dataset_policies.py +++ /dev/null @@ -1,111 +0,0 @@ -from unittest.mock import MagicMock - -from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset -from dataall.modules.datasets.tasks.bucket_policy_updater import BucketPoliciesUpdater -import pytest - - -@pytest.fixture(scope='module', autouse=True) -def sync_dataset(org_fixture, env_fixture, db): - with db.scoped_session() as session: - dataset = Dataset( - organizationUri=org_fixture.organizationUri, - environmentUri=env_fixture.environmentUri, - label='label', - owner='foo', - SamlAdminGroupName='foo', - businessOwnerDelegationEmails=['foo@amazon.com'], - businessOwnerEmail=['bar@amazon.com'], - name='name', - S3BucketName='S3BucketName', - GlueDatabaseName='GlueDatabaseName', - KmsAlias='kmsalias', - AwsAccountId='123456789012', - region='eu-west-1', - IAMDatasetAdminUserArn=f'arn:aws:iam::123456789012:user/dataset', - IAMDatasetAdminRoleArn=f'arn:aws:iam::123456789012:role/dataset', - imported=True, - ) - session.add(dataset) - yield dataset - - -@pytest.fixture(scope='module', autouse=True) -def table(org, env, db, sync_dataset): - with db.scoped_session() as session: - table = DatasetTable( - datasetUri=sync_dataset.datasetUri, - AWSAccountId='12345678901', - S3Prefix='S3prefix', - label='label', - owner='foo', - name='name', - GlueTableName='table1', - S3BucketName='S3BucketName', - GlueDatabaseName='GlueDatabaseName', - region='eu-west-1', - ) - session.add(table) - yield table - - -def test_prefix_delta(): - s = 's3://insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2/ship_plan.delta/_symlink_format_manifest/*' - delta_path = s.split('/_symlink_format_manifest')[0].split('/')[-1] - prefix = s.split(f'/{delta_path}')[0] - assert ( - prefix - == 's3://insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2' - ) - prefix = 'arn:aws:s3:::insite-data-lake-core-alpha-eu-west-1/forecast/ship_plan/insite_version=0.1/insite_region_id=2' - bucket = prefix.split('arn:aws:s3:::')[1].split('/')[0] - assert bucket == 'insite-data-lake-core-alpha-eu-west-1' - - -def test_group_prefixes_by_accountid(db, mocker): - statements = {} - updater = BucketPoliciesUpdater(db) - updater.group_prefixes_by_accountid('675534', 'prefix1', statements) - updater.group_prefixes_by_accountid('675534', 'prefix2', statements) - updater.group_prefixes_by_accountid('675534', 'prefix3', statements) - updater.group_prefixes_by_accountid('675534', 'prefix3', statements) - updater.group_prefixes_by_accountid('3455', 'prefix4', statements) - assert len(set(statements['675534'])) == 3 - policy = { - 'Version': '2012-10-17', - 'Statement': [ - { - 'Sid': f'OwnerAccount', - 'Effect': 'Allow', - 'Action': ['s3:*'], - 'Resource': [ - f'arn:aws:s3:::', - f'arn:aws:s3:::', - ], - 'Principal': {'AWS': f'arn:aws:iam::root'}, - }, - { - 'Sid': f'DH675534', - 'Effect': 'Allow', - 'Action': ['s3:*'], - 'Resource': [ - f'prefix3', - f'prefix2', - ], - 'Principal': {'AWS': '675534'}, - }, - ] - } - BucketPoliciesUpdater.update_policy(statements, policy) - assert policy - - -def test_handler(org, env, db, sync_dataset, mocker): - s3_client = MagicMock() - mocker.patch('dataall.modules.datasets.tasks.bucket_policy_updater.S3DatasetBucketPolicyClient', s3_client) - s3_client().get_bucket_policy.return_value = {'Version': '2012-10-17', 'Statement': []} - s3_client().put_bucket_policy.return_value = {'status': 'SUCCEEDED'} - - updater = BucketPoliciesUpdater(db) - assert len(updater.sync_imported_datasets_bucket_policies()) == 1 - assert updater.sync_imported_datasets_bucket_policies()[0]['status'] == 'SUCCEEDED'