Source code for idem_vault.states.vault.secrets.kv_v1.key

import copy
from typing import Dict

import dict_tools.data

# NOTE(review): presumably Idem's transparent-requisite declaration for this
# state module; the "unique" entry names the two state functions defined in
# this file -- confirm the exact semantics against the Idem documentation.
TREQ = {"unique": ["present", "absent"]}

# NOTE(review): presumably the name of the contract this module is checked
# against by the plugin loader -- confirm.
__contracts__ = ["resource"]


async def present(
    hub,
    ctx,
    name: str,
    *,
    resource_id: (str, "alias=path"),
    key: str,
    value: str,
) -> Dict:
    """
    Creates or updates a secret key stored with Vault KV_v1 secret engine.

    The secret at ``resource_id`` is read first, ``key`` is set on the data that
    was read, and the resulting data is written back to the same path.

    Args:
        name (str):
            An Idem name of the resource.

        resource_id (str):
            The full logical path to write the data. This argument can also be
            specified using the alias "path." This should be prefixed with 'secret/'.

        key (str):
            Key of the secret.

        value (str):
            Value of the secret.

    Request Syntax:
        .. code-block:: sls

            [vault-secret-name]:
              vault.secrets.kv_v1.key.present:
                - resource_id: 'string' # Can also be specified as "path"
                - key: 'string'
                - value: 'string'

    Returns:
        Dict[str, Any]

    Examples:
        .. code-block:: sls

            my-secret:
              vault.secrets.kv_v1.key.present:
                - resource_id: secret/test # Can also be specified as "path"
                - key: my-birthday
                - value: 2012-10-17
    """
    result = {
        "name": name,
        "result": True,
        "old_state": None,
        "new_state": None,
        "comment": (),
    }

    read_ret = await hub.exec.vault.secrets.kv_v1.secret.get(ctx, path=resource_id)
    data_to_update = {}
    if read_ret["result"]:
        data_to_update = dict_tools.data.SafeNamespaceDict(read_ret["ret"]["data"])
    elif "InvalidPath" not in str(read_ret["comment"]):
        # Any read failure other than "InvalidPath" is reported as-is.
        result["result"] = False
        result["comment"] = read_ret["comment"]
        return result
    # A failed read that mentions "InvalidPath" falls through with empty data,
    # i.e. the key is handled as not existing yet.

    # Use one "is not None" test for old_state, the action and the no-op check,
    # so that a stored falsy value (for example an empty string) is still
    # recognised as an existing key instead of being reported as created.
    existing_value = data_to_update.get(key)
    if existing_value is not None:
        result["old_state"] = {
            "name": name,
            "path": resource_id,
            "key": key,
            "value": existing_value,
        }
    action = "create" if existing_value is None else "update"

    if result["old_state"] is not None and existing_value == value:
        result["comment"] = (
            f"vault.secrets.kv_v1.key '{name}' nothing to be updated.",
        )
        result["new_state"] = copy.deepcopy(result["old_state"])
        return result

    desired_state = {
        "name": name,
        "path": resource_id,
        "key": key,
        "value": value,
    }

    if ctx.get("test", False):
        result["new_state"] = desired_state
        result["comment"] = (f"Would {action} vault.secrets.kv_v1.key '{key}'.",)
        # calculating changes here to show keys that would be updated
        result["changes"] = hub.tool.vault.secret.calculate_changes(
            old_state={"data": result["old_state"]},
            new_state={"data": result["new_state"]},
        )
        return result

    # Write back everything that was read plus the requested key.
    data_to_update[key] = value
    write_ret = await hub.exec.hvac.client.secrets.kv.v1.create_or_update_secret(
        ctx, path=resource_id, secret=data_to_update
    )
    if not write_ret["result"]:
        result["result"] = False
        result["comment"] = write_ret["comment"]
        return result

    result["new_state"] = desired_state
    # calculating changes here to show keys that were updated
    result["changes"] = hub.tool.vault.secret.calculate_changes(
        old_state={"data": result["old_state"]},
        new_state={"data": result["new_state"]},
    )
    if action == "create":
        result["comment"] = (f"Created vault.secrets.kv_v1.key '{key}'.",)
    else:
        result["comment"] = (f"Updated vault.secrets.kv_v1.key '{key}'.",)
    return result
async def absent(
    hub,
    ctx,
    name: str,
    *,
    resource_id: (str, "alias=path"),
    key: str,
) -> Dict:
    """
    Deletes a secret key stored with Vault KV_v1 secret engine.

    The secret at ``resource_id`` is read, ``key`` is removed from the data that
    was read, and the remaining data is written back. When no keys remain, the
    path itself is deleted instead.

    Args:
        name (str):
            An Idem name of the resource.

        resource_id (str):
            The full logical path to write the data. This argument can also be
            specified using the alias "path." This should be prefixed with 'secret/'.

        key (str):
            Secret key to be deleted.

    Request Syntax:
        .. code-block:: sls

            [vault-secret-name]:
              vault.secrets.kv_v1.key.absent:
                - resource_id: 'string' # Can also be specified as "path"
                - key: 'string'

    Returns:
        Dict[str, Any]

    Examples:
        .. code-block:: sls

            my-secret:
              vault.secrets.kv_v1.key.absent:
                - resource_id: secret/test # Can also be specified as "path"
                - key: my-birthday
    """
    result = {
        "name": name,
        "result": True,
        "old_state": None,
        "new_state": None,
        "comment": (),
    }

    read_ret = await hub.exec.vault.secrets.kv_v1.secret.get(ctx, path=resource_id)
    if not read_ret["result"]:
        if "InvalidPath" in str(read_ret["comment"]):
            # A failed read that mentions "InvalidPath" is handled as "nothing to delete".
            result["comment"] = (f"vault.secrets.kv_v1.key '{key}' is already absent.",)
        else:
            result["result"] = False
            result["comment"] = read_ret["comment"]
        return result

    current_data = dict_tools.data.SafeNamespaceDict(read_ret["ret"]["data"])
    if key not in current_data:
        result["comment"] = (f"vault.secrets.kv_v1.key '{key}' is already absent.",)
        return result

    # "data" is not populated to reduce data exposure.
    result["old_state"] = {"name": name, "path": resource_id}

    if ctx.get("test", False):
        result["comment"] = (f"Would delete vault.secrets.kv_v1.key '{key}'.",)
        return result

    del current_data[key]
    if not current_data:
        # Delete path if all keys are removed. Path cannot exist with no keys
        ret = await hub.exec.hvac.client.secrets.kv.v1.delete_secret(
            ctx, path=resource_id
        )
    else:
        ret = await hub.exec.hvac.client.secrets.kv.v1.create_or_update_secret(
            ctx, path=resource_id, secret=current_data
        )

    if not ret["result"]:
        result["result"] = False
        # Report the failure of the delete/write call itself; the earlier read
        # succeeded, so its comment would not explain what went wrong.
        result["comment"] = ret["comment"]
    else:
        result["comment"] = (f"Deleted vault.secrets.kv_v1.key '{key}'.",)
    return result
async def describe(hub, ctx):
    """
    Return an empty mapping: no resources are described for this state.

    Vault doesn't allow enumeration of keys, so neither ``hub`` nor ``ctx`` is
    used and nothing is looked up.
    """
    return dict()