Source code for idem_azure.states.azure.network.firewall

"""States module for managing Firewall."""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict
from typing import List

__contracts__ = ["resource"]
__reconcile_wait__ = {"static": {"wait_in_seconds": 20}}


[docs]async def present( hub, ctx, name: str, location: str, resource_group_name: str, firewall_name: str, subscription_id: str = None, tags: Dict = None, resource_id: str = None, zones: List[str] = None, sku: make_dataclass( "sku", [("name", str, field(default=None)), ("tier", str, field(default=None))], ) = None, firewall_policy_id: str = None, ip_configuration: List[ make_dataclass( "ipConfiguration", [ ("name", str, field(default=None)), ("subnet_id", str, field(default=None)), ("public_ip_address_id", str, field(default=None)), ], ) ] = None, management_ip_configuration: make_dataclass( "managementIpConfiguration", [ ("name", str, field(default=None)), ("subnet_id", str, field(default=None)), ("public_ip_address_id", str, field(default=None)), ], ) = None, ) -> Dict: r"""Create or update firewall. Args: name(str): The identifier for this state. location(str): Resource location. Changing this forces a new resource to be created. resource_group_name(str): The name of the resource group. subscription_id(str, Optional): Subscription Unique id. resource_id(str, Optional): firewall resource id on Azure tags(dict[str, str], Optional): Resource tags. firewall_name(str): The name of the firewall. zones(list[str]): A list of availability zones denoting where the resource needs to come from. sku(dict[str, Any], Optional): The SKU of the Firewall. * name(str): SKU name of the Firewall. Possible values are AZFW_Hub and AZFW_VNet. Changing this forces a new resource to be created. * tier(str): SKU tier of the Firewall. Possible values are Premium, Standard and Basic. firewall_policy_id(str): The ID of the Firewall Policy applied to this Firewall. ip_configuration(list[dict[str, Any]], Optional): IP configuration of the Firewall. * name(str): SKU name of the Firewall. Possible values are AZFW_Hub and AZFW_VNet. Changing this forces a new resource to be created. * subnet_id(str): SKU tier of the Firewall. Possible values are Premium, Standard and Basic. * public_ip_address_id(str): SKU tier of the Firewall. Possible values are Premium, Standard and Basic. management_ip_configuration(dict[str, Any], Optional): Management IP configuration of the Firewall. * name(str): Specifies the name of the IP Configuration. * subnet_id(str): Reference to the subnet associated with the IP Configuration. Changing this forces a new resource to be created. * public_ip_address_id(str): The ID of the Public IP Address associated with the firewall. Returns: Dict Examples: .. code-block:: sls resource_is_present: azure.network.firewall.present: - name: my_firewall - subscription_id: my_sub_id - resource_group_name: my_rg-1 - firewall_name: my-firewall - location: eastus - tags: key: valuer - zones: - 1 - sku: name: AZFW_VNet tier: Premium - firewall_policy_id: my_fp_id - ip_configuration: - name: name subnet_id: my_sub_id public_ip_address_id: my_public_ip_address - management_ip_configuration: name: name subnet_id: my_AzureFirewallManagementSubnet_subnet public_ip_address_id: my_management_public_ip_addess """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": [], } if subscription_id is None: subscription_id = ctx.acct.subscription_id if resource_id is None: resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Network/azureFirewalls/{firewall_name}" response_get = await hub.exec.azure.network.firewall.get( ctx, resource_id=resource_id, raw=True ) if response_get["result"]: if response_get["ret"] is None: if ctx.get("test", False): # Return a proposed state by Idem state --test result[ "new_state" ] = hub.tool.azure.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "resource_group_name": resource_group_name, "subscription_id": subscription_id, "firewall_name": firewall_name, "tags": tags, "location": location, "resource_id": resource_id, "zones": zones, "sku": sku, "firewall_policy_id": firewall_policy_id, "ip_configuration": ip_configuration, "management_ip_configuration": management_ip_configuration, }, ) result["comment"].append( f"Would create azure.network.firewall '{name}'" ) return result else: # PUT operation to create a resource payload = ( hub.tool.azure.network.firewall.convert_present_to_raw_firewall( location=location, tags=tags, zones=zones, sku=sku, firewall_policy_id=firewall_policy_id, ip_configuration=ip_configuration, management_ip_configuration=management_ip_configuration, ) ) response_put = await hub.exec.request.json.put( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01", success_codes=[200, 201], json=payload, ) if not response_put["result"]: hub.log.debug( f"Could not create firewall {response_put['comment']} {response_put['ret']}" ) result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_put) ) result["result"] = False return result result[ "new_state" ] = hub.tool.azure.network.firewall.convert_raw_firewall_to_present( resource=response_put["ret"], idem_resource_name=name, resource_group_name=resource_group_name, firewall_name=firewall_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append(f"Created azure.network.firewall '{name}'") return result else: existing_resource = response_get["ret"] result[ "old_state" ] = hub.tool.azure.network.firewall.convert_raw_firewall_to_present( resource=existing_resource, idem_resource_name=name, resource_group_name=resource_group_name, firewall_name=firewall_name, resource_id=resource_id, subscription_id=subscription_id, ) # Generate a new PUT operation payload with new values new_payload = hub.tool.azure.network.firewall.update_firewall_payload( existing_resource, {"tags": tags} ) if ctx.get("test", False): if new_payload["ret"] is None: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"].append( f"azure.network.firewall '{name}' doesn't need to be updated." ) else: result[ "new_state" ] = hub.tool.azure.network.firewall.convert_raw_firewall_to_present( resource=new_payload["ret"], idem_resource_name=name, resource_group_name=resource_group_name, firewall_name=firewall_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append( f"Would update azure.network.firewall '{name}'" ) return result # PUT operation to update a resource if new_payload["ret"] is None: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"].append( f"azure.network.firewall '{name}' doesn't need to be updated." ) return result result["comment"].extend(new_payload["comment"]) response_put = await hub.exec.request.json.put( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2021-06-01", success_codes=[200, 201], json=new_payload["ret"], ) if not response_put["result"]: hub.log.debug( f"Could not update azure.network.firewall {response_put['comment']} {response_put['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_put) ) return result result[ "new_state" ] = hub.tool.azure.network.firewall.convert_raw_firewall_to_present( resource=response_put["ret"], idem_resource_name=name, resource_group_name=resource_group_name, firewall_name=firewall_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append(f"Updated azure.network.firewall '{name}'") return result else: hub.log.debug( f"Could not get azure.network.firewall {response_get['comment']} {response_get['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_get) ) return result
[docs]async def absent( hub, ctx, name: str, resource_group_name: str, firewall_name: str, subscription_id: str = None, ) -> dict: r"""Delete a firewall. Args: name(str): The identifier for this state. resource_group_name(str): The name of the resource group. firewall_name(str): The name of the firewall. subscription_id(str, Optional): Subscription Unique id. Returns: Dict Examples: .. code-block:: sls resource_is_absent: azure.network.firewall.absent: - name: my-fp - subscription_id: my-subscription - resource_group_name: my-resource-group - firewall_name: my-fp """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": [], } if subscription_id is None: subscription_id = ctx.acct.subscription_id resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Network/azureFirewalls/{firewall_name}" response_get = await hub.exec.azure.network.firewall.get( ctx, resource_id=resource_id, raw=True ) if response_get["result"]: if response_get["ret"]: result[ "old_state" ] = hub.tool.azure.network.firewall.convert_raw_firewall_to_present( resource=response_get["ret"], idem_resource_name=name, resource_group_name=resource_group_name, firewall_name=firewall_name, resource_id=resource_id, subscription_id=subscription_id, ) if ctx.get("test", False): result["comment"].append( f"Would delete azure.network.firewall '{firewall_name}'" ) return result if ( response_get["ret"]["properties"] and response_get["ret"]["properties"]["provisioningState"] != "Deleting" ): response_delete = await hub.exec.request.raw.delete( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01", success_codes=[200, 202], ) if not response_delete["result"]: hub.log.debug( f"Could not delete azure.network.firewall {response_delete['comment']} {response_delete['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments( response_delete ) ) return result result["comment"].append( f"Deleted azure.network.firewall '{firewall_name}'" ) return result else: # If Azure returns 'Not Found' error, it means the resource has been absent. result["comment"].append( f"azure.network.firewall '{firewall_name}' already absent" ) return result else: hub.log.debug( f"Could not azure.network.firewall '{name}' {response_get['comment']} {response_get['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_get) ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: r"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function. Lists all firewall under the same subscription. Returns: Dict[str, Any] Examples: .. code-block:: bash $ idem describe azure.network.firewall """ result = {} ret_list = await hub.exec.azure.network.firewall.list(ctx) if not ret_list["ret"]: hub.log.debug(f"Could not describe network firewall {ret_list['comment']}") return result for resource in ret_list["ret"]: resource_id = resource["resource_id"] result[resource_id] = { "azure.network.firewall.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource.items() ] } return result