Source code for idem_azure.states.azure.policy.policy_definitions

"""State module for managing Policy Definition."""
import copy
from typing import Any
from typing import Dict


# Contract names declared for this state module. NOTE(review): "resource" is
# presumably the idem resource contract that governs the present/absent/describe
# functions defined in this file — confirm against the idem/pop documentation.
__contracts__ = ["resource"]


async def present(
    hub,
    ctx,
    name: str,
    policy_definition_name: str,
    policy_type: str,
    mode: str,
    subscription_id: str = None,
    display_name: str = None,
    description: str = None,
    policy_rule: Dict = None,
    metadata: Dict = None,
    parameters: Dict = None,
    resource_id: str = None,
) -> Dict:
    r"""Create or update Policy Definitions.

    The current definition is fetched first. When nothing is returned, a new
    definition is created with a PUT request; otherwise an update payload is
    computed from the existing resource and the requested values, and a PUT is
    issued only when that payload is not ``None``. When ``ctx`` has ``test``
    set, no PUT is sent and the proposed state is returned instead.

    Args:
        name(str):
            The identifier for this state.

        policy_definition_name(str):
            The name of the policy Definition.

        policy_type(str):
            The policy type. Possible values are BuiltIn, Custom and NotSpecified.

        mode(str):
            The policy mode that allows you to specify which resource types will be evaluated.
            Some examples are All, Indexed, Microsoft.KeyVault.Data.

        subscription_id(str, Optional):
            Subscription Unique id. Falls back to ``ctx.acct.subscription_id`` when omitted.

        display_name(str, Optional):
            The display name of the policy definition.

        description(str, Optional):
            The description of the policy definition.

        policy_rule(dict, Optional):
            The policy rule for the policy definition.

        metadata(dict, Optional):
            The metadata for the policy definition.

        parameters(dict, Optional):
            Parameters for the policy definition.

        resource_id(str, Optional):
            Policy Definition resource id on Azure. Built from the subscription id and
            ``policy_definition_name`` when omitted.

    Returns:
        Dict

    Examples:
        .. code-block:: sls

            policy_definition_is_present:
              azure.policy.policy_definitions.present:
                - name: value
                - policy_definition_name: value
                - subscription_id: value
                - policy_type: value
                - mode: value
                - display_name: value
                - description: value
                - metadata:
                    version: 1.0.0
                    category: RoleDefinitions
                - parameters:
                    roleDefinitionIds:
                      type: Array
                      metadata:
                        displayName: Approved Role Definitions
                        description: The list of role definition Ids.
                        strongType: roleDefinitionIds
                - policy_rule:
                    if:
                      allOf:
                        - field: type
                          equals: Microsoft.Authorization/roleAssignments
                        - not:
                            field: Microsoft.Authorization/roleAssignments/roleDefinitionId
                            in: "[parameters('roleDefinitionIds')]"
                    then:
                      effect: deny
    """
    result = {
        "name": name,
        "result": True,
        "old_state": None,
        "new_state": None,
        "comment": [],
    }

    if subscription_id is None:
        subscription_id = ctx.acct.subscription_id
    if resource_id is None:
        resource_id = f"/subscriptions/{subscription_id}/providers/Microsoft.Authorization/policyDefinitions/{policy_definition_name}"

    def _as_present_state(raw_resource):
        # Single place for the raw -> "present" conversion used for old and new state.
        return hub.tool.azure.policy.policy_definition.convert_raw_policy_definition_to_present(
            resource=raw_resource,
            idem_resource_name=name,
            policy_definition_name=policy_definition_name,
            resource_id=resource_id,
            subscription_id=subscription_id,
        )

    current = await hub.exec.azure.policy.policy_definitions.get(
        ctx, resource_id=resource_id, raw=True
    )
    if not current["result"]:
        hub.log.debug(
            f"Could not get azure.policy.policy_definitions {current['comment']} {current['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(current)
        )
        return result

    # ---- Nothing returned by the lookup: create the definition ----
    if not current["ret"]:
        if ctx.get("test", False):
            # Return a proposed state by Idem state --test
            desired_state = {
                "name": name,
                "policy_definition_name": policy_definition_name,
                "subscription_id": subscription_id,
                "policy_type": policy_type,
                "mode": mode,
                "display_name": display_name,
                "description": description,
                "policy_rule": policy_rule,
                "metadata": metadata,
                "parameters": parameters,
                "resource_id": resource_id,
            }
            result["new_state"] = hub.tool.azure.test_state_utils.generate_test_state(
                enforced_state={},
                desired_state=desired_state,
            )
            result["comment"].append(
                f"Would create azure.policy.policy_definitions '{name}'"
            )
            return result

        # PUT operation to create a resource
        create_body = hub.tool.azure.policy.policy_definition.convert_present_to_raw_policy_definition(
            policy_type=policy_type,
            description=description,
            display_name=display_name,
            mode=mode,
            metadata=metadata,
            policy_rule=policy_rule,
            parameters=parameters,
        )
        created = await hub.exec.request.json.put(
            ctx,
            url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2021-06-01",
            success_codes=[200, 201],
            json=create_body,
        )
        if not created["result"]:
            hub.log.debug(
                f"Could not create Policy Definitions {created['comment']} {created['ret']}"
            )
            result["comment"].extend(
                hub.tool.azure.result_utils.extract_error_comments(created)
            )
            result["result"] = False
            return result

        result["new_state"] = _as_present_state(created["ret"])
        result["comment"].append(f"Created azure.policy.policy_definitions '{name}'")
        return result

    # ---- Definition already exists: update it if anything changed ----
    existing_resource = current["ret"]
    result["old_state"] = _as_present_state(existing_resource)

    # Generate a new PUT operation payload with new values
    requested_values = {
        "policy_type": policy_type,
        "display_name": display_name,
        "description": description,
        "mode": mode,
        "policy_rule": policy_rule,
        "metadata": metadata,
        "parameters": parameters,
    }
    update = hub.tool.azure.policy.policy_definition.update_policy_definition_payload(
        existing_resource,
        requested_values,
    )
    update_body = update["ret"]

    # A None payload means there is nothing to send; the outcome is the same
    # whether or not this is a --test run.
    if update_body is None:
        result["new_state"] = copy.deepcopy(result["old_state"])
        result["comment"].append(
            f"azure.policy.policy_definitions '{name}' has no property need to be updated."
        )
        return result

    if ctx.get("test", False):
        result["new_state"] = _as_present_state(update_body)
        result["comment"].append(
            f"Would update azure.policy.policy_definitions '{name}'"
        )
        return result

    # PUT operation to update a resource
    result["comment"].extend(update["comment"])
    updated = await hub.exec.request.json.put(
        ctx,
        url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2021-06-01",
        success_codes=[200, 201],
        json=update_body,
    )
    if not updated["result"]:
        hub.log.debug(
            f"Could not update azure.policy.policy_definitions {updated['comment']} {updated['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(updated)
        )
        return result

    result["new_state"] = _as_present_state(updated["ret"])
    result["comment"].append(f"Updated azure.policy.policy_definitions '{name}'")
    return result
async def absent(
    hub, ctx, name: str, policy_definition_name: str, subscription_id: str = None
) -> Dict:
    r"""Delete Policy Definition.

    The definition is looked up first. If the lookup returns nothing, the state
    succeeds with an "already absent" comment. Otherwise the found resource is
    recorded as ``old_state`` and, unless ``ctx`` has ``test`` set, a DELETE
    request is sent.

    Args:
        name(str):
            The identifier for this state.

        policy_definition_name(str):
            The name of the policy definition to delete.

        subscription_id(str, Optional):
            Subscription Unique id. Falls back to ``ctx.acct.subscription_id`` when omitted.

    Returns:
        Dict

    Examples:
        .. code-block:: sls

            resource_is_absent:
              azure.policy.policy_definitions.absent:
                - name: value
                - policy_definition_name: value
                - subscription_id: value
    """
    result = {
        "name": name,
        "result": True,
        "old_state": None,
        "new_state": None,
        "comment": [],
    }

    if subscription_id is None:
        subscription_id = ctx.acct.subscription_id
    resource_id = f"/subscriptions/{subscription_id}/providers/Microsoft.Authorization/policyDefinitions/{policy_definition_name}"

    response_get = await hub.exec.azure.policy.policy_definitions.get(
        ctx,
        resource_id=resource_id,
    )
    if not response_get["result"]:
        hub.log.debug(
            f"Could not get azure.policy.policy_definitions '{name}' {response_get['comment']} {response_get['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_get)
        )
        return result

    if not response_get["ret"]:
        # If Azure returns 'Not Found' error, it means the resource has been absent.
        result["comment"].append(
            f"azure.policy.policy_definitions '{name}' already absent"
        )
        return result

    result["old_state"] = response_get["ret"]
    result["old_state"]["name"] = name

    if ctx.get("test", False):
        result["comment"].append(
            f"Would delete azure.policy.policy_definitions '{name}'"
        )
        return result

    # The Policy Definitions Delete API answers 200 (deleted) or 204 (nothing
    # to delete). 204 can occur when the definition disappears between the GET
    # above and this DELETE; the resource is absent either way, so it must
    # count as success. 202 is kept for backward compatibility.
    response_delete = await hub.exec.request.raw.delete(
        ctx,
        url=f"{ctx.acct.endpoint_url}{resource_id}?api-version=2021-06-01",
        success_codes=[200, 202, 204],
    )
    if not response_delete["result"]:
        hub.log.debug(
            f"Could not delete azure.policy.policy_definitions {response_delete['comment']} {response_delete['ret']}"
        )
        result["result"] = False
        result["comment"].extend(
            hub.tool.azure.result_utils.extract_error_comments(response_delete)
        )
        return result

    result["comment"].append(f"Deleted azure.policy.policy_definitions '{name}'")
    return result
async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
    r"""Describe the resource in a way that can be recreated/managed with the corresponding "present" function.

    Lists all Policy Definitions under the same subscription. Each listed
    resource is keyed by its ``resource_id`` and its items are emitted as a list
    of single-entry dicts under ``azure.policy.policy_definitions.present``.

    Returns:
        Dict[str, Any]

    Examples:
        .. code-block:: bash

            $ idem describe azure.policy.policy_definitions
    """
    listing = await hub.exec.azure.policy.policy_definitions.list(ctx)
    definitions = listing["ret"]

    if not definitions:
        hub.log.debug(
            f"Could not describe policy policy_definitions {listing['comment']}"
        )
        return {}

    return {
        definition["resource_id"]: {
            "azure.policy.policy_definitions.present": [
                {key: value} for key, value in definition.items()
            ]
        }
        for definition in definitions
    }