Source code for idem_aws.states.aws.ec2.flow_log

"""
hub.exec.boto3.client.ec2.create_flow_logs
hub.exec.boto3.client.ec2.delete_flow_logs
hub.exec.boto3.client.ec2.describe_flow_logs
"""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict
from typing import List

# Contract names declared by this state module.
# NOTE(review): presumably read by the plugin loader to apply the "resource"
# contract to the functions below -- confirm against the idem contract docs.
__contracts__ = ["resource"]

# Requisite table keyed by state function name: "present" lists
# "aws.ec2.subnet.present" under "require".
# NOTE(review): presumably this makes flow-log creation run after subnet
# states in the same run -- confirm against the idem requisite docs.
TREQ = {
    "present": {
        "require": ["aws.ec2.subnet.present"],
    },
}


async def present(
    hub,
    ctx,
    name: str,
    resource_type: str,
    traffic_type: str,
    log_format: str = None,
    max_aggregation_interval: int = None,
    resource_id: str = None,
    log_destination: str = None,
    log_destination_type: str = None,
    log_group_name: str = None,
    iam_role: str = None,
    destination_options: make_dataclass(
        "DestinationOptionsRequest",
        [
            ("FileFormat", str, field(default=None)),
            ("HiveCompatiblePartitions", bool, field(default=None)),
            ("PerHourPartition", bool, field(default=None)),
        ],
    ) = None,
    resource_ids: List[str] = None,
    tags: Dict[str, Any]
    or List[
        make_dataclass(
            "Tag",
            [("Key", str, field(default=None)), ("Value", str, field(default=None))],
        )
    ] = None,
) -> Dict[str, Any]:
    """Creates a flow log to capture information about IP traffic for a specific network interface, subnet, or VPC.

    Flow log data for a monitored network interface is recorded as flow log
    records, which are log events consisting of fields that describe the
    traffic flow. For more information, see Flow log records in the Amazon
    Virtual Private Cloud User Guide.

    When publishing to CloudWatch Logs, flow log records are published to a
    log group, and each network interface has a unique log stream in the log
    group. When publishing to Amazon S3, flow log records for all of the
    monitored network interfaces are published to a single log file object
    that is stored in the specified bucket. For more information, see VPC Flow
    Logs in the Amazon Virtual Private Cloud User Guide.

    If ``resource_id`` refers to an existing flow log, only its tags are
    reconciled; otherwise a new flow log is created.

    Args:
        name(str):
            Name of the resource.

        resource_type(str):
            Type of resource flow-log is attached to (Subnet, VPC, NetworkInterface).

        traffic_type(str):
            Type of traffic to be recorded (REJECT, ALL, ACCEPT).

        log_format(str, Optional):
            Syntax to be used to print log statements.

        max_aggregation_interval(int, Optional):
            Max interval during which packets are aggregated and then stored in log record.

        resource_id(str, Optional):
            AWS Flow log ID.

        log_destination(str, Optional):
            S3 bucket ARN.

        log_destination_type(str, Optional):
            S3 bucket or Default: cloud-watch-logs.

        log_group_name(str, Optional):
            log_group_name if the log_destination_type is cloudwatch.

        iam_role(str, Optional):
            ARN of IAM role to be used to post in cloud-watch-logs.

        destination_options(dict[str, Any], Optional):
            The destination options. Defaults to None.

            * FileFormat (str, Optional):
                The format for the flow log. The default is plain-text.
            * HiveCompatiblePartitions (bool, Optional):
                Indicates whether to use Hive-compatible prefixes for flow logs stored in Amazon S3.
                The default is false.
            * PerHourPartition (bool, Optional):
                Indicates whether to partition the flow log per hour. This reduces the cost and
                response time for queries. The default is false.

        resource_ids(list[str], Optional):
            list of resource_ids flow-log is attached to.

        tags(dict or list, Optional):
            Dict in the format of {tag-key: tag-value} or List of tags in the format of
            [{"Key": tag-key, "Value": tag-value}] to associate with the flow log resource.
            Defaults to None.

            * (Key, Optional):
                The key of the tag. Constraints: Tag keys are case-sensitive and accept a maximum
                of 127 Unicode characters. May not begin with aws:.
            * (Value, Optional):
                The value of the tag. Constraints: Tag values are case-sensitive and accept a
                maximum of 256 Unicode characters.

    Request Syntax:
        .. code-block:: sls

            [flow-log-id]:
              aws.ec2.flow_log.present:
                - name: 'string'
                - log_group_name: 'string'
                - resource_type: 'string'
                - traffic_type: 'string'
                - tags:
                  - Key: 'string'
                    Value: 'string'

    Returns:
        Dict[str, Any]: A dict with the keys ``comment``, ``old_state``,
        ``new_state``, ``name`` and ``result``. ``result`` is False when the
        tag update or the create call fails, including when the create call
        raises ``ClientError``.

    Examples:
        .. code-block:: sls

            fl-09c0787e693332a0a:
              aws.ec2.flow_log.present:
                - traffic_type: REJECT
                - log_destination_type: s3
    """
    result = dict(comment="", old_state=None, new_state=None, name=name, result=True)
    before = None
    # Only populated when an existing flow log is found; used as the planned
    # state for test runs against an existing resource.
    plan_state = None
    resource_updated = False
    is_test = ctx.get("test", False)

    if resource_id:
        before = await hub.exec.aws.ec2.flow_log.get(
            ctx, name=name, resource_id=resource_id
        )

    # Tag comparison and update work on the dict form.
    if isinstance(tags, list):
        tags_dict = hub.tool.aws.tag_utils.convert_tag_list_to_dict(tags)
    else:
        tags_dict = tags

    if before and before["result"] and before["ret"]:
        # The flow log already exists: tags are the only attribute reconciled.
        result["old_state"] = copy.deepcopy(before["ret"])
        plan_state = copy.deepcopy(result["old_state"])
        old_tags = before["ret"].get("tags")
        if tags_dict is not None and not (
            hub.tool.aws.state_comparison_utils.compare_dicts(tags_dict, old_tags)
        ):
            result["comment"] = f"'{name}' already exists. Updating tags"
            update_ret = await hub.tool.aws.ec2.tag.update_tags(
                ctx,
                resource_id=resource_id,
                old_tags=old_tags,
                new_tags=tags_dict,
            )
            if not update_ret["result"]:
                result["comment"] = update_ret["comment"]
                result["result"] = False
            if is_test and update_ret["ret"] is not None:
                plan_state["tags"] = update_ret["ret"]
            resource_updated = resource_updated or bool(update_ret["result"])
    else:
        if is_test:
            result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
                enforced_state={},
                desired_state={
                    "name": name,
                    "resource_type": resource_type,
                    "traffic_type": traffic_type,
                    "log_format": log_format,
                    "max_aggregation_interval": max_aggregation_interval,
                    "resource_id": resource_id,
                    "log_destination": log_destination,
                    "log_destination_type": log_destination_type,
                    "log_group_name": log_group_name,
                    "iam_role": iam_role,
                    "destination_options": destination_options,
                    "resource_ids": resource_ids,
                    "tags": tags_dict,
                },
            )
            result["comment"] = f"Would create aws.ec2.flow_log {name}"
            return result
        try:
            # The create call takes tags in list form.
            if isinstance(tags, dict):
                tags_list = hub.tool.aws.tag_utils.convert_tag_dict_to_list(tags)
            else:
                tags_list = tags
            tag_specifications = (
                [{"ResourceType": "vpc-flow-log", "Tags": tags_list}]
                if tags_list is not None
                else None
            )
            ret = await hub.exec.boto3.client.ec2.create_flow_logs(
                ctx,
                DeliverLogsPermissionArn=iam_role,
                TagSpecifications=tag_specifications,
                DestinationOptions=destination_options,
                LogDestination=log_destination,
                LogDestinationType=log_destination_type,
                LogFormat=log_format,
                LogGroupName=log_group_name,
                MaxAggregationInterval=max_aggregation_interval,
                ResourceIds=resource_ids,
                ResourceType=resource_type,
                TrafficType=traffic_type,
            )
            result["result"] = ret["result"]
            if not result["result"]:
                result["comment"] = ret["comment"]
                return result
            result["comment"] = f"Created '{name}'"
            resource_id = ret["ret"]["FlowLogIds"][0]
            resource_updated = True
        except hub.tool.boto3.exception.ClientError as e:
            # A raised client error is a failed run; without this flag the
            # state would report success alongside an error comment.
            result["comment"] = f"{e.__class__.__name__}: {e}"
            result["result"] = False

    if is_test:
        result["new_state"] = plan_state
    elif resource_updated:
        # Re-read the resource so new_state reflects what was actually applied.
        after = await hub.exec.aws.ec2.flow_log.get(
            ctx=ctx, name=name, resource_id=resource_id
        )
        if after and after["result"] and after["ret"]:
            result["new_state"] = copy.deepcopy(after["ret"])
    else:
        result["new_state"] = copy.deepcopy(result["old_state"])
    return result


async def absent(hub, ctx, name: str, resource_id: str = None) -> Dict[str, Any]:
    """Deletes a flow log.

    Without a ``resource_id`` the flow log is reported as already absent and
    no lookup or delete call is made.

    Args:
        name(str):
            Name of the resource.

        resource_id(str, Optional):
            AWS Flow log ID.

    Request Syntax:
        .. code-block:: sls

            [flow-log-name]:
              aws.ec2.flow_log.absent:
                - name: 'string'
                - resource_id: 'string'

    Returns:
        Dict[str, Any]: A dict with the keys ``comment``, ``old_state``,
        ``new_state``, ``name`` and ``result``. ``result`` is False when the
        lookup fails for a reason other than ``InvalidFlowLogId.NotFound`` or
        when the delete call fails.

    Examples:
        .. code-block:: sls

            resource_is_absent:
              aws.ec2.flow_log.absent:
                - name: value
                - resource_id: value
    """
    result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)

    if not resource_id:
        result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
            resource_type="aws.ec2.flow_log", name=name
        )
        return result

    before = await hub.exec.aws.ec2.flow_log.get(
        ctx=ctx, name=name, resource_id=resource_id
    )

    if not before["result"]:
        # A "not found" lookup error means the flow log is already gone;
        # any other lookup error is a failure of this state.
        if "InvalidFlowLogId.NotFound" in str(before["comment"]):
            result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
                resource_type="aws.ec2.flow_log", name=name
            )
        else:
            result["result"] = False
            result["comment"] = before["comment"]
        return result

    if not before["ret"]:
        # Lookup succeeded but returned nothing: nothing to delete.
        result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
            resource_type="aws.ec2.flow_log", name=name
        )
        return result

    result["old_state"] = before["ret"]

    if ctx.get("test", False):
        # Test run: report the planned deletion without calling the API.
        result["comment"] = hub.tool.aws.comment_utils.would_delete_comment(
            resource_type="aws.ec2.flow_log", name=name
        )
        return result

    ret = await hub.exec.boto3.client.ec2.delete_flow_logs(
        ctx, FlowLogIds=[resource_id]
    )
    result["result"] = ret["result"]
    if not result["result"]:
        result["comment"] = ret["comment"]
        return result

    result["comment"] = hub.tool.aws.comment_utils.delete_comment(
        resource_type="aws.ec2.flow_log", name=name
    )
    return result


async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
    """Describe the resource in a way that can be recreated/managed with the corresponding "present" function.

    Describes one or more flow logs. To view the information in your flow logs
    (the log streams for the network interfaces), you must use the CloudWatch
    Logs console or the CloudWatch Logs API.

    Returns:
        Dict[str, Dict[str, Any]]: A mapping keyed by flow log ID. Each value
        holds one ``aws.ec2.flow_log.present`` entry whose value is a list of
        single-key ``{parameter: value}`` dicts. An empty dict is returned,
        and a warning logged, when the describe call fails.

    Examples:
        .. code-block:: bash

            $ idem describe aws.ec2.flow_log
    """
    result = {}
    ret = await hub.exec.boto3.client.ec2.describe_flow_logs(ctx)
    if not ret["result"]:
        hub.log.warning(f"Could not describe flow_log {ret['comment']}")
        return result

    for flow_log in ret["ret"]["FlowLogs"]:
        translated_resource = (
            hub.tool.aws.ec2.conversion_utils.convert_raw_flow_log_to_present(flow_log)
        )
        # Each state parameter is emitted as its own single-key dict.
        result[flow_log["FlowLogId"]] = {
            "aws.ec2.flow_log.present": [
                {parameter_key: parameter_value}
                for parameter_key, parameter_value in translated_resource.items()
            ]
        }
    return result