# Source code for idem_aws.exec.aws.iam.access_key

from typing import Any
from typing import Dict

# pgpy is treated as an optional dependency: the import is attempted once at
# module load and the outcome is recorded in HAS_LIBS, which __virtual__ returns.
try:
    import pgpy

    # Import succeeded: a one-element tuple holding True.
    HAS_LIBS = (True,)
except ImportError as e:
    # Import failed: a two-element tuple of False and the import error text.
    HAS_LIBS = False, str(e)


def __virtual__(hub):
    """
    Report whether the optional libraries this module needs were importable.

    Args:
        hub: The hub object passed in by the caller; it is not used here.

    Returns:
        tuple: The module-level ``HAS_LIBS`` value, i.e. ``(True,)`` when
        ``pgpy`` imported cleanly, otherwise ``(False, <import error text>)``.
    """
    return HAS_LIBS


# Alias map: the function defined below as ``list_`` is referred to as ``list``
# in this module's usage examples (``aws.iam.access_key.list``).
# Presumably consumed by the plugin loader so the definition does not have to
# shadow the ``list`` builtin -- confirm against the loader documentation.
__func_alias__ = {"list_": "list"}


async def list_(
    hub, ctx, user_name: str, access_key_id: str = None, status: str = None
) -> Dict[str, Any]:
    """
    Returns a list of access keys for a user_name, optionally filtering by status and access_key_id.

    Args:
        user_name (str): The user_name to list keys for.
        access_key_id (str, Optional): match exact access key to return
        status (str, Optional): match "Active" or "Inactive" keys; "Active" means the key is valid for API calls.

    Returns:
        dict[str, Any]: ``{"result": bool, "ret": list | None, "comment": list}``.
        ``ret`` is the list of matching access keys on success (possibly empty,
        in which case ``comment`` explains which filter matched nothing) and
        ``None`` when the status filter is invalid or the listing call failed.

    Examples:
        Calling this exec module function from the cli

        .. code-block:: bash

            idem exec aws.iam.access_key.list user_name="a_user"

        Using in a state:

        .. code-block:: yaml

            my_unmanaged_resource:
              exec.run:
                - path: aws.iam.access_key.list
                - kwargs:
                    user_name: a_user
    """
    # we manually filter status, so we must validate it
    if status not in (None, "Active", "Inactive"):
        return {
            "result": False,
            "ret": None,
            "comment": [
                f"Could not list access keys: '{status}' isn't a valid status",
            ],
        }

    ret = await hub.exec.boto3.client.iam.list_access_keys(ctx, UserName=user_name)
    if not ret["result"]:
        return {
            "result": False,
            "ret": None,
            "comment": [f"Could not list access keys: {ret['comment']}"],
        }

    conversion_utils = hub.tool.aws.iam.conversion_utils
    access_keys = []
    for access_key_raw in ret["ret"]["AccessKeyMetadata"]:
        tmp_access_key = conversion_utils.convert_raw_access_key_to_camel_case(
            access_key_raw
        )
        # Both filters are applied client-side on the converted key.
        if access_key_id and access_key_id != tmp_access_key.get("access_key_id"):
            continue
        if status and status != tmp_access_key.get("status"):
            continue

        idem_resource_name = (
            f"{tmp_access_key.get('user_name', '')}"
            f"-{tmp_access_key.get('access_key_id', '')}"
        )
        access_keys.append(
            conversion_utils.convert_access_key_to_present(
                tmp_access_key, idem_resource_name=idem_resource_name
            )
        )

    result = {"result": True, "ret": access_keys, "comment": []}
    if not access_keys:
        # An empty list is still a success; say which filter excluded everything.
        if status and access_key_id:
            result["comment"] += [
                f"List for {user_name} was successful, but no key matched {access_key_id} and {status}",
            ]
        elif access_key_id:
            result["comment"] += [
                f"List for {user_name} was successful, but no key matched {access_key_id}",
            ]
        elif status:
            result["comment"] += [
                f"List for {user_name} was successful, but no key matched {status}",
            ]
        else:
            result["comment"] += [
                f"List for {user_name} was successful, but there were no keys.",
            ]
    return result
async def update(
    hub, ctx, user_name: str, access_key_id: str, status: str
) -> Dict[str, Any]:
    """
    Updates the status of an existing access key.

    Note:
        Earlier documentation for this function said it does not execute if
        ctx["test"] is True. This function performs no such check itself;
        NOTE(review): confirm whether the caller or the boto3 exec wrapper
        honours test mode.

    Args:
        user_name (str): Exact user that owns the key
        access_key_id (str): Exact access key to update
        status (str): "Active" or "Inactive", meaning it is valid for API calls

    Returns:
        dict[str, Any]: Status of the update. ``ret`` is always ``None``;
        ``result`` is False when an argument is missing or the update call
        failed, and ``comment`` is always a list of messages.

    Examples:
        Calling this exec module function from the cli

        .. code-block:: bash

            idem exec aws.iam.access_key.update user_name="a_user" access_key_id="ABCDEF0123456789ABCD" status="Active"

        Using in a state:

        .. code-block:: yaml

            my_unmanaged_resource:
              exec.run:
                - path: aws.iam.access_key.update
                - kwargs:
                    user_name: a_user
                    access_key_id: ABCDEF0123456789ABCD
                    status: Active
    """
    result = {"result": True, "ret": None, "comment": []}

    # this should never happen and would result in unexpected behavior
    if not (user_name and access_key_id and status):
        result["result"] = False
        # Kept as a list (not a tuple) so it matches every other return path.
        result["comment"] = [
            "user_name, access_key_id and status all need to be supplied",
        ]
        return result

    ret = await hub.exec.boto3.client.iam.update_access_key(
        ctx, AccessKeyId=access_key_id, Status=status, UserName=user_name
    )
    if not ret["result"]:
        result["result"] = False
        result["comment"] += [
            f"Could not update access key {access_key_id} for {user_name}",
        ]
        # Append the underlying failure messages after our own summary.
        result["comment"].extend(ret["comment"])
    else:
        result["comment"] += [
            f"Access Key {access_key_id} for {user_name} set to '{status}'",
        ]
    return result
async def delete(hub, ctx, user_name: str, access_key_id: str) -> Dict[str, Any]:
    """
    Deletes an existing access key.

    "result" is True if delete succeeded or no such key or user exists.
    "result" is False if there is an error deleting the key, or if
    user_name / access_key_id is missing.

    Note:
        Earlier documentation for this function said it does not execute if
        ctx["test"] is set. This function performs no such check itself;
        NOTE(review): confirm whether the caller or the boto3 exec wrapper
        honours test mode.

    Args:
        user_name (str): Exact user owning the key
        access_key_id (str): Exact access key to delete

    Returns:
        dict[str, Any]: Status of the deletion. ``ret`` is always ``None`` and
        ``comment`` is a list of messages.

    Examples:
        Calling this exec module function from the cli

        .. code-block:: bash

            idem exec aws.iam.access_key.delete user_name="a_user" access_key_id="ABCDEF0123456789ABCD"

        Using in a state:

        .. code-block:: yaml

            my_unmanaged_resource:
              exec.run:
                - path: aws.iam.access_key.delete
                - kwargs:
                    user_name: a_user
                    access_key_id: ABCDEF0123456789ABCD
    """
    result = {"result": True, "ret": None, "comment": []}

    # this should never happen and would result in unexpected behavior.
    # Checked explicitly rather than with ``assert`` so the guard is still
    # enforced when Python runs with -O (which strips assert statements).
    if not (user_name and access_key_id):
        result["result"] = False
        result["comment"] = [
            "user_name and access_key_id both need to be supplied",
        ]
        return result

    ret = await hub.exec.boto3.client.iam.delete_access_key(
        ctx, AccessKeyId=access_key_id, UserName=user_name
    )
    if ret["result"]:
        result["comment"] = [f"Access key '{access_key_id}' deleted"]
    elif "NoSuchEntityException" in str(ret["comment"]):
        # A key (or user) that is already gone counts as a successful delete.
        result["comment"] += [
            f"Access key {access_key_id} for {user_name} does not exist"
        ]
    else:
        result["result"] = False
        result["comment"] += [
            f"Could not delete access key '{access_key_id}'"
        ] + list(ret["comment"])
    return result
async def create(hub, ctx, user_name: str, pgp_key: str = None) -> Dict[str, Any]:
    """
    Creates a new access key for the specified user.

    When ``pgp_key`` is supplied it is decoded and loaded *before* the access
    key is created, so an unusable key raises without leaving behind a newly
    created access key whose secret was never returned.

    Args:
        user_name (str): Exact user the key will be created for.
        pgp_key (str, Optional): Base 64 encoded public pgp key with which we will encrypt the
            returned secret_access_key.

    Returns:
        dict[str, Any]: ``ret`` holds the converted access key (access key ID,
        secret access key, status) on success; when ``pgp_key`` was given the
        secret access key is the base64-encoded, PGP-encrypted secret. On
        failure ``result`` is False and ``ret`` is ``None``.

    Examples:
        Calling this exec module function from the cli

        .. code-block:: bash

            idem exec aws.iam.access_key.create user_name="a_user" pgp_key="-----BEGIN PGP PUBLIC KEY BLOCK-----0123456789ABCDEFxxxETC-----END PGP PUBLIC KEY BLOCK-----"

        Using in a state:

        .. code-block:: yaml

            my_unmanaged_resource:
              exec.run:
                - path: aws.iam.access_key.create
                - kwargs:
                    user_name: a_user
                    pgp_key: -----BEGIN PGP PUBLIC KEY BLOCK----- 0123456789ABCDEFxxxETC -----END PGP PUBLIC KEY BLOCK-----
    """
    result = {"result": True, "ret": None, "comment": []}

    # Fail fast on a bad PGP key: do this before the create call so that a
    # decoding/parsing error cannot strand a freshly created access key.
    loaded_pgp_key = None
    if pgp_key:
        decoded_pgp_key = hub.tool.aws.b64.decode(pgp_key)
        loaded_pgp_key, _ = pgpy.PGPKey.from_blob(decoded_pgp_key)

    ret = await hub.exec.boto3.client.iam.create_access_key(ctx, UserName=user_name)
    if not ret["result"]:
        result["result"] = False
        result["comment"] += [f"Could not create access key: {ret['comment']}"]
        return result

    access_key = hub.tool.aws.iam.conversion_utils.convert_raw_access_key_to_camel_case(
        ret["ret"]["AccessKey"]
    )
    if loaded_pgp_key is not None:
        # Replace the plaintext secret with its encrypted, base64-encoded form.
        secret_access_key = pgpy.PGPMessage.new(
            access_key["secret_access_key"],
            compression=pgpy.constants.CompressionAlgorithm.Uncompressed,
        )
        encrypted_secret_access_key = loaded_pgp_key.encrypt(secret_access_key)
        access_key["secret_access_key"] = hub.tool.aws.b64.encode(
            encrypted_secret_access_key
        )
    result["ret"] = access_key
    return result