Source code for idem_azure.states.azure.key_vault.vault

"""States module for managing Key Vault."""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict
from typing import List

__contracts__ = ["resource"]
__reconcile_wait__ = {"static": {"wait_in_seconds": 20}}


[docs]async def present( hub, ctx, name: str, location: str, resource_group_name: str, vault_name: str, sku: make_dataclass( "sku", [("name", str, field(default=None)), ("family", str, field(default=None))], ) = None, tenant_id: str = None, soft_delete_retention_days: int = None, subscription_id: str = None, resource_id: str = None, tags: Dict = None, enabled_for_deployment: bool = None, enabled_for_disk_encryption: bool = None, enabled_for_template_deployment: bool = None, enable_rbac_authorization: bool = None, public_network_access_enabled: str = None, purge_protection_enabled: bool = None, access_policies: List[ make_dataclass( "accessPolicies", [ ("tenant_id", str, field(default=None)), ("object_id", str, field(default=None)), ("key_permissions", List[str], field(default=None)), ("secret_permissions", List[str], field(default=None)), ("storage_permissions", List[str], field(default=None)), ("certificate_permissions", List[str], field(default=None)), ], ) ] = None, network_acls: make_dataclass( "networkAcls", [ ("bypass", str, field(default=None)), ("default_action", str, field(default=None)), ("ip_rules", List[str], field(default=None)), ("virtual_network_subnet_ids", List[str], field(default=None)), ], ) = None, ) -> Dict: r"""Create or update key vault. Args: name(str): The identifier for this state. location(str): Resource location. Changing this forces a new resource to be created. resource_group_name(str): The name of the resource group. vault_name(str): The name of the key vault. subscription_id(str, Optional): Subscription Unique id. resource_id(str, Optional): Key vault resource id on Azure sku(Dict, Optional): The SKU of the key vault. tags(Dict, Optional): Resource tags. tenant_id(str, Optional): Tenant id of azure account. enabled_for_deployment(bool, Optional): Boolean flag to specify whether Azure Virtual Machines are permitted to retrieve certificates stored as secrets from the key vault. enabled_for_disk_encryption(bool, Optional): Boolean flag to specify whether Azure Disk Encryption is permitted to retrieve secrets from the vault and unwrap keys. enabled_for_template_deployment(bool, Optional): Boolean flag to specify whether Azure Resource Manager is permitted to retrieve secrets from the key vault. enable_rbac_authorization(bool, Optional): Boolean flag to specify whether Azure Key Vault uses Role Based Access Control (RBAC) for authorization of data actions. public_network_access_enabled(str, Optional): Whether public network access is allowed for this Key Vault. Defaults to true soft_delete_retention_days(int, Optional): The number of days that items should be retained for once soft-deleted. This value can be between 7 and 90 (the default) days. purge_protection_enabled(bool, Optional): Is Purge Protection enabled for this Key Vault? access_policies(List[Dict[str, Any]], Optional): Key vault access policies. network_acls(Dict[str, Any], Optional): Key vault Network Acl. Returns: Dict Examples: .. code-block:: sls resource_is_present: test-azure-key-vault: azure.key_vault.vault.present: - name: test-key-vault - vault_name: test-key-vault - resource_group_name: azure-resource-group - location: eastus - sku: family: A name: Premium - soft_delete_retention_days: 8 - tags: test: test-Ashu - enabled_for_deployment: false - enabled_for_disk_encryption: true - enabled_for_template_deployment: true - enable_rbac_authorization: true - public_network_access_enabled: Enabled - purge_protection_enabled: true - access_policies: - object_id: 00000000-0000-0000-0000-000000000000 certificate_permissions: - Get - List - Update - Create - Import - Delete - Recover - Backup - Restore - ManageContacts - ManageIssuers - GetIssuers - ListIssuers - SetIssuers - DeleteIssuers key_permissions: - Get - List - Update - Create - Import - Delete - Recover - Backup - Restore - GetRotationPolicy - SetRotationPolicy - Rotate secret_permissions: - Get - List - Set - Delete - Recover - Backup - Restore tenant_id: 00000000-0000-0000-0000-000000000000 - network_acls: bypass: AzureServices default_action: Deny ip_rules: [] virtual_network_subnet_ids: [] """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": [], } if subscription_id is None: subscription_id = ctx.acct.subscription_id if tenant_id is None: tenant_id = ctx.acct.tenant if resource_id is None: resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.KeyVault/vaults/{vault_name}" response_get = await hub.exec.azure.key_vault.vault.get( ctx, resource_id=resource_id, raw=True ) if response_get["result"]: if response_get["ret"] is None: if ctx.get("test", False): # Return a proposed state by Idem state --test result[ "new_state" ] = hub.tool.azure.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "resource_group_name": resource_group_name, "subscription_id": subscription_id, "location": location, "resource_id": resource_id, "vault_name": vault_name, "sku": sku, "soft_delete_retention_days": soft_delete_retention_days, "purge_protection_enabled": purge_protection_enabled, "tenant_id": tenant_id, "access_policies": access_policies, "network_acls": network_acls, "enabled_for_deployment": enabled_for_deployment, "enabled_for_disk_encryption": enabled_for_disk_encryption, "enabled_for_template_deployment": enabled_for_template_deployment, "enable_rbac_authorization": enable_rbac_authorization, "public_network_access_enabled": public_network_access_enabled, "tags": tags, }, ) result["comment"].append(f"Would create azure.key_vault.vault '{name}'") return result else: # PUT operation to create a resource payload = hub.tool.azure.key_vault.vault.convert_present_to_key_vault( location=location, tags=tags, sku=sku, tenant_id=tenant_id, enabled_for_deployment=enabled_for_deployment, enabled_for_disk_encryption=enabled_for_disk_encryption, enabled_for_template_deployment=enabled_for_template_deployment, enable_rbac_authorization=enable_rbac_authorization, public_network_access_enabled=public_network_access_enabled, soft_delete_retention_days=soft_delete_retention_days, purge_protection_enabled=purge_protection_enabled, access_policies=access_policies, network_acls=network_acls, ) response_put = await hub.exec.request.json.put( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01", success_codes=[200, 201], json=payload, ) if not response_put["result"]: hub.log.debug( f"Could not create key vault {response_put['comment']} {response_put['ret']}" ) result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_put) ) result["result"] = False return result result[ "new_state" ] = hub.tool.azure.key_vault.vault.convert_key_vault_to_present( resource=response_put["ret"], idem_resource_name=name, resource_group_name=resource_group_name, vault_name=vault_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append(f"Created azure.key_vault.vault '{name}'") return result else: existing_resource = response_get["ret"] result[ "old_state" ] = hub.tool.azure.key_vault.vault.convert_key_vault_to_present( resource=existing_resource, idem_resource_name=name, resource_group_name=resource_group_name, vault_name=vault_name, resource_id=resource_id, subscription_id=subscription_id, ) # Generate a new PUT operation payload with new values new_payload = hub.tool.azure.key_vault.vault.update_key_vault_payload( existing_resource, {"tags": tags} ) if ctx.get("test", False): if new_payload["ret"] is None: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"].append( f"azure.key_vault.vault '{name}' doesn't need to be updated." ) else: result[ "new_state" ] = hub.tool.azure.key_vault.vault.convert_key_vault_to_present( resource=new_payload["ret"], idem_resource_name=name, resource_group_name=resource_group_name, vault_name=vault_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append( f"Would update azure.key_vault.vault '{name}'" ) return result # PUT operation to update a resource if new_payload["ret"] is None: result["new_state"] = copy.deepcopy(result["old_state"]) result["comment"].append( f"azure.key_vault.vault '{name}' doesn't need to be updated." ) return result result["comment"].extend(new_payload["comment"]) response_put = await hub.exec.request.json.put( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01", success_codes=[200, 201], json=new_payload["ret"], ) if not response_put["result"]: hub.log.debug( f"Could not update azure.key_vault.vault {response_put['comment']} {response_put['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_put) ) return result result[ "new_state" ] = hub.tool.azure.key_vault.vault.convert_key_vault_to_present( resource=response_put["ret"], idem_resource_name=name, resource_group_name=resource_group_name, vault_name=vault_name, resource_id=resource_id, subscription_id=subscription_id, ) result["comment"].append(f"Updated azure.key_vault.vault '{name}'") return result else: hub.log.debug( f"Could not get azure.key_vault.vault {response_get['comment']} {response_get['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_get) ) return result
[docs]async def absent( hub, ctx, name: str, resource_group_name: str, vault_name: str, subscription_id: str = None, ) -> dict: r"""Delete a key vault. Args: name(str): The identifier for this state. resource_group_name(str): The name of the resource group. vault_name(str): The name of the key vault. subscription_id(str, Optional): Subscription Unique id. Returns: Dict Examples: .. code-block:: sls resource_is_absent: azure.key_vault.vault.absent: - name: my-kv - subscription_id: my-subscription - resource_group_name: my-resource-group - vault_name: my-fp """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": [], } if subscription_id is None: subscription_id = ctx.acct.subscription_id resource_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.KeyVault/vaults/{vault_name}" response_get = await hub.exec.azure.key_vault.vault.get( ctx, resource_id=resource_id, raw=True ) if response_get["result"]: if response_get["ret"]: result[ "old_state" ] = hub.tool.azure.key_vault.vault.convert_key_vault_to_present( resource=response_get["ret"], idem_resource_name=name, resource_group_name=resource_group_name, vault_name=vault_name, resource_id=resource_id, subscription_id=subscription_id, ) if ctx.get("test", False): result["comment"].append( f"Would delete azure.key_vault.vault '{vault_name}'" ) return result response_delete = await hub.exec.request.raw.delete( ctx, url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2022-07-01", success_codes=[200, 202], ) if not response_delete["result"]: hub.log.debug( f"Could not delete azure.key_vault.vault {response_delete['comment']} {response_delete['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_delete) ) return result result["comment"].append(f"Deleted azure.key_vault.vault '{vault_name}'") return result else: # If Azure returns 'Not Found' error, it means the resource has been absent. result["comment"].append( f"azure.key_vault.vault '{vault_name}' already absent" ) return result else: hub.log.debug( f"Could not azure.key_vault.vault '{name}' {response_get['comment']} {response_get['ret']}" ) result["result"] = False result["comment"].extend( hub.tool.azure.result_utils.extract_error_comments(response_get) ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: r"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function. Lists all key vault under the same subscription. Returns: Dict[str, Any] Examples: .. code-block:: bash $ idem describe azure.key_vault.vault """ result = {} ret_list = await hub.exec.azure.key_vault.vault.list(ctx) if not ret_list["ret"]: hub.log.debug(f"Could not describe key_vault vault {ret_list['comment']}") return result for resource in ret_list["ret"]: resource_id = resource["resource_id"] result[resource_id] = { "azure.key_vault.vault.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource.items() ] } return result