"""State module for managing Amazon KMS Keys."""
import copy
from typing import Any
from typing import Dict
from typing import List
__contracts__ = ["resource"]
[docs]async def present(
hub,
ctx,
name: str,
resource_id: str = None,
description: str = "",
key_usage: str = "ENCRYPT_DECRYPT",
key_spec: str = "SYMMETRIC_DEFAULT",
key_state: str = "Enabled",
origin: str = "AWS_KMS",
multi_region: bool = False,
policy: str = None,
bypass_policy_lockout_safety_check: bool = False,
enable_key_rotation: bool = False,
tags: List[Dict[str, Any]] or Dict[str, Any] = None,
timeout: Dict = None,
) -> Dict[str, Any]:
"""Creates an AWS KMS key.
Update limitations:
* ``policy`` can be updated, but cannot be cleared once set.
* ``multi_region``, ``key_usage``, and ``key_spec`` cannot be updated.
* ``enable_key_rotation`` cannot be enabled on asymmetric KMS keys.
Args:
name(str):
An Idem name of the resource.
resource_id(str, Optional):
The ID or ARN of the key in Amazon Web Services.
description(str, Optional):
A description of the KMS key.
key_usage(str, Optional):
Determines the cryptographic operations for which you can use the KMS key.
The default value is ``ENCRYPT_DECRYPT``.
Valid values are ``ENCRYPT_DECRYPT``, ``GENERATE_VERIFY_MAC``, ``SIGN_VERIFY``.
key_spec(str, Optional):
Specifies the type of KMS key to create. The default value is ``SYMMETRIC_DEFAULT``.
Valid values are ``SYMMETRIC_DEFAULT``, ``HMAC_224``, ``HMAC_256``, ``HMAC_384``, ``HMAC_512``,
``RSA_2048``, ``RSA_3072``, ``RSA_4096``, ``ECC_NIST_P256``, ``ECC_NIST_P384``, ``ECC_NIST_P521``,
``ECC_SECG_P256K1``, ``SM2``.
key_state(str, Optional):
Whether the key is enabled or not. The default value is ``Enabled``.
Valid values are ``Enabled``, ``Disabled``.
origin(str, Optional):
The source of the key material for the KMS key. The default value is ``AWS_KMS``.
Valid values are ``AWS_KMS``, ``EXTERNAL``, ``AWS_CLOUDHSM``.
multi_region(bool, Optional):
Creates a multi-Region primary key that you can replicate into other Amazon Web Services Regions.
Default value is ``False``.
policy(str, Optional):
The key policy to attach to the KMS key. If you do not specify a key policy,
KMS attaches a default key policy to the KMS key.
bypass_policy_lockout_safety_check(bool, Optional):
A flag to indicate whether to bypass the key policy lockout safety check. Default value is ``False``.
.. warning::
Setting this value to ``True`` increases the risk that the KMS key becomes unmanageable.
Do not set this value to ``True`` indiscriminately.
enable_key_rotation(bool, Optional):
Whether to enable or disable automatic rotation of the key material of the specified symmetric
encryption KMS key. Default value is ``False``.
tags(dict or list, Optional):
Dict in the format of ``{tag-key: tag-value}`` or List of tags in the format of
``[{"TagKey": tag-key, "TagValue": tag-value}]`` to associate with the key.
To use this parameter, you must have ``kms:TagResource`` permission in an IAM policy.
* TagKey (*str*):
The key of the tag.
* TagValue (*str*):
The value of the tag.
timeout(dict, Optional):
Timeout configuration for update of AWS KMS key.
* update (*dict, Optional*):
Timeout configuration when updating a KMS key.
* delay (*int, Optional*):
The amount of time in seconds to wait between attempts. Default value is ``2``.
* max_attempts (*int, Optional*):
Max attempts of waiting for change. Default value is ``120``.
Request Syntax:
.. code-block:: sls
[idem_test_aws_kms_key]:
aws.kms.key.present:
- name: 'string'
- resource_id: 'string'
- description: 'string'
- key_usage: 'ENCRYPT_DECRYPT|GENERATE_VERIFY_MAC|SIGN_VERIFY'
- key_spec: 'SYMMETRIC_DEFAULT|HMAC_224|HMAC_256|HMAC_384|HMAC_512|RSA_2048|RSA_3072|RSA_4096|ECC_NIST_P256|
ECC_NIST_P384|ECC_NIST_P521|ECC_SECG_P256K1|SM2'
- key_state: 'Enabled|Disabled'
- origin: 'AWS_KMS|EXTERNAL|AWS_CLOUDHSM'
- multi_region: True|False
- policy: 'string'
- bypass_policy_lockout_safety_check: True|False
- enable_key_rotation: True|False
- tags:
- TagKey: 'string'
TagValue: 'string'
- timeout:
update:
delay: int
max_attemps: int
Returns:
Dict[str, Any]
Examples:
.. code-block:: sls
idem_test_aws_kms_key:
aws.kms.key.present:
- name: idem_test_kms_key
- description: 'AWS KMS KEY'
- key_usage: 'ENCRYPT_DECRYPT'
- key_spec: 'SYMMETRIC_DEFAULT'
- key_state: 'Enabled'
- origin: 'AWS_KMS'
- multi_region: False
- policy:
Version: '2012-10-17'
Statement:
- Sid: 'EnableIAMUserPermissions'
Effect: 'Allow'
Principal:
AWS: 'arn:aws:iam::111122223333:root'
Action: ['kms:*']
Resource: '*'
- bypass_policy_lockout_safety_check: False
- enable_key_rotation: False
- tags:
- TagKey: provider
TagValue: idem
"""
result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
plan_state = {}
updated_vals = {}
if isinstance(tags, List):
tags = hub.tool.aws.tag_utils.convert_tag_list_to_dict_tagkey(tags)
policy = hub.tool.aws.state_comparison_utils.standardise_json(policy)
if resource_id:
before = await hub.exec.aws.kms.key.get(ctx, name=name, resource_id=resource_id)
if not before["result"] or not before["ret"]:
result["result"] = False
result["comment"] = before["comment"]
return result
result["comment"] = hub.tool.aws.comment_utils.already_exists_comment(
resource_type="aws.kms.key", name=name
)
result["old_state"] = copy.deepcopy(before["ret"])
# Used for 'test'
plan_state = copy.deepcopy(result["old_state"])
# Update key tags if tags are specified
if tags is not None and tags != result["old_state"].get("tags"):
hub.log.debug(f"aws.kms.key '{name}' tags update")
# For tag operations key_id is used and not resource_id
update_ret = await hub.tool.aws.kms.tag.update_tags(
ctx=ctx,
key_id=resource_id,
old_tags=result["old_state"].get("tags", []),
new_tags=tags,
)
result["result"] = update_ret["result"]
result["comment"] += update_ret["comment"]
if not result["result"]:
return result
plan_state["tags"] = update_ret["ret"]
updated_vals["tags"] = tags
# Enable/Disable key
# No updates for key_state "PendingDeletion", which means the key is scheduled to be deleted.
old_state = result["old_state"].get("key_state", "")
if key_state != old_state:
if ctx.get("test", False):
result["comment"] += [f"Would update state on aws.kms.key '{name}'."]
plan_state["key_state"] = key_state
else:
update_ret = None
if old_state == "Enabled" and key_state == "Disabled":
update_ret = await hub.exec.boto3.client.kms.disable_key(
ctx, KeyId=result["old_state"]["resource_id"]
)
elif old_state == "Disabled" and key_state == "Enabled":
update_ret = await hub.exec.boto3.client.kms.enable_key(
ctx, KeyId=result["old_state"]["resource_id"]
)
if update_ret:
hub.log.debug(
f"Updated the state of aws.kms.key '{name}' to '{key_state}'."
)
result["comment"] += [
f"Updated aws.kms.key '{name}' state to '{key_state}'."
]
updated_vals["key_state"] = key_state
else:
hub.log.warning(
f"Failed to update the state of aws.kms.key '{name}' to '{key_state}'. {update_ret['comment']} "
)
result["result"] = update_ret["result"]
result["comment"] += update_ret["comment"]
return result
# Update policy and/or bypass flag if needed and if policy is set
update_policy = (
policy
and not hub.tool.aws.state_comparison_utils.are_policies_equal(
result["old_state"].get("policy"), policy
)
)
update_bypass_lockout = bypass_policy_lockout_safety_check != result[
"old_state"
].get("bypass_policy_lockout_safety_check", False)
if policy and (update_policy or update_bypass_lockout):
if ctx.get("test", False):
if update_policy:
result["comment"] += [
f"Would update policy of aws.kms.key '{name}'."
]
if update_bypass_lockout:
result["comment"] += [
f"Would update bypass_policy_lockout_safety_check of aws.kms.key '{name}'."
]
plan_state["policy"] = policy
plan_state[
"bypass_policy_lockout_safety_check"
] = bypass_policy_lockout_safety_check
else:
update_ret = await hub.exec.boto3.client.kms.put_key_policy(
ctx,
KeyId=result["old_state"]["resource_id"],
Policy=policy,
PolicyName="default",
BypassPolicyLockoutSafetyCheck=bypass_policy_lockout_safety_check,
)
if update_ret["result"]:
if update_policy:
result["comment"] += [f"Updated aws.kms.key '{name}' policy."]
updated_vals["policy"] = policy
if update_bypass_lockout:
result["comment"] += [
f"Updated aws.kms.key '{name}' bypass_policy_lockout_safety_check."
]
updated_vals[
"bypass_policy_lockout_safety_check"
] = bypass_policy_lockout_safety_check
else:
hub.log.warning(
f"Failed to update the policy and/or bypass_policy_lockout_safety_check of "
f"aws.kms.key '{name}': {update_ret['comment']}"
)
result["result"] = update_ret["result"]
result["comment"] += update_ret["comment"]
return result
# Update description if needed
if description and description != result["old_state"].get("description"):
if ctx.get("test", False):
result["comment"] += [
f"Would update description of aws.kms.key '{name}'."
]
plan_state["description"] = description
else:
update_ret = await hub.exec.boto3.client.kms.update_key_description(
ctx,
KeyId=result["old_state"]["resource_id"],
Description=description,
)
if update_ret["result"]:
result["comment"] += [f"Updated aws.kms.key '{name}' description."]
updated_vals["description"] = description
else:
hub.log.warning(
f"Failed to update the description of aws.kms.key '{name}': {update_ret['comment']}"
)
result["result"] = update_ret["result"]
result["comment"] += update_ret["comment"]
return result
else:
try:
if ctx.get("test", False):
result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
enforced_state={},
desired_state={
"name": name,
"resource_id": f"key-{name}-resource_id",
"arn": f"key-{name}-arn",
"description": description,
"key_usage": key_usage,
"key_spec": key_spec,
"origin": origin,
"multi_region": multi_region,
"policy": policy,
"bypass_policy_lockout_safety_check": bypass_policy_lockout_safety_check,
"enable_key_rotation": enable_key_rotation,
"tags": tags,
},
)
result["comment"] += [f"Would create aws.kms.key {name}"]
return result
ret = await hub.exec.boto3.client.kms.create_key(
ctx,
Description=description,
KeyUsage=key_usage,
KeySpec=key_spec,
Origin=origin,
MultiRegion=multi_region,
Policy=policy,
BypassPolicyLockoutSafetyCheck=bypass_policy_lockout_safety_check,
Tags=hub.tool.aws.tag_utils.convert_tag_dict_to_list_tagkey(tags)
if tags
else None,
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] = ret["comment"]
return result
resource_id = ret["ret"]["KeyMetadata"]["KeyId"]
result["comment"] += [
f"Created aws.kms.key '{name}' with description '{description}'."
]
except hub.tool.boto3.exception.ClientError as e:
result["comment"] = f"{e.__class__.__name__}: {e}"
result["result"] = False
# Set key rotation:
# Enable it on a newly created key or Enable/Disable on an existing key,
# only if the key_state is Enabled
if (enable_key_rotation and result.get("old_state") is None) or (
result.get("old_state")
and (
key_state == "Enabled"
or (key_state is None and result["old_state"].get("key_state") == "Enabled")
)
and enable_key_rotation != result["old_state"].get("enable_key_rotation", False)
):
update_ret = await hub.tool.aws.kms.key.set_key_rotation(
ctx=ctx,
resource_id=resource_id,
enable_key_rotation=enable_key_rotation,
)
plan_state["enable_kms_rotation"] = enable_key_rotation
result["comment"] += update_ret["comment"]
if not update_ret["result"]:
result["result"] = False
return result
if ctx.get("test", False):
result["new_state"] = plan_state
else:
if updated_vals:
wait_updates_res = await hub.tool.aws.kms.key.wait_for_updates(
ctx,
timeout=timeout.get("update") if timeout else None,
resource_id=resource_id,
updates=updated_vals,
)
if wait_updates_res["result"] is False:
# Do not fail just add to comments
result["comment"] += wait_updates_res["comment"]
result["new_state"] = wait_updates_res["ret"]
else:
after = await hub.exec.aws.kms.key.get(
ctx, name=name, resource_id=resource_id
)
if not after["result"]:
result["result"] = False
result["comment"] = after["comment"]
return result
result["new_state"] = copy.deepcopy(after["ret"])
return result
[docs]async def absent(
hub, ctx, name: str, resource_id: str = None, pending_window_in_days: int = 7
) -> Dict[str, Any]:
"""Deletes an AWS KMS key.
Key cannot be immediately deleted but can be scheduled to be deleted.
Once the key is set to be deleted in ``pending_window_in_days`` a deletion date is set on the key and it cannot
be modified. So deleting again with a different ``pending_window_in_days`` is ignored. Also key can be disabled
using the "present" function with ``key_state: 'Disabled'``.
Args:
name(str):
An Idem name of the resource.
resource_id(str, Optional):
The ID or ARN of the key in Amazon Web Services.
.. warning::
Idem automatically considers this resource being absent if this field is not specified.
pending_window_in_days(int, Optional):
The waiting period, specified in number of days. After the waiting period ends, KMS deletes the KMS key.
Default value is ``7``.
Returns:
Dict[str, Any]
Request Syntax:
.. code-block:: sls
[idem_test_aws_kms_key]:
aws.kms.key.absent:
- name: 'string'
- resource_id: 'string'
- pending_window_in_days: 'int'
Examples:
.. code-block:: sls
idem_test_aws_kms_key:
aws.kms.key.absent:
- name: idem_test_kms_key
- resource_id: 1234abcd-12ab-34cd-56ef-1234567890ab
- pending_window_in_days: 2
"""
result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
if not resource_id:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.kms.key", name=name
)
return result
before = await hub.exec.aws.kms.key.get(ctx, name=name, resource_id=resource_id)
if not before:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.kms.key", name=name
)
return result
else:
result["old_state"] = copy.deepcopy(before["ret"])
if before["ret"].get("deletion_date", None):
result[
"comment"
] = f"aws.kms.key '{resource_id}' already scheduled to be deleted in '{pending_window_in_days}' days"
return result
if ctx.get("test", False):
result[
"comment"
] = f"Would schedule deletion of aws.kms.key '{resource_id}' in {pending_window_in_days} days"
return result
try:
# Minimum deletion schedule 7 days (from 7 - 30)
ret = await hub.exec.boto3.client.kms.schedule_key_deletion(
ctx,
KeyId=resource_id,
PendingWindowInDays=pending_window_in_days,
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] = ret["comment"]
result["result"] = False
return result
result[
"comment"
] = f"aws.kms.key '{resource_id}' is scheduled for deletion in {pending_window_in_days} days"
except hub.tool.boto3.exception.ClientError as e:
result["comment"] += [f"{e.__class__.__name__}: {e}"]
try:
after = await hub.exec.aws.kms.key.get(
ctx, name=name, resource_id=(resource_id if resource_id else name)
)
if not after["result"]:
result["result"] = False
result["comment"] = after["comment"]
return result
result["new_state"] = copy.deepcopy(after["ret"])
except Exception as e:
result["comment"] += [str(e)]
result["result"] = False
return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
"""Describes AWS KMS keys in a way that can be recreated/managed with the corresponding "present" function.
Returns:
Dict[str, Dict[str, Any]]
Examples:
.. code-block:: bash
$ idem describe aws.kms.key
"""
result = {}
ret = await hub.exec.boto3.client.kms.list_keys(ctx)
if not ret["result"]:
hub.log.warning(f"Could not describe aws.kms.keys {ret['comment']}")
return {}
for key in ret["ret"]["Keys"]:
key_details = await hub.exec.boto3.client.kms.describe_key(
ctx, KeyId=key.get("KeyId", None)
)
if key_details.get("result") is False:
hub.log.debug(
f"Failed to describe key with Id {key.get('KeyId')}: {key_details.get('comment')}"
)
continue
try:
translated_resource = await hub.tool.aws.kms.conversion_utils.convert_raw_key_to_present_async(
ctx, raw_resource=key_details["ret"]["KeyMetadata"]
)
except Exception as e:
hub.log.debug(f"Failed to convert key with Id {key.get('KeyId')}: {e}")
continue
result[translated_resource["resource_id"]] = {
"aws.kms.key.present": [
{parameter_key: parameter_value}
for parameter_key, parameter_value in translated_resource.items()
]
}
return result