import copy
from typing import Any
from typing import Dict
__contracts__ = ["resource"]
[docs]async def present(
hub,
ctx,
name: str,
user_name: str,
resource_id: str = None,
status: str = "Active",
pgp_key: str = None,
secret_access_key: str = None,
) -> Dict[str, Any]:
"""
Ensures an AWS key has the assigned status.
This will create a new access key for a user if no access_key_id is passed for this state, either via
access_key_id or resource_id.
If a new access key is created, the secret access key will also be returned. This can optionally be encrypted
with a base 64 encoded PGP public key.
Args:
name(str): An Idem name describing the resource.
user_name(str): AWS IAM user name that the key belongs to.
resource_id(str, Optional): AWS IAM access key ID.
status(str, Optional): "Active" or "Inactive"; Active keys are valid for API calls. Defaults to "Active".
pgp_key(str, Optional): A base 64 encode PGP public key, used to encrypt the secret access key if a new key is created.
Returns:
Dict[str, Any]
Examples:
.. code-block:: sls
name_describing_key:
aws.iam.access_key.present:
- user_name: aws_user
- status: Active
"""
# secret_access_key is used internally to make sure the secret_access_key isn't lost after the 1st present run.
# This parameter should not be inputted on sls and hence it is not added to the function description.
result = {
"name": name,
"old_state": None,
"new_state": None,
"comment": [],
"result": True,
}
# ----- look up key
if resource_id:
list_ret = await hub.exec.aws.iam.access_key.list(
ctx, access_key_id=resource_id, user_name=user_name
)
if not list_ret["result"]:
result["result"] = False
result["comment"] += [f"Error listing access keys"] + list(
list_ret["comment"]
)
return result
elif len(list_ret["ret"]) == 0:
# Return error if the specified access key id doesn't exist on AWS.
message = (
f"The specified access_key_id {resource_id} in aws.iam.key {name} does not exist, and keys "
"cannot be recreated by id."
)
result["comment"] += [message] + list_ret["comment"]
result["result"] = False
return result
before = list_ret["ret"][0]
if secret_access_key:
before["secret_access_key"] = secret_access_key
result["old_state"] = before
if before["status"] == status:
result["comment"] += [f"No changes necessary for aws.iam.key {name}"]
result["new_state"] = copy.deepcopy(result["old_state"])
return result
else:
# only update is necessary
if not ctx.get("test", False):
update_ret = await hub.exec.aws.iam.access_key.update(
ctx, user_name=user_name, access_key_id=resource_id, status=status
)
if not update_ret["result"]:
result["comment"] += update_ret["comment"]
result["result"] = False
return result
result["comment"] += [f"Updated aws.iam.key {name} to {status}"]
else:
result["comment"] += [f"Would update aws.iam.key {name} to {status}"]
result["new_state"] = copy.deepcopy(result["old_state"])
result["new_state"]["status"] = status
else:
if ctx.get("test"):
result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
enforced_state={},
desired_state={
"name": name,
"user_name": user_name,
"secret_access_key": "SECRET_ACCESS_KEY",
"status": status if status else "Active",
},
)
result["comment"] += hub.tool.aws.comment_utils.would_create_comment(
resource_type="aws.iam.access_key", name=name
)
return result
create_ret = await hub.exec.aws.iam.access_key.create(ctx, user_name, pgp_key)
if not create_ret["result"]:
result["result"] = False
result["comment"] += create_ret["comment"]
return result
access_key_id = create_ret["ret"]["access_key_id"]
result[
"new_state"
] = hub.tool.aws.iam.conversion_utils.convert_access_key_to_present(
create_ret["ret"], name
)
result["comment"] += hub.tool.aws.comment_utils.create_comment(
resource_type="aws.iam.access_key", name=name
)
# Create can't set status. So update the status if the status should be "Inactive"
if status == "Inactive":
update_ret = await hub.exec.aws.iam.access_key.update(
ctx, user_name=user_name, access_key_id=access_key_id, status=status
)
if not update_ret["result"]:
result["comment"] += update_ret["comment"]
result["result"] = False
result["new_state"]["status"] = "Inactive"
return result
[docs]async def absent(
hub,
ctx,
name: str,
user_name: str = None,
resource_id: str = None,
) -> Dict[str, Any]:
"""
Ensure the specified access key does not exist.
Both user_name and resource_id must be passed in.
Args:
name(str): An Idem name describing the resource.
user_name(str, Optional): AWS IAM user name that the key belongs to.
resource_id(str, Optional): AWS IAM access key ID.
Returns:
Dict[str, Any]
Examples:
.. code-block:: sls
name_describing_key:
aws.iam.access_key.absent:
- user_name: value
- resource_id: value
"""
result = {
"name": name,
"old_state": None,
"new_state": None,
"comment": [],
"result": True,
}
if not resource_id:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.iam.access_key", name=name
)
return result
if not user_name:
result["comment"] = hub.tool.aws.comment_utils.missing_args_for_absent_comment(
resource_type="aws.iam.access_key", name=name, args=["user_name"]
)
result["result"] = False
return result
# ------ look up key
list_ret = await hub.exec.aws.iam.access_key.list(
ctx, access_key_id=resource_id, user_name=user_name
)
result["comment"] = list_ret["comment"]
if not list_ret["result"]:
result["result"] = False
return result
if len(list_ret["ret"]) == 0:
result["comment"] += hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.iam.access_key", name=name
)
return result
access_key = list_ret["ret"][0]
result["old_state"] = access_key
# ------ delete key
# delete logic is handled inside exec's delete, including ctx["test"]
if ctx.get("test"):
result["comment"] += hub.tool.aws.comment_utils.would_delete_comment(
resource_type="aws.iam.access_key", name=name
)
return result
delete_ret = await hub.exec.aws.iam.access_key.delete(
ctx, access_key_id=resource_id, user_name=user_name
)
result["result"] = delete_ret["result"]
if not result["result"]:
result["comment"] += delete_ret["comment"]
else:
result["comment"] = hub.tool.aws.comment_utils.delete_comment(
resource_type="aws.iam.access_key", name=name
)
return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
"""
Describe access keys and their current status in a way that can be managed via the "present" function.
We describe all access keys for all users the logged in user can list.
Returns:
Dict[str, Any]
Examples:
.. code-block:: bash
$ idem describe aws.iam.access_key
"""
users_list = await hub.exec.aws.iam.user.list(ctx)
if not users_list["result"]:
hub.log.warning(f"Could not describe keys: {users_list['comment']}")
return {}
users = [u["user_name"] for u in users_list["ret"]]
result = {}
for user in users:
result.update(await _describe_one_user(hub, ctx, user))
# The above is pretty slow, if/when we add random exponential backoff on
# failures, switch to this code:
# multi_result = await asyncio.gather(
# *[_describe_one_user(hub, ctx, u["user_name"]) for u in users]
# )
# for r in multi_result:
# result.update(r)
return result
async def _describe_one_user(hub, ctx, user_name: str) -> Dict[str, Dict[str, Any]]:
keys = await hub.exec.aws.iam.access_key.list(ctx, user_name=user_name)
if not keys["result"]:
hub.log.debug(
f"Could not list aws.iam.access_key for user {user_name}: {keys['comment']}"
)
return {}
result = {}
for access_key in keys["ret"]:
# iterate every loop to give the user another indication if keys are skipped
if "resource_id" not in access_key or "status" not in access_key:
# All values are "optionally" returned from AWS but managing a key
# without the id and status would be impossible.
hub.log.debug(
f"Can not describe an aws key without an id and status {access_key}"
)
continue
idem_declaration_id = f"iam-access-key-{access_key['name']}"
result[idem_declaration_id] = {
"aws.iam.access_key.present": [{k: v} for k, v in access_key.items()]
}
return result