Source code for idem_aws.states.aws.iam.role_policy

"""State module for managing IAM Role Policies."""
import asyncio
import copy
import re
from typing import Any
from typing import Dict

__contracts__ = ["resource"]

TREQ = {
    "absent": {
        "require": [
            "aws.iam.role_policy_attachment.absent",
        ],
    },
    "present": {
        "require": ["aws.iam.role.present"],
    },
}


[docs]async def present( hub, ctx, name: str, role_name: str, policy_document: Dict or str, resource_id: str = None, ) -> Dict[str, Any]: """Adds or updates an inline policy document that is embedded in the specified IAM role. When you embed an inline policy in a role, the inline policy is used as part of the role's access (permissions) policy. The role's trust policy is created at the same time as the role, using aws.iam.role.present. A role can also have a managed policy attached to it. To attach a managed policy to a role, use aws.iam.role_policy_attachment.present. To create a new managed policy, use aws.iam.policy.present. For information about policies, see Managed policies and inline policies in the IAM User Guide. For information about the maximum number of inline policies that you can embed with a role, see IAM and STS quotas in the IAM User Guide. Args: name(str): The name of the AWS IAM policy. role_name(str): The name of the role to associate the policy with. This parameter allows (through its regex pattern) a string of characters consisting of upper and lowercase alphanumeric characters with no spaces. You can also include any of the following characters: _+=,.@- policy_document(dict or str): The policy document. You must provide policies in JSON format in IAM. However, for CloudFormation templates formatted in YAML, you can provide the policy in JSON or YAML format. CloudFormation always converts a YAML policy to JSON format before submitting it to IAM. resource_id(str, Optional): The role name and policy name with a separator '-'. Format: [role_name]-[policy_name] Request Syntax: .. code-block:: sls [iam-role-policy-name]: aws.iam.role_policy.present: - resource_id: 'string' - role_name: 'string' - policy_document: 'dict or string' Returns: Dict[str, Any] Examples: .. code-block:: sls idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037: aws.iam.role_policy.present: - role_name: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4 - resource_id: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4-idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037 - policy_document: '{"Version": "2012-10-17", "Statement": {"Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::example_bucket"}}' """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) policy_name = name if resource_id: if not re.search(f"^({role_name})-({policy_name})$", resource_id): result[ "comment" ] = f"Incorrect aws.iam.role_policy resource_id: {resource_id}. Expected id {role_name}-{policy_name}" result["result"] = False return result resource = await hub.tool.boto3.resource.create( ctx, "iam", "RolePolicy", role_name=role_name, name=policy_name, ) before = await hub.tool.boto3.resource.describe(resource) # Standardise on the json format policy_document = hub.tool.aws.state_comparison_utils.standardise_json( policy_document ) if before: result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(before) result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"] = [ f"aws.iam.role_policy '{policy_name}' for role '{role_name}' already exists" ] if ( policy_document and not hub.tool.aws.state_comparison_utils.are_policies_equal( result["old_state"]["policy_document"], policy_document ) ): if ctx.get("test", False): result["comment"] += [ "Would update aws.iam.role_policy policy_document", ] result["new_state"] = copy.deepcopy(result["old_state"]) result["new_state"]["policy_document"] = policy_document return result update_ret = await hub.exec.boto3.client.iam.put_role_policy( ctx, RoleName=role_name, PolicyName=name, PolicyDocument=policy_document, ) if update_ret["result"]: result["comment"] += ["Updated aws.iam.role_policy policy_document"] else: result["comment"] += update_ret["comment"] result["result"] = update_ret["result"] else: try: if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "role_name": role_name, "resource_id": f"{role_name}-{name}", "policy_document": policy_document, }, ) result["comment"] += [ f"Would create aws.iam.role_policy '{policy_name}' for role '{role_name}'" ] return result ret = await hub.exec.boto3.client.iam.put_role_policy( ctx, RoleName=role_name, PolicyName=name, PolicyDocument=policy_document, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result result["comment"] += [f"Created '{policy_name}' for role '{role_name}'"] except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False try: after = await hub.tool.boto3.resource.describe(resource) result[ "new_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(after) except Exception as e: result["comment"] += [str(e)] result["result"] = False return result
[docs]async def absent( hub, ctx, name: str, role_name: str = None, resource_id: str = None ) -> Dict[str, Any]: """Deletes the specified inline policy that is embedded in the specified IAM role. A role can also have managed policies attached to it. To detach a managed policy from a role, use aws.iam.role_policy_attachment.absent. For more information about policies, refer to Managed policies and inline policies in the IAM User Guide. Args: name(str): The name of the AWS IAM policy. role_name(str, Optional): The name of the AWS IAM role. Idem automatically considers this resource being absent if this field is not specified. resource_id(str, Optional): The role name and policy name with a separator '-'. Format: [role_name]-[policy_name]. If not specified, Idem will use "name" parameter to identify the role policy on AWS. Request Syntax: .. code-block:: sls [role_policy-resource-id]: aws.iam.role_policy.absent: - name: "string" - role_name: "string" - resource_id: "string" Returns: Dict[str, Any] Examples: .. code-block:: sls idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037: aws.iam.role_policy.absent: - name: idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037 - role_name: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4 - resource_id: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4-idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not role_name: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.iam.role_policy", name=name ) return result policy_name = name if resource_id: if not re.search(f"^({role_name})-({policy_name})$", resource_id): result[ "comment" ] = f"Incorrect aws.iam.role_policy resource_id: {resource_id}. Expected id {role_name}-{policy_name}" result["result"] = False return result resource = await hub.tool.boto3.resource.create( ctx, "iam", "RolePolicy", role_name=role_name, name=policy_name, ) before = await hub.tool.boto3.resource.describe(resource) if not before: result["comment"] += [f"'{policy_name}' for role '{role_name}' already absent"] else: try: result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present( before ) if ctx.get("test", False): result["comment"] += [f"Would delete aws.iam.role_policy {resource_id}"] return result ret = await hub.exec.boto3.client.iam.delete_role_policy( ctx, RoleName=role_name, PolicyName=policy_name, ) result["result"] = ret["result"] if not result["result"]: result["comment"] += [ret["comment"]] result["result"] = False return result result["comment"] += [f"Deleted '{policy_name}' for role '{role_name}'"] except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Describe the resource in a way that can be recreated/managed with the corresponding "present" function Lists the names of the inline policies that are embedded in of all IAM roles. An IAM role can also have managed policies attached to it. These managed polices are not listed with this describe function. To list the managed policies that are attached to a role, use aws.iam.role_policy_attachment.describe. If there are no inline policies embedded with the specified role, the operation returns an empty dict. Returns: Dict[str, Any] Examples:` .. code-block:: bash $ idem describe aws.iam.role_policy """ result = {} # To describe all the role policies of all the roles, we first need to list all the roles, then get all the # policy names of each role, then get each policy ret_roles = await hub.exec.boto3.client.iam.list_roles(ctx) if not ret_roles["result"]: hub.log.warning(f"Could not describe role {ret_roles['comment']}") return {} for role in ret_roles["ret"]["Roles"]: role_name = role.get("RoleName") # List all role policy names of each role try: ret_role_policies = await hub.exec.boto3.client.iam.list_role_policies( ctx=ctx, RoleName=role_name ) if not ret_role_policies["result"]: hub.log.warning( f"Failed on fetching role policies with role {role_name} " f"with error {ret_role_policies['comment']}. Describe will skip this role and continue." ) continue except hub.tool.boto3.exception.ClientError as e: hub.log.warning( f"Failed on fetching role policies with role {role_name} with error" f" {e.__class__.__name__}: {e} Describe will skip this role and continue." ) continue if ret_role_policies: if not ret_role_policies["result"]: hub.log.warning( f"Could not describe role_policy with role {role_name} with error " f"{ret_role_policies['comment']}. Describe will skip this role and continue." ) else: # Get each policy according to the role name and role policy name for get_role_policy in asyncio.as_completed( [ hub.exec.boto3.client.iam.get_role_policy( ctx=ctx, RoleName=role_name, PolicyName=role_policy_name ) for role_policy_name in ret_role_policies["ret"].get( "PolicyNames", list() ) ] ): ret_role_policy = await get_role_policy if not ret_role_policy["result"]: hub.log.warning( f"Could not get a role_policy with role {role_name} with error" f" {ret_role_policy['comment']} . Describe will skip this role policy and continue." ) else: resource = ret_role_policy["ret"] translated_resource = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present( resource ) resource_key = ( f"iam-role-policy-{translated_resource['resource_id']}" ) result[resource_key] = { "aws.iam.role_policy.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in translated_resource.items() ] } return result