Source code for idem_aws.states.aws.guardduty.organization_admin_account

"""State module for enabling organization admin account for Guardduty."""
import copy
from typing import Any
from typing import Dict

__contracts__ = ["resource"]


[docs]async def present(hub, ctx, name: str, resource_id: str) -> Dict[str, Any]: r"""Enables an Amazon Web Services account within the organization as the GuardDuty delegated administrator. Args: name(str): An Idem name of the resource. resource_id(str): An identifier of the resource in the provider. Here it is the Amazon Web Services Account ID for the organization account to be enabled as a GuardDuty delegated administrator. Request Syntax: .. code-block:: sls resource_is_present: aws.guardduty.organization_admin_account.present: - name: 'string' - resource_id: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls resource_is_present: aws.guardduty.organization_admin_account.present: - name: value - resource_id: value """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) resource_created = False before = await hub.exec.aws.guardduty.organization_admin_account.list( ctx, name=name ) if not before["result"]: result["result"] = before["result"] result["comment"] = before["comment"] return result if before and before["ret"]: admin_accounts = before["ret"] # list call might return more than one account, so check whether it contains resource_id filtered_admin_accounts = filter( lambda admin: admin.get("resource_id") == resource_id and admin.get("admin_status") == "ENABLED", admin_accounts, ) accounts = list(filtered_admin_accounts) if len(accounts) != 0: # resource_id is already an admin result["comment"] = hub.tool.aws.comment_utils.already_exists_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) result["old_state"] = accounts[0] else: # some other account is an admin, so end the operation enabled_account_id = admin_accounts[0]["resource_id"] result["comment"] = ( f"{enabled_account_id} is already enabled as organization_admin_account. " f"{resource_id} can only be enabled if {enabled_account_id} is removed as organization_admin_account" ) result["result"] = False return result else: resource_created = True if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "resource_id": resource_id, }, ) result["comment"] = hub.tool.aws.comment_utils.would_create_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) return result ret = await hub.exec.boto3.client.guardduty.enable_organization_admin_account( ctx, AdminAccountId=resource_id ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result result["comment"] = hub.tool.aws.comment_utils.create_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) if resource_created: # fetch resource after = await hub.exec.aws.guardduty.organization_admin_account.list( ctx, name=name ) if not after["result"]: result["result"] = after["result"] result["comment"] = after["comment"] return result admin_accounts = after["ret"] filtered_admin_accounts = filter( lambda admin: admin.get("resource_id") == resource_id and admin.get("admin_status") == "ENABLED", admin_accounts, ) account = list(filtered_admin_accounts)[0] result["new_state"] = account else: # already exists result["new_state"] = copy.deepcopy(result["old_state"]) return result
[docs]async def absent(hub, ctx, name: str, resource_id: str = None) -> Dict[str, Any]: r"""Disables an Amazon Web Services account within the Organization as the GuardDuty delegated administrator. Args: name(str): An Idem name of the resource. resource_id(str, Optional): The Amazon Web Services Account ID for the organizations account to be disabled as a GuardDuty delegated administrator. Returns: Dict[str, Any] Examples: .. code-block:: sls resource_is_absent: aws.guardduty.organization_admin_account.absent: - name: value - resource_id: value """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) before = await hub.exec.aws.guardduty.organization_admin_account.list( ctx, name=name ) if not before["ret"]: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) else: admin_accounts = before["ret"] # list call might return more than one account, so check whether it contains resource_id filtered_admin_accounts = filter( lambda admin: admin.get("resource_id") == resource_id and admin.get("admin_status") == "ENABLED", admin_accounts, ) eligible_accounts = list(filtered_admin_accounts) if len(eligible_accounts) != 0: # resource_id is the admin, disable it eligible_account = eligible_accounts[0] if ctx.get("test", False): result["old_state"] = eligible_account result["comment"] = hub.tool.aws.comment_utils.would_delete_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) return result else: result["old_state"] = eligible_account try: ret = await hub.exec.boto3.client.guardduty.disable_organization_admin_account( ctx, AdminAccountId=resource_id ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result result["comment"] = hub.tool.aws.comment_utils.delete_comment( resource_type="aws.guardduty.organization_admin_account", name=name, ) except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] else: # already absent result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.guardduty.organization_admin_account", name=name ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: r"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function. Lists the accounts configured as GuardDuty delegated administrators. Returns: Dict[str, Dict[str, Any]] Examples: .. code-block:: bash $ idem describe aws.guardduty.organization_admin_account """ result = {} ret = await hub.exec.aws.guardduty.organization_admin_account.list( ctx, name="list organization admins" ) if not ret["result"]: hub.log.warning(f"Could not list organization_admin_account {ret['comment']}") return {} for organization_admin_account in ret["ret"]: resource_id = organization_admin_account["resource_id"] # to avoid any other state overriding describe if just resource_id is the state_id resource_key = f"aws-guardduty-org_admin_account-{resource_id}" result[resource_key] = { "aws.guardduty.organization_admin_account.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in organization_admin_account.items() ] } return result