"""State module for managing IAM Role Policies."""
import asyncio
import copy
import re
from typing import Any
from typing import Dict
__contracts__ = ["resource"]
TREQ = {
"absent": {
"require": [
"aws.iam.role_policy_attachment.absent",
],
},
"present": {
"require": ["aws.iam.role.present"],
},
}
[docs]async def present(
hub,
ctx,
name: str,
role_name: str,
policy_document: Dict or str,
resource_id: str = None,
) -> Dict[str, Any]:
"""Adds or updates an inline policy document that is embedded in the specified IAM role.
When you embed an inline policy in a role, the inline policy is used as part of the role's access (permissions) policy.
The role's trust policy is created at the same time as the role, using aws.iam.role.present. A role can also have a
managed policy attached to it. To attach a managed policy to a role, use aws.iam.role_policy_attachment.present.
To create a new managed policy, use aws.iam.policy.present.
For information about policies, see Managed policies and inline policies in the IAM User Guide.
For information about the maximum number of inline policies that you can embed with a role, see IAM and STS quotas in
the IAM User Guide.
Args:
name(str):
The name of the AWS IAM policy.
role_name(str):
The name of the role to associate the policy with. This parameter allows (through its regex pattern)
a string of characters consisting of upper and lowercase alphanumeric characters with no spaces.
You can also include any of the following characters: _+=,.@-
policy_document(dict or str):
The policy document. You must provide policies in JSON format in IAM.
However, for CloudFormation templates formatted in YAML, you can provide the policy in JSON or YAML format.
CloudFormation always converts a YAML policy to JSON format before submitting it to IAM.
resource_id(str, Optional):
The role name and policy name with a separator '-'. Format: [role_name]-[policy_name]
Request Syntax:
.. code-block:: sls
[iam-role-policy-name]:
aws.iam.role_policy.present:
- resource_id: 'string'
- role_name: 'string'
- policy_document: 'dict or string'
Returns:
Dict[str, Any]
Examples:
.. code-block:: sls
idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037:
aws.iam.role_policy.present:
- role_name: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4
- resource_id: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4-idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037
- policy_document: '{"Version": "2012-10-17", "Statement": {"Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::example_bucket"}}'
"""
result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
policy_name = name
if resource_id:
if not re.search(f"^({role_name})-({policy_name})$", resource_id):
result[
"comment"
] = f"Incorrect aws.iam.role_policy resource_id: {resource_id}. Expected id {role_name}-{policy_name}"
result["result"] = False
return result
resource = await hub.tool.boto3.resource.create(
ctx,
"iam",
"RolePolicy",
role_name=role_name,
name=policy_name,
)
before = await hub.tool.boto3.resource.describe(resource)
# Standardise on the json format
policy_document = hub.tool.aws.state_comparison_utils.standardise_json(
policy_document
)
if before:
result[
"old_state"
] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(before)
result["new_state"] = copy.deepcopy(result["old_state"])
result["comment"] = [
f"aws.iam.role_policy '{policy_name}' for role '{role_name}' already exists"
]
if (
policy_document
and not hub.tool.aws.state_comparison_utils.are_policies_equal(
result["old_state"]["policy_document"], policy_document
)
):
if ctx.get("test", False):
result["comment"] += [
"Would update aws.iam.role_policy policy_document",
]
result["new_state"] = copy.deepcopy(result["old_state"])
result["new_state"]["policy_document"] = policy_document
return result
update_ret = await hub.exec.boto3.client.iam.put_role_policy(
ctx,
RoleName=role_name,
PolicyName=name,
PolicyDocument=policy_document,
)
if update_ret["result"]:
result["comment"] += ["Updated aws.iam.role_policy policy_document"]
else:
result["comment"] += update_ret["comment"]
result["result"] = update_ret["result"]
else:
try:
if ctx.get("test", False):
result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
enforced_state={},
desired_state={
"name": name,
"role_name": role_name,
"resource_id": f"{role_name}-{name}",
"policy_document": policy_document,
},
)
result["comment"] += [
f"Would create aws.iam.role_policy '{policy_name}' for role '{role_name}'"
]
return result
ret = await hub.exec.boto3.client.iam.put_role_policy(
ctx,
RoleName=role_name,
PolicyName=name,
PolicyDocument=policy_document,
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] = ret["comment"]
return result
result["comment"] += [f"Created '{policy_name}' for role '{role_name}'"]
except hub.tool.boto3.exception.ClientError as e:
result["comment"] += [f"{e.__class__.__name__}: {e}"]
result["result"] = False
try:
after = await hub.tool.boto3.resource.describe(resource)
result[
"new_state"
] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(after)
except Exception as e:
result["comment"] += [str(e)]
result["result"] = False
return result
[docs]async def absent(
hub, ctx, name: str, role_name: str = None, resource_id: str = None
) -> Dict[str, Any]:
"""Deletes the specified inline policy that is embedded in the specified IAM role.
A role can also have managed policies attached to it. To detach a managed policy from a role, use
aws.iam.role_policy_attachment.absent. For more information about policies, refer to Managed policies and inline
policies in the IAM User Guide.
Args:
name(str):
The name of the AWS IAM policy.
role_name(str, Optional):
The name of the AWS IAM role. Idem automatically considers this resource being absent if this field is not
specified.
resource_id(str, Optional):
The role name and policy name with a separator '-'. Format: [role_name]-[policy_name]. If not specified,
Idem will use "name" parameter to identify the role policy on AWS.
Request Syntax:
.. code-block:: sls
[role_policy-resource-id]:
aws.iam.role_policy.absent:
- name: "string"
- role_name: "string"
- resource_id: "string"
Returns:
Dict[str, Any]
Examples:
.. code-block:: sls
idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037:
aws.iam.role_policy.absent:
- name: idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037
- role_name: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4
- resource_id: idem-test-role-e9528a79-a327-4a83-9912-c9b90044f1e4-idem-test-role-policy-930323cb-91cf-42a2-ad9b-3f195c776037
"""
result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
if not role_name:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.iam.role_policy", name=name
)
return result
policy_name = name
if resource_id:
if not re.search(f"^({role_name})-({policy_name})$", resource_id):
result[
"comment"
] = f"Incorrect aws.iam.role_policy resource_id: {resource_id}. Expected id {role_name}-{policy_name}"
result["result"] = False
return result
resource = await hub.tool.boto3.resource.create(
ctx,
"iam",
"RolePolicy",
role_name=role_name,
name=policy_name,
)
before = await hub.tool.boto3.resource.describe(resource)
if not before:
result["comment"] += [f"'{policy_name}' for role '{role_name}' already absent"]
else:
try:
result[
"old_state"
] = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(
before
)
if ctx.get("test", False):
result["comment"] += [f"Would delete aws.iam.role_policy {resource_id}"]
return result
ret = await hub.exec.boto3.client.iam.delete_role_policy(
ctx,
RoleName=role_name,
PolicyName=policy_name,
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] += [ret["comment"]]
result["result"] = False
return result
result["comment"] += [f"Deleted '{policy_name}' for role '{role_name}'"]
except hub.tool.boto3.exception.ClientError as e:
result["comment"] += [f"{e.__class__.__name__}: {e}"]
return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function
Lists the names of the inline policies that are embedded in of all IAM roles. An IAM role can also have managed policies
attached to it. These managed polices are not listed with this describe function. To list the managed policies that
are attached to a role, use aws.iam.role_policy_attachment.describe. If there are no inline policies embedded with
the specified role, the operation returns an empty dict.
Returns:
Dict[str, Any]
Examples:`
.. code-block:: bash
$ idem describe aws.iam.role_policy
"""
result = {}
# To describe all the role policies of all the roles, we first need to list all the roles, then get all the
# policy names of each role, then get each policy
ret_roles = await hub.exec.boto3.client.iam.list_roles(ctx)
if not ret_roles["result"]:
hub.log.warning(f"Could not describe role {ret_roles['comment']}")
return {}
for role in ret_roles["ret"]["Roles"]:
role_name = role.get("RoleName")
# List all role policy names of each role
try:
ret_role_policies = await hub.exec.boto3.client.iam.list_role_policies(
ctx=ctx, RoleName=role_name
)
if not ret_role_policies["result"]:
hub.log.warning(
f"Failed on fetching role policies with role {role_name} "
f"with error {ret_role_policies['comment']}. Describe will skip this role and continue."
)
continue
except hub.tool.boto3.exception.ClientError as e:
hub.log.warning(
f"Failed on fetching role policies with role {role_name} with error"
f" {e.__class__.__name__}: {e} Describe will skip this role and continue."
)
continue
if ret_role_policies:
if not ret_role_policies["result"]:
hub.log.warning(
f"Could not describe role_policy with role {role_name} with error "
f"{ret_role_policies['comment']}. Describe will skip this role and continue."
)
else:
# Get each policy according to the role name and role policy name
for get_role_policy in asyncio.as_completed(
[
hub.exec.boto3.client.iam.get_role_policy(
ctx=ctx, RoleName=role_name, PolicyName=role_policy_name
)
for role_policy_name in ret_role_policies["ret"].get(
"PolicyNames", list()
)
]
):
ret_role_policy = await get_role_policy
if not ret_role_policy["result"]:
hub.log.warning(
f"Could not get a role_policy with role {role_name} with error"
f" {ret_role_policy['comment']} . Describe will skip this role policy and continue."
)
else:
resource = ret_role_policy["ret"]
translated_resource = hub.tool.aws.iam.conversion_utils.convert_raw_role_policy_to_present(
resource
)
resource_key = (
f"iam-role-policy-{translated_resource['resource_id']}"
)
result[resource_key] = {
"aws.iam.role_policy.present": [
{parameter_key: parameter_value}
for parameter_key, parameter_value in translated_resource.items()
]
}
return result