Source code for idem_aws.exec.aws.guardduty.detector
"""Exec module for managing Guardduty detectors."""
__func_alias__ = {"list_": "list"}
from typing import Dict
[docs]async def get(hub, ctx, resource_id: str, name: str = None) -> Dict:
"""Get info about AWS Guardduty detector based on the detector_id passed.
Args:
resource_id(str):
AWS Guardduty Detector id
name(str, Optional):
Name of the Idem state
"""
result = dict(comment=[], ret=None, result=True)
before = await hub.exec.boto3.client.guardduty.get_detector(
ctx, DetectorId=resource_id
)
if not before["result"]:
if "BadRequestException" in str(before["comment"]):
result["comment"].append(
hub.tool.aws.comment_utils.get_empty_comment(
resource_type="aws.guardduty.detector", name=name
)
)
return result
result["comment"] += list(before["comment"])
result["result"] = False
return result
result[
"ret"
] = await hub.tool.aws.guardduty.conversion_utils.convert_raw_detector_to_present_async(
ctx, raw_resource=before, idem_resource_name=resource_id
)
return result
[docs]async def list_(hub, ctx, name: str = None) -> Dict:
"""List AWS guard duty detectors.
Args:
name(str, Optional):
Name of the Idem state for logging purposes.
Returns:
Dict[str, Any]:
Return detectors.
"""
result = dict(comment=[], ret=[], result=True)
before = await hub.exec.boto3.client.guardduty.list_detectors(ctx)
if not before["result"]:
result["comment"] += list(before["comment"])
result["result"] = False
return result
if not before["ret"]["DetectorIds"]:
result["comment"].append(
hub.tool.aws.comment_utils.list_empty_comment(
resource_type="aws.guardduty.detector", name=name
)
)
return result
for detector_id in before["ret"]["DetectorIds"]:
detector_ret = await get(hub, ctx, resource_id=detector_id)
if not detector_ret["result"]:
hub.log.warning(
f"Could not get detector info for id {detector_id}, hence skipping it in list"
)
result["comment"].append(detector_ret["comment"])
continue
result["ret"].append(detector_ret["ret"])
return result