Source code for idem_aws.states.aws.iam.user_ssh_key

"""State module for managing IAM User SSH Keys."""
import copy
from typing import Any
from typing import Dict

__contracts__ = ["resource"]


[docs]async def present( hub, ctx, name: str, user_name: str, ssh_public_key_body: str, resource_id: str = None, status: str = None, ) -> Dict[str, Any]: """Uploads an SSH public key and associates it with the specified IAM user. The SSH public key uploaded by this operation can be used only for authenticating the associated IAM user to an CodeCommit repository. Args: name(str): A name of the ssh public key to be added/modified. user_name(str): The name (friendly name, not ARN) of the IAM user to associate the SSH public key with. This parameter allows (through its regex pattern ) a string of characters consisting of upper and lowercase alphanumeric characters with no spaces. You can also include any of the following characters: _+=,.@- ssh_public_key_body(str): The SSH public key. The public key must be encoded in ssh-rsa format or PEM format. The minimum bit-length of the public key is 2048 bits. For example, you can generate a 2048-bit key, and the resulting PEM file is 1679 bytes long. resource_id(str, Optional): The Id of the ssh public key to be added/modified. status (str, Optional): The status of the SSH public key. The value can be 'Acitve' or 'Inactive'. Defaults 'Active'. Request Syntax: .. code-block:: sls [iam-user-ssh-key-name]: aws.iam.user_ssh_key.present: - user_name: 'string' - ssh_public_key_body: 'string' - resource_id: 'string' - status: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls APKASYLXZO1EQCR0ZAL7: aws.iam.user_ssh_key.present: - user_name: my-iam-user-1 - ssh_public_key_body: ssh-rsa AAAB3adeaadasdamasdasdasdsadsdasdadasda8ad800q3q3= - resource_id: my-iam-user-id - status: Active """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) is_ssh_key_associated = False before_ret = None resource_updated = False if resource_id: before_ret = ( await hub.tool.aws.iam.user_ssh_key.get_ssh_key_if_attached_to_user( ctx, user_name=user_name, ssh_public_key_id=resource_id ) ) if before_ret and before_ret["result"]: # ssh public key already exists for user. We can only modify status (Active | Inactive) # Check if status is modified and update if modified. ret_ssh_public_key_body = await hub.exec.boto3.client.iam.get_ssh_public_key( ctx=ctx, UserName=user_name, SSHPublicKeyId=resource_id, Encoding="SSH", ) result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_ssh_key_to_present( raw_resource=before_ret["ret"], idem_resource_name=resource_id, ret_ssh_public_key_body=ret_ssh_public_key_body, ) plan_state = copy.deepcopy(result["old_state"]) if status is not None and result["old_state"].get("status") != status: if not ctx.get("test", False): update_ret = await hub.exec.boto3.client.iam.update_ssh_public_key( ctx, UserName=user_name, SSHPublicKeyId=resource_id, Status=status ) result["result"] = update_ret["result"] if not result["result"]: result["comment"] = update_ret["comment"] return result result["comment"] += [ f"Updated aws.iam.user_ssh_key '{name}' status to '{status}'" ] resource_updated = resource_updated or bool(update_ret["ret"]) else: plan_state["status"] = status result["comment"] += [ f"Would update ssh public key '{name}' of user '{user_name}' status to {status}" ] is_ssh_key_associated = True else: if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "user_name": user_name, "ssh_public_key_body": ssh_public_key_body, "status": status, }, ) result["comment"] += [ f"Would upload the ssh public key and attach it to iam user '{user_name}'" ] return result ret = await hub.exec.boto3.client.iam.upload_ssh_public_key( ctx, UserName=user_name, SSHPublicKeyBody=ssh_public_key_body, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result name = ret["ret"].get("SSHPublicKey").get("SSHPublicKeyId") result["comment"] += [ f"Uploaded the ssh public key and attached it to iam user '{user_name}'" ] is_ssh_key_associated = True if is_ssh_key_associated: if ctx.get("test", False): result["new_state"] = plan_state elif (not before_ret) or resource_updated: after_ret = ( await hub.tool.aws.iam.user_ssh_key.get_ssh_key_if_attached_to_user( ctx, user_name=user_name, ssh_public_key_id=name ) ) if after_ret and after_ret["result"]: created_resource_id = after_ret["ret"].get("SSHPublicKeyId") ret_ssh_public_key_body = ( await hub.exec.boto3.client.iam.get_ssh_public_key( ctx=ctx, UserName=user_name, SSHPublicKeyId=created_resource_id, Encoding="SSH", ) ) result[ "new_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_ssh_key_to_present( raw_resource=after_ret["ret"], idem_resource_name=created_resource_id, ret_ssh_public_key_body=ret_ssh_public_key_body, ) else: result["comment"] = after_ret["comment"] result["result"] = False else: result["new_state"] = copy.deepcopy(result["old_state"]) return result
[docs]async def absent( hub, ctx, name: str, user_name: str = None, resource_id: str = None, ) -> Dict[str, Any]: """Deletes the specified SSH public key. The SSH public key deleted by this operation is used only for authenticatingthe associated IAM user to an CodeCommit repository. Args: name(str): The name of the ssh public key to be deleted. user_name(str, Optional): The name (friendly name, not ARN) of the IAM user to delete the ssh public key from. resource_id(str, Optional) : The ID of the ssh public key to be deleted. Request Syntax: .. code-block:: sls [iam-user-ssh-key-id]: aws.iam.user_ssh_key.absent: - name: 'string' - user_name: 'string' - resource_id: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls APKASYLXZO1EQCR0ZAL7: aws.iam.user_ssh_key.absent: - user_name: my-iam-user-1 - name: APKASYLXZO1EQCR0ZAL7 - resource_id: APKASYLXZO1EQCR0ZAL7 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not resource_id and not user_name: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.iam.user_ssh_key", name=name ) return result if not resource_id or not user_name: result["comment"] = hub.tool.aws.comment_utils.missing_args_for_absent_comment( resource_type="aws.iam.user_ssh_key", name=name, args=["resource_id", "user_name"], ) result["result"] = False return result before_ret = await hub.tool.aws.iam.user_ssh_key.get_ssh_key_if_attached_to_user( ctx, user_name=user_name, ssh_public_key_id=resource_id ) if not before_ret["result"]: result["result"] = False result["comment"] = before_ret["comment"] return result if not before_ret["ret"]: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.iam.user_ssh_key", name=name ) elif ctx.get("test", False): ret_ssh_public_key_body = await hub.exec.boto3.client.iam.get_ssh_public_key( ctx=ctx, UserName=user_name, SSHPublicKeyId=resource_id, Encoding="SSH", ) result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_ssh_key_to_present( raw_resource=before_ret["ret"], idem_resource_name=resource_id, ret_ssh_public_key_body=ret_ssh_public_key_body, ) result["comment"] = hub.tool.aws.comment_utils.would_delete_comment( resource_type="aws.iam.user_ssh_key", name=name ) return result else: ret_ssh_public_key_body = await hub.exec.boto3.client.iam.get_ssh_public_key( ctx=ctx, UserName=user_name, SSHPublicKeyId=resource_id, Encoding="SSH", ) result[ "old_state" ] = hub.tool.aws.iam.conversion_utils.convert_raw_user_ssh_key_to_present( raw_resource=before_ret["ret"], idem_resource_name=resource_id, ret_ssh_public_key_body=ret_ssh_public_key_body, ) ret = await hub.exec.boto3.client.iam.delete_ssh_public_key( ctx, UserName=user_name, SSHPublicKeyId=name ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] result["result"] = False return result result["comment"] = hub.tool.aws.comment_utils.delete_comment( resource_type="aws.iam.user_ssh_key", name=name ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Describe the resource in a way that can be recreated/managed with the corresponding "present" function Lists all ssh public keys of all IAM users. Returns: Dict[str, Any] Examples: .. code-block:: bash $ idem describe aws.iam.user_ssh_key """ result = {} # Fetch all users and then ssh public keys for each of them users = await hub.exec.boto3.client.iam.list_users(ctx) if not users["result"]: hub.log.warning(f"Could not describe user {users['comment']}") return {} user_names = [user.get("UserName") for user in users["ret"]["Users"]] for user_name in user_names: ret_attached_ssh_public_keys = ( await hub.exec.boto3.client.iam.list_ssh_public_keys( ctx=ctx, UserName=user_name ) ) if not ret_attached_ssh_public_keys["result"]: hub.log.warning( f"Could not get attached ssh public keys with user {user_name} with error" f" {ret_attached_ssh_public_keys['comment']} . Describe will skip this user and continue." ) else: ssh_public_keys_list = ret_attached_ssh_public_keys["ret"].get( "SSHPublicKeys" ) if ssh_public_keys_list: for resource in ssh_public_keys_list: resource_id = resource.get("SSHPublicKeyId") # get ssh key body ret_ssh_public_key_body = ( await hub.exec.boto3.client.iam.get_ssh_public_key( ctx=ctx, UserName=user_name, SSHPublicKeyId=resource_id, Encoding="SSH", ) ) resource_translated = hub.tool.aws.iam.conversion_utils.convert_raw_user_ssh_key_to_present( raw_resource=resource, idem_resource_name=resource_id, ret_ssh_public_key_body=ret_ssh_public_key_body, ) result[resource_id] = { "aws.iam.user_ssh_key.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource_translated.items() ] } return result