"""State module for managing network Security Rules."""
import copy
from typing import Any
from typing import Dict
from typing import List
# NOTE(review): presumably names the plugin-framework contract(s) applied to the
# functions of this module -- confirm against the framework documentation.
__contracts__ = ["resource"]
async def present(
    hub,
    ctx,
    name: str,
    resource_group_name: str,
    network_security_group_name: str,
    security_rule_name: str,
    priority: int,
    direction: str,
    access: str,
    protocol: str,
    description: str = None,
    source_port_range: str = None,
    source_port_ranges: List[str] = None,
    source_address_prefix: str = None,
    source_address_prefixes: List[str] = None,
    source_application_security_groups: List[str] = None,
    destination_port_range: str = None,
    destination_port_ranges: List[str] = None,
    destination_address_prefix: str = None,
    destination_address_prefixes: List[str] = None,
    destination_application_security_groups: List[str] = None,
    subscription_id: str = None,
    resource_id: str = None,
) -> Dict[str, Any]:
    r"""Create or update Security Rules.

    The rule is looked up by ``resource_id``. When the lookup returns nothing the
    rule is created; otherwise an update payload is computed and, if it contains
    changes, sent with a PUT request. When ``ctx`` has ``test`` set, no PUT is
    issued and only the expected state is reported.

    Args:
        name(str): The identifier for this state.
        resource_group_name(str): The name of the resource group.
        network_security_group_name(str): The name of the network security group.
        security_rule_name(str): The name of the security rule.
        priority(int): The priority of the security rule. The value can be between 100 and 4096. The priority
            number must be unique for each rule in the collection. The lower the priority number, the higher
            the priority of the rule.
        direction(str): The direction of the rule. The direction specifies if rule will be evaluated on incoming
            or outgoing traffic.
        access(str): The network traffic is allowed or denied.
        protocol(str): Network protocol this rule applies to.
        description(str, Optional): A description for this rule.
        source_port_range(str, Optional): The source port or range. Integer or range between 0 and 65535.
            Asterisk '*' can also be used to match all ports.
        source_port_ranges(list[str], Optional): The source port ranges. Either this or source_port_range need
            to be provided.
        source_address_prefix(str, Optional): The source address prefix. CIDR or source IP range. Asterisk '*'
            can also be used to match all source IPs. Default tags such as 'VirtualNetwork', 'AzureLoadBalancer'
            and 'Internet' can also be used. If this is an ingress rule, specifies where network traffic
            originates from.
        source_address_prefixes(list[str], Optional): The CIDR or source IP ranges.
        source_application_security_groups(list[str], Optional): The list of resource id of the application
            security group. Either one of source_address_prefix, source_address_prefixes or
            source_application_security_groups needs to be provided.
        destination_port_range(str, Optional): The destination port or range. Integer or range between 0 and
            65535. Asterisk '*' can also be used to match all ports.
        destination_port_ranges(list[str], Optional): The destination port ranges. Either this or
            destination_port_range need to be provided.
        destination_address_prefix(str, Optional): The destination address prefix. CIDR or destination IP range.
            Asterisk '*' can also be used to match all source IPs. Default tags such as 'VirtualNetwork',
            'AzureLoadBalancer' and 'Internet' can also be used.
        destination_address_prefixes(list[str], Optional): The destination address prefixes. CIDR or destination
            IP ranges.
        destination_application_security_groups(list[str], Optional): The list of resource id of the application
            security group. Either one of destination_address_prefix, destination_address_prefixes or
            destination_application_security_groups needs to be provided.
        subscription_id(str, Optional): Subscription Unique id. Defaults to ``ctx.acct.subscription_id``.
        resource_id(str, Optional): Security rule resource id. Built from the subscription, resource group,
            network security group and rule names when omitted.

    Returns:
        Dict[str, Any]: A dict with the keys ``name``, ``result``, ``old_state``, ``new_state`` and ``comment``.

    Examples:
        .. code-block:: sls

            resource_is_present:
              azure.network.security_rules.present:
                - name: rule1
                - resource_id: /subscriptions/12345678-1234-1234-1234-aaabc1234aaa/resourceGroups/rg-1/providers/Microsoft.Network/networkSecurityGroups/nsg-1/securityRules/rule1
                - resource_group_name: rg-1
                - subscription_id: 12345678-1234-1234-1234-aaabc1234aaa
                - network_security_group_name: nsg-1
                - security_rule_name: rule1
                - protocol: '*'
                - source_address_prefix: '*'
                - destination_port_range: '80'
                - destination_address_prefix: '*'
                - access: Allow
                - priority: 130
                - direction: Inbound
                - source_port_ranges:
                  - '20'
                  - '50'
                  - '80'
    """
    result = {
        "name": name,
        "result": True,
        "old_state": None,
        "new_state": None,
        "comment": [],
    }

    if subscription_id is None:
        subscription_id = ctx.acct.subscription_id
    if resource_id is None:
        resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Network/networkSecurityGroups/{network_security_group_name}/securityRules/{security_rule_name}"

    # The rule settings feed the test state, the create payload and the update
    # payload, so they are collected once here (key order is significant for the
    # generated test state).
    rule_settings = {
        "priority": priority,
        "direction": direction,
        "access": access,
        "protocol": protocol,
        "description": description,
        "source_port_range": source_port_range,
        "source_port_ranges": source_port_ranges,
        "source_address_prefix": source_address_prefix,
        "source_address_prefixes": source_address_prefixes,
        "source_application_security_groups": source_application_security_groups,
        "destination_port_range": destination_port_range,
        "destination_port_ranges": destination_port_ranges,
        "destination_address_prefix": destination_address_prefix,
        "destination_address_prefixes": destination_address_prefixes,
        "destination_application_security_groups": destination_application_security_groups,
    }
    security_rules_tool = hub.tool.azure.network.security_rules

    response_get = await hub.exec.azure.network.security_rules.get(
        ctx, resource_id=resource_id, raw=True
    )
    if not response_get["result"]:
        hub.log.debug(
            f"Could not get azure.network.security_rules {response_get['comment']} {response_get['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_get)
        )
        return result

    existing_resource = response_get["ret"]
    if not existing_resource:
        # Nothing was returned by the lookup: the rule has to be created.
        if ctx.get("test", False):
            result["new_state"] = hub.tool.azure.test_state_utils.generate_test_state(
                enforced_state={},
                desired_state={
                    "name": name,
                    "resource_group_name": resource_group_name,
                    "network_security_group_name": network_security_group_name,
                    "security_rule_name": security_rule_name,
                    **rule_settings,
                    "subscription_id": subscription_id,
                    "resource_id": resource_id,
                },
            )
            result["comment"].append(
                f"Would create azure.network.security_rules '{name}'"
            )
            return result
        put_payload = security_rules_tool.convert_present_to_raw_security_rules(
            **rule_settings
        )
        action, action_done = "create", "Created"
    else:
        result["old_state"] = security_rules_tool.convert_raw_security_rules_to_present(
            resource=existing_resource, idem_resource_name=name
        )
        # Generate a new PUT operation payload with new values. A copy of the
        # settings is handed over so the helper always receives a fresh dict.
        new_payload = security_rules_tool.update_security_rules_payload(
            existing_resource, dict(rule_settings)
        )
        if new_payload["ret"] is None:
            result["new_state"] = copy.deepcopy(result["old_state"])
            result["comment"].append(
                f"azure.network.security_rules '{name}' has no property need to be updated."
            )
            return result
        if ctx.get("test", False):
            result["new_state"] = security_rules_tool.convert_raw_security_rules_to_present(
                resource=new_payload["ret"], idem_resource_name=name
            )
            result["comment"].append(
                f"Would update azure.network.security_rules '{name}'"
            )
            return result
        put_payload = new_payload["ret"]
        action, action_done = "update", "Updated"

    # Create and update both end in the same PUT call; only the payload and the
    # wording of the log/comment messages differ.
    response_put = await hub.exec.request.json.put(
        ctx,
        url=f"{ctx.acct.endpoint_url}/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Network/networkSecurityGroups/{network_security_group_name}/securityRules/{security_rule_name}?api-version=2022-07-01",
        success_codes=[200, 201],
        json=put_payload,
    )
    if not response_put["result"]:
        hub.log.debug(
            f"Could not {action} azure.network.security_rules {response_put['comment']} {response_put['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_put)
        )
        return result

    result["new_state"] = security_rules_tool.convert_raw_security_rules_to_present(
        resource=response_put["ret"], idem_resource_name=name
    )
    result["comment"].append(f"{action_done} azure.network.security_rules '{name}'")
    return result
async def absent(
    hub,
    ctx,
    name: str,
    resource_group_name: str,
    network_security_group_name: str,
    security_rule_name: str,
    subscription_id: str = None,
) -> Dict[str, Any]:
    r"""Delete Security Rule.

    The rule is looked up first; when the lookup returns nothing the rule is
    reported as already absent. When ``ctx`` has ``test`` set, no DELETE request
    is issued.

    Args:
        name(str): The identifier for this state.
        resource_group_name(str): The name of the resource group.
        network_security_group_name(str): The name of the network security group.
        security_rule_name(str): The name of the security rule.
        subscription_id(str, Optional): Subscription Unique id. Defaults to ``ctx.acct.subscription_id``.

    Returns:
        Dict[str, Any]: A dict with the keys ``name``, ``result``, ``comment``, ``old_state`` and ``new_state``.

    Examples:
        .. code-block:: sls

            resource_is_absent:
              azure.network.security_rules.absent:
                - name: value
                - resource_group_name: value
                - network_security_group_name: value
                - security_rule_name: value
    """
    result = dict(name=name, result=True, comment=[], old_state=None, new_state=None)

    if subscription_id is None:
        subscription_id = ctx.acct.subscription_id
    resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Network/networkSecurityGroups/{network_security_group_name}/securityRules/{security_rule_name}"

    response_get = await hub.exec.azure.network.security_rules.get(
        ctx,
        resource_id=resource_id,
    )
    if not response_get["result"]:
        hub.log.debug(
            f"Could not get azure.network.security_rules {response_get['comment']} {response_get['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_get)
        )
        return result

    if not response_get["ret"]:
        # The lookup succeeded but returned nothing, so there is nothing to delete.
        result["comment"].append(
            f"azure.network.security_rules '{name}' already absent"
        )
        return result

    # Report the state that is about to be removed, keyed by the state's name.
    result["old_state"] = response_get["ret"]
    result["old_state"]["name"] = name

    if ctx.get("test", False):
        result["comment"].append(f"Would delete azure.network.security_rules '{name}'")
        return result

    response_delete = await hub.exec.request.raw.delete(
        ctx,
        url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01",
        success_codes=[200, 202, 204],
    )
    if not response_delete["result"]:
        hub.log.debug(
            f"Could not delete azure.network.security_rules {response_delete['comment']} {response_delete['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_delete)
        )
        return result

    result["comment"].append(f"Deleted azure.network.security_rules '{name}'")
    return result
async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
    r"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function.

    Lists all Security Rules under the same subscription.

    Returns:
        Dict[str, Any]: One entry per listed rule, keyed by the rule's ``resource_id``. Each entry maps
        ``azure.network.security_rules.present`` to a list of single-key dicts, one per property of the rule.
        An empty dict is returned when the list call yields nothing.

    Examples:
        .. code-block:: bash

            $ idem describe azure.network.security_rules
    """
    result = {}
    ret_list = await hub.exec.azure.network.security_rules.list(ctx)
    if not ret_list["ret"]:
        hub.log.debug(
            f"Could not describe network security_rules {ret_list['comment']}"
        )
        return result

    for resource in ret_list["ret"]:
        # Each property becomes its own {key: value} entry in the state's
        # argument list.
        result[resource["resource_id"]] = {
            "azure.network.security_rules.present": [
                {parameter_key: parameter_value}
                for parameter_key, parameter_value in resource.items()
            ]
        }
    return result