Source code for idem_aws.states.aws.guardduty.detector

"""State module for managing Amazon GuardDuty Detector."""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict

__contracts__ = ["resource"]


[docs]async def present( hub, ctx, name: str, enable: bool = True, resource_id: str = None, client_token: str = None, finding_publishing_frequency: str = None, data_sources: make_dataclass( """Describes which data sources will be enabled for the detector.""" "DataSourceConfiguration", [ ( "S3Logs", make_dataclass( """Describes whether S3 data event logs are enabled as a data source.""" "S3LogsConfiguration", [("Enable", bool)], ), field(default=None), ), ( "Kubernetes", make_dataclass( """Describes whether any Kubernetes logs are enabled as data sources.""" "KubernetesConfiguration", [ ( "AuditLogs", make_dataclass( """The status of Kubernetes audit logs as a data source.""" "KubernetesAuditLogsConfiguration", [("Enable", bool)], ), ) ], ), field(default=None), ), ( "MalwareProtection", make_dataclass( """Describes whether Malware Protection is enabled as a data source.""" "MalwareProtectionConfiguration", [ ( "ScanEc2InstanceWithFindings", make_dataclass( """Describes the configuration of Malware Protection for EC2 instances with findings.""" "ScanEc2InstanceWithFindingsConfiguration", [("EbsVolumes", bool, field(default=None))], ), field(default=None), ) ], ), field(default=None), ), ], ) = None, tags: Dict[str, str] = None, ) -> Dict[str, Any]: """Creates an Amazon GuardDuty detector. A detector is a resource that represents the GuardDuty service. To start using GuardDuty, you must create a detector in each Region where you enable the service. You can have only one detector per account per Region. All data sources are enabled in a new detector by default. Args: name(str): An Idem name of the resource. resource_id(str, Optional): The ID of the detector in Amazon Web Services. enable(bool, Optional): A Boolean value that specifies whether the detector is to be enabled. Default value is ``True``. client_token(str, Optional): The idempotency token for the create request. This field is auto_populated if not provided. finding_publishing_frequency(str, Optional): A value that specifies how frequently updated findings are exported. Valid values are ``FIFTEEN_MINUTES``, ``ONE_HOUR``, ``SIX_HOURS``. data_sources(dict, Optional): Describes which data sources will be enabled for the detector. * S3Logs (*dict, Optional*): Describes whether S3 data event logs are enabled as a data source. * Enable (*bool*): The status of S3 data event logs as a data source. * Kubernetes (*dict, Optional*): Describes whether any Kubernetes logs are enabled as data sources. * AuditLogs (*dict*): The status of Kubernetes audit logs as a data source. * Enable (*bool*): The status of Kubernetes audit logs as a data source. * MalwareProtection (*dict, Optional*): Describes whether Malware Protection is enabled as a data source. * ScanEc2InstanceWithFindings (*dict, Optional*): Describes the configuration of Malware Protection for EC2 instances with findings. EbsVolumes (*bool, Optional*): Describes the configuration for scanning EBS volumes as data source. tags(dict, Optional): Dict in the format of ``{tag-key: tag-value}`` to associate with the detector. Request Syntax: .. code-block:: sls [idem_test_aws_guardduty_detector]: aws.guaardduty.detector.present: - name: 'string' - enable: True|False - client_token: 'string' - finding_publishing_frequency: 'FIFTEEN_MINUTES|ONE_HOUR|SIX_HOURS' - data_sources: S3Logs: Enable: True|False Kubernetes: AuditLogs: Enable: True|False MalwareProtection: ScanEc2InstanceWithFindings: EbsVolumes: True|False - tags: 'string': 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls idem_test_aws_guardduty_detector: aws.guardduty.detector.present: - name: idem_test_guardduty_detector - enable: True - finding_publishing_frequency: 'ONE_HOUR' - data_sources: S3Logs: Enable: true - tags: provider: idem """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) before = None resource_updated = False if resource_id: before = await hub.exec.aws.guardduty.detector.get( ctx, resource_id=resource_id, name=name ) if not before["result"]: result["result"] = False result["comment"] = before["comment"] return result if not before or not before["ret"]: if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "enable": enable, "finding_publishing_frequency": finding_publishing_frequency, "data_sources": data_sources, "tags": tags, }, ) result["comment"] = hub.tool.aws.comment_utils.would_create_comment( resource_type="aws.guardduty.detector", name=name ) return result try: ret = await hub.exec.boto3.client.guardduty.create_detector( ctx, Enable=enable, ClientToken=client_token, FindingPublishingFrequency=finding_publishing_frequency, DataSources=data_sources, Tags=tags, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result result["comment"] = result[ "comment" ] + hub.tool.aws.comment_utils.create_comment( resource_type="aws.guardduty.detector", name=name ) resource_id = ret["ret"]["DetectorId"] except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False return result else: try: account_details = await hub.exec.boto3.client.sts.get_caller_identity(ctx) account_id = account_details["ret"]["Account"] region_name = ctx["acct"]["region_name"] detector_arn = hub.tool.aws.arn_utils.build( service="guardduty", region=region_name, account_id=account_id, resource="detector/" + resource_id, ) result["old_state"] = copy.deepcopy(before["ret"]) plan_state = copy.deepcopy(result["old_state"]) update_ret = await hub.tool.aws.guardduty.detector.update( ctx, before=before, detector_id=resource_id, finding_publishing_frequency=finding_publishing_frequency, data_sources=data_sources, ) result["comment"] += update_ret["comment"] result["result"] = update_ret["result"] resource_updated = resource_updated or bool(update_ret["ret"]) if update_ret["ret"] and ctx.get("test", False): if "finding_publishing_frequency" in update_ret["ret"]: plan_state["finding_publishing_frequency"] = update_ret["ret"][ "finding_publishing_frequency" ] if "data_sources" in update_ret["ret"]: plan_state["data_sources"] = update_ret["ret"]["data_sources"] if (tags is not None) and (result["old_state"].get("tags", {}) != tags): update_tags_ret = await hub.tool.aws.guardduty.detector.update_tags( ctx=ctx, resource_arn=detector_arn, old_tags=result["old_state"].get("tags", {}), new_tags=tags, ) if not result["result"]: result["comment"] = update_tags_ret["comment"] result["result"] = False return result result["comment"] += update_tags_ret["comment"] result["result"] = result["result"] and update_tags_ret["result"] resource_updated = resource_updated or bool(update_tags_ret["ret"]) if ctx.get("test", False) and update_tags_ret["ret"] is not None: plan_state["tags"] = update_tags_ret["ret"].get("tags") if not resource_updated: result["comment"] += [f"{name} already exists"] if ctx.get("test", False) and resource_updated: result["comment"] = ( hub.tool.aws.comment_utils.would_update_comment( resource_type="aws.guardduty.detector", name=name ) + result["comment"] ) except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False return result try: if ctx.get("test", False): result["new_state"] = plan_state elif (not before) or (not before["ret"]) or resource_updated: after = await hub.exec.aws.guardduty.detector.get( ctx, resource_id=resource_id, name=name ) if not after["result"] or not after["ret"]: result["result"] = False result["comment"] = after["comment"] return result result["new_state"] = copy.deepcopy(after["ret"]) else: result["new_state"] = copy.deepcopy(result["old_state"]) except Exception as e: result["comment"] += [str(e)] result["result"] = False return result
[docs]async def absent( hub, ctx, name: str, resource_id: str = None, ) -> Dict[str, Any]: """Deletes an Amazon GuardDuty detector. Args: name(str): An Idem name of the resource. resource_id(str, Optional): The ID of the detector in Amazon Web Services. Request syntax: .. code-block:: sls [idem_test_aws_guardduty_detector]: aws.guardduty.detector.absent: - name: 'string' - resource_id: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls idem_test_aws_guardduty_detector: aws.guardduty.detector.absent: - name: idem_test_guardduty_detector - resource_id: cebf7ced6562d943d61f76a915e32563 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not resource_id: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.guardduty.detector", name=name ) return result before = await hub.exec.aws.guardduty.detector.get( ctx, resource_id=resource_id, name=name ) if not before["result"] or not before["ret"]: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.guardduty.detector", name=name ) elif ctx.get("test", False): result["old_state"] = copy.deepcopy(before["ret"]) result["comment"] = hub.tool.aws.comment_utils.would_delete_comment( resource_type="aws.guardduty.detector", name=name ) return result else: result["old_state"] = copy.deepcopy(before["ret"]) try: ret = await hub.exec.boto3.client.guardduty.delete_detector( ctx, DetectorId=resource_id ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] result["result"] = False return result result["comment"] = hub.tool.aws.comment_utils.delete_comment( resource_type="aws.guardduty.detector", name=name ) except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Describes AWS GuardDuty detectors in a way that can be recreated/managed with the corresponding "present" function. Returns: Dict[str, Dict[str, Any]] Examples: .. code-block:: bash $ idem describe aws.guardduty.detector """ result = {} ret = await hub.exec.boto3.client.guardduty.list_detectors(ctx) if not ret["ret"]["DetectorIds"]: hub.log.warning(f"Could not list detector {ret['comment']}") return {} for resource_id in ret["ret"]["DetectorIds"]: detector_id = resource_id detector = await hub.exec.boto3.client.guardduty.get_detector( ctx, DetectorId=resource_id ) resource_translated = await hub.tool.aws.guardduty.conversion_utils.convert_raw_detector_to_present_async( ctx, raw_resource=detector, idem_resource_name=detector_id ) result[detector_id] = { "aws.guardduty.detector.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource_translated.items() ] } return result