"""
hub.exec.boto3.client.ec2.create_flow_logs
hub.exec.boto3.client.ec2.delete_flow_logs
hub.exec.boto3.client.ec2.describe_flow_logs
"""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict
from typing import List
from typing import Union
__contracts__ = ["resource"]
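
# Transparent requisites: every aws.ec2.flow_log.present state implicitly
# requires the aws.ec2.subnet.present states in the run to be applied first.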
TREQ = {
"present": {
"require": ["aws.ec2.subnet.present"],
},
}
async def present(
hub,
ctx,
name: str,
resource_type: str,
traffic_type: str,
log_format: str = None,
max_aggregation_interval: int = None,
resource_id: str = None,
log_destination: str = None,
log_destination_type: str = None,
log_group_name: str = None,
iam_role: str = None,
destination_options: make_dataclass(
"DestinationOptionsRequest",
[
("FileFormat", str, field(default=None)),
("HiveCompatiblePartitions", bool, field(default=None)),
("PerHourPartition", bool, field(default=None)),
],
) = None,
resource_ids: List[str] = None,
    tags: Union[
        Dict[str, Any],
        List[
            make_dataclass(
                "Tag",
                [("Key", str, field(default=None)), ("Value", str, field(default=None))],
            )
        ],
    ] = None,
) -> Dict[str, Any]:
"""Creates a flow log to capture information about IP traffic for a specific network interface, subnet,
or VPC.
Flow log data for a monitored network interface is recorded as flow log records, which are log events
consisting of fields that describe the traffic flow. For more information, see Flow log records in the Amazon
Virtual Private Cloud User Guide. When publishing to CloudWatch Logs, flow log records are published to a log
group, and each network interface has a unique log stream in the log group. When publishing to Amazon S3, flow
log records for all of the monitored network interfaces are published to a single log file object that is stored
in the specified bucket. For more information, see VPC Flow Logs in the Amazon Virtual Private Cloud User Guide.
Args:
name(str):
Name of the resource.
        log_group_name(str, Optional):
            Name of the CloudWatch Logs log group to publish flow log records to when
            log_destination_type is cloud-watch-logs.
resource_type(str):
Type of resource flow-log is attached to (Subnet, VPC, NetworkInterface).
traffic_type(str):
Type of traffic to be recorded (REJECT, ALL, ACCEPT).
        log_destination_type(str, Optional):
            The type of destination to publish flow log data to: s3 or cloud-watch-logs.
            Defaults to cloud-watch-logs.
        log_destination(str, Optional):
            ARN of the destination for the flow log data, for example an S3 bucket ARN
            when log_destination_type is s3.
        log_format(str, Optional):
            The fields to include in each flow log record and the order in which they
            appear.
        max_aggregation_interval(int, Optional):
            The maximum interval of time, in seconds, during which a flow of packets is
            captured and aggregated into one flow log record.
resource_id(str, Optional):
AWS Flow log ID.
        iam_role(str, Optional):
            ARN of the IAM role that allows publishing flow logs to the CloudWatch Logs
            log group.
destination_options(dict[str, Any], Optional):
The destination options. Defaults to None.
* FileFormat (str, Optional):
The format for the flow log. The default is plain-text.
* HiveCompatiblePartitions (bool, Optional):
Indicates whether to use Hive-compatible prefixes for flow logs stored in Amazon S3. The default is
false.
* PerHourPartition (bool, Optional):
Indicates whether to partition the flow log per hour. This reduces the cost and response time for
queries. The default is false.
        resource_ids(list[str], Optional):
            IDs of the resources to monitor, for example subnet or VPC IDs, matching
            resource_type.
tags(dict or list, Optional):
Dict in the format of {tag-key: tag-value} or List of tags in the format of
[{"Key": tag-key, "Value": tag-value}] to associate with the flow log resource. Defaults to None.
* (Key, Optional):
The key of the tag. Constraints: Tag keys are case-sensitive and accept a maximum of 127 Unicode
characters. May not begin with aws:.
* (Value, Optional):
The value of the tag. Constraints: Tag values are case-sensitive and accept a maximum of 256
Unicode characters.
    Request Syntax:
        .. code-block:: sls

            [flow-log-id]:
              aws.ec2.flow_log.present:
                - name: 'string'
                - log_group_name: 'string'
                - resource_type: 'string'
                - traffic_type: 'string'
                - tags:
                    - Key: 'string'
                      Value: 'string'

    Returns:
        Dict[str, Any]

    Examples:
        .. code-block:: sls

            fl-09c0787e693332a0a:
              aws.ec2.flow_log.present:
                - traffic_type: REJECT
                - log_destination_type: s3
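
        A fuller example (the resource ID and bucket name below are illustrative) that
        publishes all VPC traffic to S3 with hourly partitions:

        .. code-block:: sls

            vpc-flow-log-to-s3:
              aws.ec2.flow_log.present:
                - resource_type: VPC
                - resource_ids:
                    - vpc-0abc123de456f7890
                - traffic_type: ALL
                - log_destination_type: s3
                - log_destination: arn:aws:s3:::my-flow-log-bucket
                - max_aggregation_interval: 600
                - destination_options:
                    FileFormat: plain-text
                    HiveCompatiblePartitions: false
                    PerHourPartition: true
                - tags:
                    - Key: Environment
                      Value: test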
"""
result = dict(comment="", old_state=None, new_state=None, name=name, result=True)
before = None
resource_updated = False
if resource_id:
before = await hub.exec.aws.ec2.flow_log.get(
ctx, name=name, resource_id=resource_id
)
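    # Normalize tags to the dict form used for comparison; the create call below
    # converts back to the list form that boto3 expects.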
if isinstance(tags, List):
tags_dict = hub.tool.aws.tag_utils.convert_tag_list_to_dict(tags)
else:
tags_dict = tags
if before and before["result"] and before["ret"]:
result["old_state"] = copy.deepcopy(before["ret"])
plan_state = copy.deepcopy(result["old_state"])
old_tags = before["ret"].get("tags")
if tags_dict is not None:
if not hub.tool.aws.state_comparison_utils.compare_dicts(
tags_dict, old_tags
):
result["comment"] = f"'{name}' already exists. Updating tags"
update_ret = await hub.tool.aws.ec2.tag.update_tags(
ctx,
resource_id=resource_id,
old_tags=old_tags,
new_tags=tags_dict,
)
if not update_ret["result"]:
result["comment"] = update_ret["comment"]
result["result"] = False
if ctx.get("test", False) and update_ret["ret"] is not None:
plan_state["tags"] = update_ret["ret"]
resource_updated = resource_updated or bool(update_ret["result"])
else:
if ctx.get("test", False):
result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
enforced_state={},
desired_state={
"name": name,
"resource_type": resource_type,
"traffic_type": traffic_type,
"log_format": log_format,
"max_aggregation_interval": max_aggregation_interval,
"resource_id": resource_id,
"log_destination": log_destination,
"log_destination_type": log_destination_type,
"log_group_name": log_group_name,
"iam_role": iam_role,
"destination_options": destination_options,
"resource_ids": resource_ids,
"tags": tags_dict,
},
)
result["comment"] = f"Would create aws.ec2.flow_log {name}"
return result
try:
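        # boto3's create_flow_logs expects tags as a list of {"Key": ..., "Value": ...}
        # dicts inside a TagSpecifications entry, so convert the dict form back.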
if isinstance(tags, Dict):
tags_list = hub.tool.aws.tag_utils.convert_tag_dict_to_list(tags)
else:
tags_list = tags
ret = await hub.exec.boto3.client.ec2.create_flow_logs(
ctx,
DeliverLogsPermissionArn=iam_role,
TagSpecifications=[{"ResourceType": "vpc-flow-log", "Tags": tags_list}]
if tags_list is not None
else None,
DestinationOptions=destination_options,
LogDestination=log_destination,
LogDestinationType=log_destination_type,
LogFormat=log_format,
LogGroupName=log_group_name,
MaxAggregationInterval=max_aggregation_interval,
ResourceIds=resource_ids,
ResourceType=resource_type,
TrafficType=traffic_type,
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] = ret["comment"]
return result
result["comment"] = f"Created '{name}'"
resource_id = ret["ret"]["FlowLogIds"][0]
resource_updated = True
    except hub.tool.boto3.exception.ClientError as e:
        result["comment"] = f"{e.__class__.__name__}: {e}"
        result["result"] = False
if ctx.get("test", False):
result["new_state"] = plan_state
elif resource_updated:
after = await hub.exec.aws.ec2.flow_log.get(
ctx=ctx, name=name, resource_id=resource_id
)
if after and after["result"] and after["ret"]:
result["new_state"] = copy.deepcopy(after["ret"])
else:
result["new_state"] = copy.deepcopy(result["old_state"])
return result
async def absent(hub, ctx, name: str, resource_id: str = None) -> Dict[str, Any]:
"""Deletes a flow log.
Args:
name(str): Name of the resource.
resource_id(str, Optional): AWS Flow log ID.
    Request Syntax:
        .. code-block:: sls

            [flow-log-name]:
              aws.ec2.flow_log.absent:
                - name: 'string'
                - resource_id: 'string'

    Returns:
        Dict[str, Any]

    Examples:
        .. code-block:: sls

            resource_is_absent:
              aws.ec2.flow_log.absent:
                - name: value
                - resource_id: value
    """
result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
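    # Without a resource_id there is nothing to look up; treat the flow log as
    # already absent.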
if not resource_id:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.ec2.flow_log", name=name
)
return result
before = await hub.exec.aws.ec2.flow_log.get(
ctx=ctx, name=name, resource_id=resource_id
)
if not before["result"]:
if "InvalidFlowLogId.NotFound" in str(before["comment"]):
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.ec2.flow_log", name=name
)
else:
result["result"] = False
result["comment"] = before["comment"]
return result
elif not before["ret"]:
result["comment"] = hub.tool.aws.comment_utils.already_absent_comment(
resource_type="aws.ec2.flow_log", name=name
)
return result
result["old_state"] = before["ret"]
if ctx.get("test", False):
result["comment"] = hub.tool.aws.comment_utils.would_delete_comment(
resource_type="aws.ec2.flow_log", name=name
)
else:
ret = await hub.exec.boto3.client.ec2.delete_flow_logs(
ctx, FlowLogIds=[resource_id]
)
result["result"] = ret["result"]
if not result["result"]:
result["comment"] = ret["comment"]
return result
result["comment"] = hub.tool.aws.comment_utils.delete_comment(
resource_type="aws.ec2.flow_log", name=name
)
return result
async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
"""
Describe the resource in a way that can be recreated/managed with the corresponding "present" function
Describes one or more flow logs. To view the information in your flow logs (the log streams for the network
interfaces), you must use the CloudWatch Logs console or the CloudWatch Logs API.
Returns:
Dict[str, Any]
Examples:
.. code-block:: bash
$ idem describe aws_auto.ec2.flow_log
"""
result = {}
ret = await hub.exec.boto3.client.ec2.describe_flow_logs(ctx)
if not ret["result"]:
        hub.log.warning(f"Could not describe flow_log: {ret['comment']}")
return result
for flow_log in ret["ret"]["FlowLogs"]:
translated_resource = (
hub.tool.aws.ec2.conversion_utils.convert_raw_flow_log_to_present(flow_log)
)
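        # Emit one present state per flow log, keyed by its flow log ID; parameters
        # follow the SLS list-of-single-key-dicts convention.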
result[flow_log["FlowLogId"]] = {
"aws.ec2.flow_log.present": [
{parameter_key: parameter_value}
for parameter_key, parameter_value in translated_resource.items()
]
}
return result