Add Dataset integration tests - S3 Share requests #1389
Conversation
…+ aws clients for dataset
The naming does not match any of the other tests; we could call them something closer to the other modules, e.g. `test_shares` and `test_shares_backwards_compatibility`.
```python
def test_create_share_object(share1):
    assert_that(share1.status).is_equal_to(ShareObjectStatus.Draft.value)
```
We could also add assertions on the principal of the share, and a new test of create_share_object for consumption roles that makes sure the groupUri and the principal are correct.
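For illustration, a minimal sketch of such assertions, assuming a hypothetical consumption-role share fixture and assuming the share object exposes a `principal` with `principalType` and `principalId` (fixture and field names are assumptions, not the verified API):
```python
# Sketch only: share_consrole_1, principalType and principalId are assumed names.
def test_create_share_object_consumption_role(share_consrole_1, consumption_role_1, group5):
    assert_that(share_consrole_1.status).is_equal_to(ShareObjectStatus.Draft.value)
    # the requesting group and the consumption-role principal should both be correct
    assert_that(share_consrole_1.groupUri).is_equal_to(group5)
    assert_that(share_consrole_1.principal.principalType).is_equal_to('ConsumptionRole')
    assert_that(share_consrole_1.principal.principalId).is_equal_to(consumption_role_1.consumptionRoleUri)
```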
```python
'ShareItemsFound', 'The request is empty'
)
```
Once the dataset PRs are merged we should add tests on getShareObject with the filters used to list shared items, making sure it lists the tables, folders and bucket by defining them explicitly instead of depending on a list of items.
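Something along these lines, purely as a sketch: `get_share_object` and the filter keys are hypothetical placeholders for whatever the real getShareObject call and item filter look like after the dataset PRs land.
```python
# Hypothetical helper and filter; only the shape of the assertions matters here.
def test_get_share_object_lists_expected_items(client5, share1):
    share = get_share_object(client5, share1.shareUri, filter={'pageSize': 100})
    item_types = [item.itemType for item in share['items'].nodes]
    # assert the exact item types we expect rather than relying on a pre-built list of items
    assert_that(item_types).contains(
        ShareableType.Table.name,
        ShareableType.S3Bucket.name,
        ShareableType.StorageLocation.name,
    )
```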
```python
assert_that(items).is_length(1)
assert_that(items[0].shareItemUri).is_equal_to(share_item_uri)
assert_that(items[0].status).is_equal_to(ShareItemStatus.PendingApproval.value)
```
My main concern is that we cannot test test_reject_share without running test_submit_object_no_auto_approval before. The tests are coupled, which in this case might be alright: we are testing that test_share_workflow succeeds in parts. I would just clearly indicate that in a comment and group them all together (all tests on share1 together and all tests on share3 together to show the path). @petrkalos wdyt?
```python
items = updated_share['items'].nodes

assert_that(updated_share.status).is_equal_to(ShareObjectStatus.Processed.value)
for item in items:
```
Concern: how do we ensure there are items and this is not an empty list? As a next step after the datasets PR, for the validation we would 1) run an S3 get-object against the bucket and access point shares, 2) run an Athena query on the Glue tables.
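To make the follow-up concrete, a rough boto3 sketch of that validation; the bucket, access point, key, database and table names are placeholders and would come from the share/dataset fixtures:
```python
import boto3

def validate_share_access(consumption_session, bucket_name, access_point_arn, key, database, table):
    # 1) the shared bucket and the access-point share should both serve the object
    s3 = consumption_session.client('s3')
    s3.get_object(Bucket=bucket_name, Key=key)
    s3.get_object(Bucket=access_point_arn, Key=key)  # access points accept the ARN as Bucket
    # 2) the shared Glue table should be queryable through Athena
    athena = consumption_session.client('athena')
    execution = athena.start_query_execution(
        QueryString=f'SELECT * FROM "{database}"."{table}" LIMIT 1',
        WorkGroup='primary',
    )
    return execution['QueryExecutionId']
```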
```python
assert_that(items).extracting('itemType').contains(ShareableType.Table.name)
assert_that(items).extracting('itemType').contains(ShareableType.S3Bucket.name)
assert_that(items).extracting('itemType').contains(ShareableType.StorageLocation.name)
```
Maybe `assert_that(items).extracting('itemType').contains('foo1', 'foo2', 'foo3')`? (docs)
```python
for item in items:
    assert_that(item.status).is_equal_to(ShareItemStatus.Revoke_Succeeded.value)
```
Maybe `assert_that(items).extracting('status').contains_only(ShareItemStatus.Revoke_Succeeded.value)`?
tests_new/clean_up_s3.sh
I'd guess you need that because the account exceeded the number of buckets. Here is a script I wrote that can be run as a Lambda and will clean up all the "orphan" buckets (buckets that do not belong to any CloudFormation stack). We can later extend this to other resources that are left behind, and maybe run it as part of the pytest teardown or as part of the pipeline.
P.S. One piece that is currently missing: if S3 buckets have access points, it will fail to delete them.
```python
import logging
import sys
from concurrent.futures.thread import ThreadPoolExecutor

import boto3
from botocore.exceptions import ClientError

logging.getLogger().setLevel(logging.INFO)
if not logging.getLogger().hasHandlers():
    logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
logger = logging.getLogger(__name__)

session = boto3.session.Session()
s3client = session.client('s3')
s3resource = session.resource('s3')


def is_dataall_bucket(bucket) -> bool:
    # identify buckets created by the integration tests via their tags
    try:
        tags = {tag['Key']: tag['Value'] for tag in bucket.Tagging().tag_set}
        return 'testUser' in tags.get('Creator', '') and tags.get('Environment', '').startswith('test')
    except ClientError:
        return False


def is_orphan_bucket(bucket):
    # a bucket is "orphan" if it does not belong to any CloudFormation stack
    region = s3client.get_bucket_location(Bucket=bucket.name)['LocationConstraint'] or 'us-east-1'
    cfnclient = session.client('cloudformation', region_name=region)
    try:
        return not cfnclient.describe_stack_resources(PhysicalResourceId=bucket.name)
    except ClientError as e:
        return 'does not exist' in e.response['Error']['Message']


def delete_bucket(bucket):
    # empty the bucket (including versions if versioning is enabled) before deleting it
    bucket_versioning = bucket.Versioning()
    if bucket_versioning.status == 'Enabled':
        bucket.object_versions.delete()
    else:
        bucket.objects.all().delete()
    bucket.delete()


def cleanup_bucket(bucket):
    try:
        logger.info(f'checking {bucket.name=}')
        if is_dataall_bucket(bucket) and is_orphan_bucket(bucket):
            logger.info(f'deleting {bucket.name}')
            delete_bucket(bucket)
    except Exception:
        logger.exception(f'something went wrong when deleting {bucket.name=}')


def run():
    with ThreadPoolExecutor(max_workers=8) as tpe:
        for _ in tpe.map(cleanup_bucket, s3resource.buckets.all()):
            ...


def lambda_handler(event, context):
    run()


if __name__ == '__main__':
    lambda_handler(None, None)
```
- For this deployment the `config.json` flag `cdk_pivot_role_multiple_environments_same_account` must be set to `true` if an AWS account is going to be reused for multiple environments
- The second test account is bootstrapped, and the first account is added to the trust policy in the target regions
The terms second and first account are a bit confusing here. We have 3 types of accounts:
- DevOps/Tooling account
- Service/Deployment account
- Environment accounts

All environment accounts must trust the service/deployment account, not the first account. Although the environment and service account can be the same account, we neither encourage this nor use it in our own pipeline.
Would you mind making it clearer in the doc?
```python
def run_query(self, query, workgroup='primary', output_location=None):
    if output_location:
        result = self._client.start_query_execution(
            QueryString=query, ResultConfiguration={'OutputLocation': output_location}
        )
    else:
        result = self._client.start_query_execution(QueryString=query, WorkGroup=workgroup)
    return result['QueryExecutionId']

def wait_for_query(self, query_id):
    for i in range(self.retries):
        result = self._client.get_query_execution(QueryExecutionId=query_id)
        state = result['QueryExecution']['Status']['State']
        if state not in ['QUEUED', 'RUNNING']:
            return state
        time.sleep(self.timeout)
```
nit: I'd make those 2 private and provide a higher level blocking method for queries. Perhaps you can use the boto3 waiter as well.
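A sketch of what that could look like (as far as I know boto3 ships no Athena waiter, so this simply wraps the two calls above, assuming they are renamed to `_run_query` and `_wait_for_query`):
```python
def run_query_blocking(self, query, workgroup='primary', output_location=None):
    # single public entry point: start the query, then poll until it leaves QUEUED/RUNNING
    query_id = self._run_query(query, workgroup, output_location)
    state = self._wait_for_query(query_id)
    if state != 'SUCCEEDED':
        raise RuntimeError(f'Athena query {query_id} finished in state {state}')
    return query_id
```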
"aws_profiles": { | ||
"second": "second_int_test_profile" | ||
}, |
I think the use of `aws_profiles` will not play very well with CodeBuild; instead I propose to use the existing infrastructure (see `session_env1_aws_client`).
As discussed offline, you need this account to test consumption roles. By using the integration test account you can solve it with one of the two following patterns:
- (simpler) add the already created (during env deployment) integration-test role directly as a consumption role. The current (CodeBuild/local) account already has permissions to assume this role, so you can use STS to assume it and then run S3 queries to make sure that the share was successful (see the sketch after this list).
- (more complex) use the integration-test role to create new roles in the target account that you will register as consumption roles, then proceed with assuming those roles and testing for S3 access.
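A rough sketch of the simpler pattern; the role name and session name below are placeholders, not values taken from the codebase:
```python
import boto3

def consumption_role_session(account_id, role_name='dataall-integration-tests-role', region='eu-west-1'):
    # assume the integration-test role that was registered as a consumption role
    sts = boto3.client('sts')
    credentials = sts.assume_role(
        RoleArn=f'arn:aws:iam::{account_id}:role/{role_name}',
        RoleSessionName='dataall-share-validation',
    )['Credentials']
    return boto3.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
        region_name=region,
    )
```
The returned session can then be handed to the S3/Athena checks to verify that the share actually grants access.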
I implemented it via AssumeRole.
```python
@pytest.mark.parametrize(
    'principal_type',
    ['Group', 'ConsumptionRole'],
)
def test_create_and_delete_share_object(
    client5, persistent_cross_acc_env_1, session_s3_dataset1, consumption_role_1, group5, principal_type
):
```
Instead of parametrizing the tests you can parametrize a fixture; all tests that use this fixture will then run as many times as there are fixture parameters.
For example, in this case you can do something like...
```python
@pytest.fixture(params=["Group", "ConsumptionRole"])
def principal1(request, group5, consumption_role_1):
    """
    :return: tuple with (principalUri, principalType)
    """
    if request.param == 'Group':
        yield (group5, request.param)
    else:
        yield (consumption_role_1.consumptionRoleUri, request.param)
```
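A test using that fixture would then drop the parametrize decorator and receive the tuple directly; pytest runs it once per parameter (sketch):
```python
def test_create_and_delete_share_object(client5, persistent_cross_acc_env_1, session_s3_dataset1, principal1):
    principal_uri, principal_type = principal1
    # body unchanged: create the share for principal_uri/principal_type, assert, then delete
    ...
```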
implemented
```python
@pytest.mark.parametrize(
    'share_fixture_name',
    ['session_share_1', 'session_share_consrole_1'],
)
```
Similar to my previous comment, do you think it's possible to use parametrized fixtures here as well, to avoid the getfixturevalue?
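One possible shape, a fixture parametrized over the two shares so the tests themselves never touch getfixturevalue (sketch based on the fixture names in the snippet above):
```python
@pytest.fixture(params=['Group', 'ConsumptionRole'])
def share1(request, session_share_1, session_share_consrole_1):
    # hand out the share matching the current parameter; both fixtures are resolved up front
    if request.param == 'Group':
        yield session_share_1
    else:
        yield session_share_consrole_1
```
The trade-off is that both share fixtures get created for every test; if that is too expensive, resolving them lazily inside the fixture (e.g. with request.getfixturevalue) is the alternative.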
implemented
```python
session = boto3.Session()
param_client = session.client('ssm', os.environ.get('AWS_REGION', 'us-east-1'))
parameter_path = f"/dataall/{os.environ.get('ENVNAME', 'dev')}/toolingAccount"
print(parameter_path)
```
nit: logging
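i.e. something along the lines of (sketch):
```python
import logging

logger = logging.getLogger(__name__)
logger.info('tooling account SSM parameter: %s', parameter_path)
```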
removed
"AWS": "arn:aws:iam::{account_id}:root" | ||
"AWS": ["arn:aws:iam::{account_id}:root", | ||
"arn:aws:iam::{IAMClient.get_tooling_account_id()}:root", | ||
"arn:aws:sts::{account_id}:assumed-role/{test_role_name}/{test_role_name}"] |
I am no IAM expert, but do we need this? I think the first principal (line 46) will allow all roles from `account_id` to assume this role. Check this
No, it doesn't work. An assumed role is processed differently. I tried without it and got Access Denied, so I had to explicitly add it.
```python
def __init__(self, session, region):
    if not session:
        session = boto3.Session()
```
nit: shorthand...
```python
def __init__(self, region, session=boto3.Session()):
    ...
```
done
this looks awesome 👍
```
@@ -34,7 +43,9 @@ def create_role(self, account_id, role_name):
{{
```
nit: I'd make this a dict and then do a json.dumps
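For illustration, the trust policy built as a dict and serialized with json.dumps; the principal ARNs mirror the quoted snippet, everything else is an assumption:
```python
import json

def build_trust_policy(account_id, tooling_account_id, test_role_name):
    # same trust relationship as the f-string template, expressed as a plain dict
    policy = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Principal': {
                    'AWS': [
                        f'arn:aws:iam::{account_id}:root',
                        f'arn:aws:iam::{tooling_account_id}:root',
                        f'arn:aws:sts::{account_id}:assumed-role/{test_role_name}/{test_role_name}',
                    ]
                },
                'Action': 'sts:AssumeRole',
            }
        ],
    }
    return json.dumps(policy)
```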
done
"accountId": "...", | ||
"region": "us-east-1" | ||
}, | ||
"persistent_cross_acc_env_1": { |
did you decide to have a persistent environment for speed, or are there other reasons?
- Speed up.
- Later we will need persistent shares as well
I think @dlpzx made a very good point in another PR about using persistent env/shares.
If we use them without forcing an update (and waiting for it to complete), then we might not be testing the latest changes; but if we do that, then we might as well create a new env every time.
I think we should be able to use persistent envs with the argument of speed only for not very significant features AND obviously to test backwards compatibility (but for that we should still force an update and wait).
Feature or Bugfix
Detail
- `share_base`
- `delete_env` requires `env_object`, not `envUri`
Relates
Security
Please answer the questions below briefly where applicable, or write N/A. Based on OWASP 10.
- fetching data from storage outside the application (e.g. a database, an S3 bucket)?
- `eval` or similar functions are used?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.