Source code for idem_aws.states.aws.iam.user_policy

"""State module for managing IAM User Policies."""
import asyncio
import copy
import re
from typing import Any
from typing import Dict

__contracts__ = ["resource"]

TREQ = {
    "absent": {
        "require": [
            "aws.iam.user_policy_attachment.absent",
        ],
    },
    "present": {
        "require": ["aws.iam.user.present"],
    },
}


[docs]async def present( hub, ctx, name: str, user_name: str, policy_document: Dict or str, resource_id: str = None, ) -> Dict[str, Any]: """Adds or updates an inline policy document that is embedded in the specified IAM user. An IAM user can also have a managed policy attached to it. To attach a managed policy to a user, use AttachUserPolicy. To create a new managed policy, use CreatePolicy. For information about policies, see Managed policies and inline policies in the IAM User Guide. For information about the maximum number of inline policies that you can embed in a user, see IAM and STS quotas in the IAM User Guide. Because policy documents can be large, you should use POST rather than GET when calling PutUserPolicy. For general information about using the Query API with IAM, see Making query requests in the IAM User Guide. Args: name(str): The name of the AWS IAM policy. user_name(str): The UserPolicy's user_name identifier policy_document(dict): The policy document. IAM stores policies in JSON format. However, resources that were created using CloudFormation templates can be formatted in YAML. CloudFormation always converts a YAML policy to JSON format before submitting it to IAM. resource_id(str, Optional): The user name and policy name with a separator '-'. Format: [user_name]-[policy_name] Request Syntax: .. code-block:: sls [iam-user-policy-name]: aws.iam.user_policy.present: - resource_id: 'string' - name: 'string' - user_name: 'string' - policy_document: 'dict or string' Returns: Dict[str, Any] Examples: .. code-block:: sls idem-test-user-policy: aws.iam.user_policy.present: - resource_id: idem-test-user-idem-test-user-policy - name: idem-test-user-policy - user_name: idem-test-user - policy_document: '{"Version": "2012-10-17", "Statement": {"Effect": "Allow", "Action": ["cloudwatch:ListMetrics", "cloudwatch:GetMetricStatistics"], "Resource":"*"}}' """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) policy_name = name if resource_id: if not re.search(f"^({user_name})-({policy_name})$", resource_id): result[ "comment" ] = f"Incorrect aws.iam.user_policy resource_id: {resource_id}. Expected id {user_name}-{policy_name}" result["result"] = False return result resource = await hub.tool.boto3.resource.create( ctx, "iam", "UserPolicy", user_name=user_name, name=policy_name, ) before = await hub.tool.boto3.resource.describe(resource) # Standardise on the json format, user policy should not be sorted policy_document = hub.tool.aws.state_comparison_utils.standardise_json( policy_document, sort_keys=False ) if before: result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_policy_to_present(before) if not hub.tool.aws.state_comparison_utils.are_policies_equal( result["old_state"].get("policy_document"), policy_document ): if ctx.get("test", False): plan_state = copy.deepcopy(result["old_state"]) plan_state["policy_document"] = policy_document result["new_state"] = plan_state result["comment"] += [ f"Would update aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] return result # call to update user-policy fret = await _update_user_policy( hub, ctx, user_name, policy_name, policy_document ) result["result"] = fret["result"] if not result["result"]: result["comment"] = fret["comment"] return result result["comment"] += [ f"Updated aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] else: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"] += [ f"aws.iam.user_policy '{policy_name}' for user '{user_name}' already exists" ] return result else: try: if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": policy_name, "user_name": user_name, "policy_document": policy_document, }, ) result["comment"] += [ f"Would create aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] return result # call to create user-policy fret = await _update_user_policy(hub, ctx, user_name, name, policy_document) result["result"] = fret["result"] if not result["result"]: result["comment"] = fret["comment"] return result result["comment"] += [ f"Created aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False try: after = await hub.tool.boto3.resource.describe(resource) result[ "new_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_policy_to_present(after) except Exception as e: result["comment"] += [str(e)] result["result"] = False return result
[docs]async def absent( hub, ctx, name: str, user_name: str = None, resource_id: str = None ) -> Dict[str, Any]: """ Deletes the specified inline policy that is embedded in the specified IAM user. A user can also have managed policies attached to it. To detach a managed policy from a user, use DetachUserPolicy. For more information about policies, refer to Managed policies and inline policies in the IAM User Guide. Args: name(str): The name of the AWS IAM policy. user_name(str, Optional): The UserPolicy's user_name identifier. Idem automatically considers this resource being absent if this field is not specified. resource_id(str, Optional): The user name and policy name with a separator '-'. Format: [user_name]-[policy_name]. If not specified, Idem will use "name" parameter to identify the IAM policy on AWS. Request Syntax: .. code-block:: sls [iam-user-policy-name]: aws.iam.user_policy.present: - name: 'string' - resource_id: 'string' - user_name: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls idem-test-user-policy: aws.iam.user_policy.absent: - name: idem-test-user-policy - resource_id: idem-test-user-idem-test-user-policy - user_name: idem-test-user """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not user_name: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.iam.user_policy", name=name ) return result policy_name = name if resource_id: if not re.search(f"^({user_name})-({policy_name})$", resource_id): result[ "comment" ] = f"Incorrect aws.iam.user_policy resource_id: {resource_id}. Expected id {user_name}-{policy_name}" result["result"] = False return result resource = await hub.tool.boto3.resource.create( ctx, "iam", "UserPolicy", user_name=user_name, name=policy_name, ) before = await hub.tool.boto3.resource.describe(resource) if not before: result["comment"] += [ f"aws.iam.user_policy '{policy_name}' for user '{user_name}' already absent" ] return result else: result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_policy_to_present(before) if ctx.get("test", False): result["comment"] += [ f"Would delete aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] return result ret = await hub.exec.boto3.client.iam.delete_user_policy( ctx, UserName=user_name, PolicyName=policy_name, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] result["result"] = False return result result["comment"] += [ f"Deleted aws.iam.user_policy '{policy_name}' for user '{user_name}'" ] return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Describe the resource in a way that can be recreated/managed with the corresponding "present" function Lists the names of the inline policies embedded in the specified IAM user. An IAM user can also have managed policies attached to it. To list the managed policies that are attached to a user, use ListAttachedUserPolicies. For more information about policies, see Managed policies and inline policies in the IAM User Guide. You can paginate the results using the MaxItems and Marker parameters. If there are no inline policies embedded with the specified user, the operation returns an empty list. Returns: Dict[str, Any] Examples: .. code-block:: bash $ idem describe aws_auto.iam.user_policy """ result = {} # Fetch all users and then policies for each of them users = await hub.exec.boto3.client.iam.list_users(ctx) if not users["result"]: hub.log.warning(f"Could not describe users {users['comment']}") return {} for user in users["ret"]["Users"]: user_name = user.get("UserName") # List all user policy names of each user try: user_policies = await hub.exec.boto3.client.iam.list_user_policies( ctx, UserName=user_name ) if not user_policies["result"]: hub.log.warning( f"Could not fetch user policies for user {user_name} " f"due to error {user_policies['comment']}. Skip this user and continue." ) continue except hub.tool.boto3.exception.ClientError as e: hub.log.warning( f"Could not fetch user policies for user {user_name} due to error" f" {e.__class__.__name__}: {e}. Skip this user and continue." ) continue if user_policies: if not user_policies["result"]: hub.log.warning( f"Could not describe user_policy for user {user_name} due to error " f"{user_policies['comment']}. Skip this user and continue." ) else: for user_policy in asyncio.as_completed( [ hub.exec.boto3.client.iam.get_user_policy( ctx=ctx, UserName=user_name, PolicyName=user_policy_name ) for user_policy_name in user_policies["ret"].get( "PolicyNames", [] ) ] ): ret_user_policy = await user_policy if not ret_user_policy["result"]: hub.log.warning( f"Could not get user_policy for user {user_name} due to error" f" {ret_user_policy['comment']} . Skip this user policy and continue." ) else: resource = ret_user_policy["ret"] resource_translated = hub.tool.aws.iam.conversion_utils.convert_raw_user_policy_to_present( resource ) resource_key = ( f"iam-user-policy-{resource_translated['resource_id']}" ) result[resource_key] = { "aws.iam.user_policy.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource_translated.items() ] } return result
async def _update_user_policy( hub, ctx, user_name: str, name: str, policy_document ) -> Dict[str, Any]: result = dict(comment=[], result=True) ret = await hub.exec.boto3.client.iam.put_user_policy( ctx, UserName=user_name, PolicyName=name, PolicyDocument=policy_document, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result