Source code for idem_aws.states.aws.s3.bucket_policy

"""State module for managing AWS S3 bucket policy."""
import copy
from typing import Any
from typing import Dict

__contracts__ = ["resource"]

TREQ = {
    "present": {
        "require": [
            "aws.s3.bucket.present",
        ],
    },
}

BUCKET_NOT_EXISTS_ERROR_MESSAGE = "ClientError: An error occurred (404) when calling the HeadBucket operation: Not Found"


[docs]async def present( hub, ctx, name: str, bucket: str, policy: str, confirm_remove_self_bucket_access: bool = None, expected_bucket_owner: str = None, resource_id: str = None, ) -> Dict[str, Any]: """Applies an Amazon S3 bucket policy to an Amazon S3 bucket. If you are using an identity other than the root user of the Amazon Web Services account that owns the bucket, the calling identity must have the PutBucketPolicy permissions on the specified bucket and belong to the bucket owner's account in order to use this operation. If you don't have PutBucketPolicy permissions, Amazon S3 returns a 403 Access Denied error. If you have the correct permissions, but you're not using an identity that belongs to the bucket owner's account, Amazon S3 returns a 405 Method Not Allowed error. Args: name(str): The name of the bucket policy. bucket(str): The name of the S3 bucket policy(str): The bucket policy as a JSON document. confirm_remove_self_bucket_access(bool, Optional): Set this parameter to true to confirm that you want to remove your permissions to change this bucket policy in the future. expected_bucket_owner(str, Optional): The account ID of the expected bucket owner. If the bucket is owned by a different account, the request will fail with an HTTP 403 (Access Denied) error. resource_id(str, Optional): S3 Bucket policy ID Request Syntax: .. code-block:: yaml [bucket_name]-policy: aws.s3.bucket_policy.present: - bucket: "string" - policy: "string" - confirm_remove_self_bucket_access: "bool" - expected_bucket_owner: "string" Returns: dict[str, Any] Examples: .. code-block:: yaml my-s3-bucket-bb7bb32e9533-policy: aws.s3.bucket_policy.present: - bucket: my-s3-bucket-bb7bb32e9533 - policy: '{"Version":"2012-10-17","Statement":[{"Sid":"PublicReadGetObject","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::418235808912:root"},"Action":"s3:GetObject","Resource":"arn:aws:s3:::thebugbucket3/*"}]}' - confirm_remove_self_bucket_access: False - expected_bucket_owner: 1239234249 """ result = dict(comment=[], name=name, old_state=None, new_state=None, result=True) # Check if Policy state name has correct format i.e. [bucket_name]-policy if name.split("-policy")[0] != bucket: result["comment"] += [ f"Incorrect aws.s3.bucket_policy name: {name}. Expected name {bucket}-policy" ] result["result"] = False return result before = None resource_updated = False plan_state = None # Standardise on the json format policy = hub.tool.aws.state_comparison_utils.standardise_json(policy) # Get current bucket policy for the bucket if resource_id: before = await hub.exec.boto3.client.s3.get_bucket_policy( ctx, Bucket=bucket, ExpectedBucketOwner=expected_bucket_owner ) if not before["result"] or not before["ret"]: result["comment"] += [ f"Failed to get existing Bucket policy '{name}'" ] + before["comment"] result["result"] = before["result"] return result result[ "old_state" ] = hub.tool.aws.s3.conversion_utils.convert_raw_bucket_policy_to_present( raw_resource=before["ret"], bucket=bucket, name=name ) plan_state = copy.deepcopy(result["old_state"]) result["comment"] += [f"Bucket policy '{name}' already present."] if not hub.tool.aws.state_comparison_utils.are_policies_equal( result["old_state"]["policy"], policy ): resource_updated = True plan_state["policy"] = policy # Handling test scenario seperately if ctx.get("test", False): if not before: result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "policy": policy, "bucket": bucket, "resource_id": bucket + "-policy", }, ) result["comment"] += [f"Would create aws.s3.bucket_policy '{name}'"] else: result["new_state"] = plan_state if resource_updated: result["comment"] += [f"Would update aws.s3.bucket_policy '{name}'"] else: result["comment"] += [f"No change in policy."] return result # Check if bucket exists if not ctx.get("test", False): ret = await hub.exec.boto3.client.s3.head_bucket(ctx, Bucket=bucket) if not ret["result"]: if BUCKET_NOT_EXISTS_ERROR_MESSAGE in ret["comment"]: result["comment"] += [ f"aws.s3.bucket with name: {bucket} does not exist" ] else: result["comment"] = ret["comment"] result["result"] = False return result # If policy is not present or if it is updated if (not before) or resource_updated: ret = await hub.exec.boto3.client.s3.put_bucket_policy( ctx, Bucket=bucket, Policy=policy, ConfirmRemoveSelfBucketAccess=confirm_remove_self_bucket_access, ExpectedBucketOwner=expected_bucket_owner, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result if resource_updated: result["comment"] += [f"Policy updated successfully"] else: result["comment"] += [f"Bucket policy '{name}' created."] after = await hub.exec.boto3.client.s3.get_bucket_policy( ctx, Bucket=bucket, ExpectedBucketOwner=expected_bucket_owner ) result[ "new_state" ] = hub.tool.aws.s3.conversion_utils.convert_raw_bucket_policy_to_present( raw_resource=after["ret"], bucket=bucket, name=name ) else: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"] += [f"No change in policy."] return result
[docs]async def absent( hub, ctx, name: str, bucket: str = None, resource_id: str = None, expected_bucket_owner: str = None, ) -> Dict[str, Any]: """Deletes the policy of specified s3 bucket. Args: name(str): The name of the bucket policy. bucket(str, Optional): The name of the S3 bucket resource_id(str, Optional): S3 Bucket policy ID. Idem automatically considers this resource being absent if this field is not specified. expected_bucket_owner(str, Optional): The account ID of the expected bucket owner. If the bucket is owned by a different account, the request will fail with an HTTP 403 (Access Denied) error. Request Syntax: .. code-block:: sls [bucket_name]-policy: aws.s3.bucket_policy.absent: - bucket: string - expected_bucket_owner: string Returns: dict[str, Any] Examples: .. code-block:: yaml my-s3-bucket-bb7bb32e9533-policy: aws.s3.bucket_policy.absent: - bucket: my-s3-bucket-bb7bb32e9533 - expected_bucket_owner: 1239234249 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not resource_id: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.s3.bucket_policy", name=name ) return result if not bucket: result["comment"] = hub.tool.aws.comment_utils.missing_args_for_absent_comment( resource_type="aws.s3.bucket_policy", name=name, args=["bucket"] ) result["result"] = False return result if resource_id.split("-policy")[0] != bucket: result["comment"] += [f"Incorrect aws.s3.bucket name: '{bucket}'"] result["result"] = False return result # Check if bucket exists. ret = await hub.exec.boto3.client.s3.head_bucket(ctx, Bucket=bucket) if not ret["result"]: if BUCKET_NOT_EXISTS_ERROR_MESSAGE in ret["comment"]: result["comment"] += [f"aws.s3.bucket with name: '{bucket}' does not exist"] else: result["comment"] = ret["comment"] result["result"] = False return result before = await hub.exec.boto3.client.s3.get_bucket_policy( ctx, Bucket=bucket, ExpectedBucketOwner=expected_bucket_owner ) if before and before["result"]: result[ "old_state" ] = hub.tool.aws.s3.conversion_utils.convert_raw_bucket_policy_to_present( raw_resource=before["ret"], bucket=bucket, name=name ) if ctx.get("test", False): result["comment"] += [f"Would delete aws.s3.bucket_policy '{name}'"] return result ret = await hub.exec.boto3.client.s3.delete_bucket_policy( ctx, Bucket=bucket, ExpectedBucketOwner=expected_bucket_owner ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result result["comment"] += [f"Deleted policy '{name}' for bucket '{bucket}'"] return result else: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.s3.bucket_policy", name=name ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Obtain S3 bucket policy for each bucket under the given context for any user. Returns: dict[str, Any] Examples: .. code-block:: bash $ idem describe aws.s3.bucket_policy """ result = {} # To describe the policy of all the buckets, we first need to list all the buckets, then get the # policy of each bucket ret = await hub.exec.boto3.client.s3.list_buckets(ctx) if not ret["result"]: hub.log.warning(f"Could not describe S3 buckets {ret['comment']}") return {} for bucket in ret["ret"]["Buckets"]: bucket_name = bucket.get("Name") # get policy for each bucket bucket_policy = await hub.exec.boto3.client.s3.get_bucket_policy( ctx, Bucket=bucket_name ) if bucket_policy["result"]: bucket_policy_name = f"{bucket_name}-policy" # translate policy to a common format policy_translated = ( hub.tool.aws.s3.conversion_utils.convert_raw_bucket_policy_to_present( raw_resource=bucket_policy["ret"], bucket=bucket_name, name=bucket_policy_name, ) ) result[bucket_policy_name] = { "aws.s3.bucket_policy.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in policy_translated.items() ] } return result