Source code for idem_vault.states.vault.secrets.kv_v1.secret

import copy
import json
from json.decoder import JSONDecodeError
from typing import Dict


__contracts__ = ["resource"]

[docs]async def present( hub, ctx, name: str, *, resource_id: (str, "alias=path"), data: str or Dict, disable_read: bool = False, ) -> Dict: """ Creates or updates a secret stored with Vault KV_v1 secret engine. Args: name (str): An Idem name of the resource. resource_id (str): The full logical path to write the data. This argument can also be specified using the alias "path." This should be prefixed with 'secret/'. data(str or dict, Optional): Data to be written in the format of a JSON string or a JSON object that can be stringfied. disable_read (bool, Optional): Set this field to True if the vault authentication does not have read access. However, if the value is True, this Idem state operation is not idempotent, and Idem state comment output will always assume it is a "create" operation. Defaults to False. Request Syntax: .. code-block:: sls [vault-secret-name]: vault.secrets.kv_v1.secret.present: - resource_id: 'string' # Can also be specified as "path" - data: 'string or dict' - disable_read: 'boolean' Returns: Dict[str, Any] Examples: .. code-block:: sls my-secret: vault.secrets.kv_v1.secret.present: - resource_id: secret/test # Can also be specified as "path" - data: '{"my-birthday": "2012-10-17"}' """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": (), } if isinstance(data, str): try: data = json.loads(data) except JSONDecodeError as e: result[ "comment" ] = f"Fail to load data {data} as json object: {e.__class__.__name__}: {e}" result["result"] = False return result # data is converted to SafeNamespaceDict to avoid it being converted to string and printed to console. data = if not disable_read: read_ret = await hub.exec.vault.secrets.kv_v1.secret.get(ctx, path=resource_id) if not read_ret["result"]: if "InvalidPath" not in str(read_ret["comment"]): result["result"] = False result["comment"] = read_ret["comment"] return result else: result["old_state"] = { "name": name, "path": resource_id, "data":["ret"]["data"]), } else: hub.log.debug(f"vault.secrets.kv_v1.secret '{name}' read has been disabled.") result["comment"] = ( f"vault.secrets.kv_v1.secret '{name}' read has been disabled.", ) if (result["old_state"] is not None) and result["old_state"]["data"] == data: result["comment"] = result["comment"] + ( f"vault.secrets.kv_v1.secret '{name}' has no property need to be updated.", ) result["new_state"] = copy.deepcopy(result["old_state"]) return result elif result["old_state"] is None: if ctx.get("test", False): result["comment"] = (f"Would create vault.secrets.kv_v1.secret '{name}'.",) result["new_state"] = {"name": name, "path": resource_id, "data": data} # calculating changes here to show keys that would be updated result["changes"] = hub.tool.vault.secret.calculate_changes( old_state=result["old_state"], new_state=result["new_state"] ) return result else: if ctx.get("test", False): result["comment"] = (f"Would update vault.secrets.kv_v1.secret '{name}'.",) result["new_state"] = {"name": name, "path": resource_id, "data": data} # calculating changes here to show keys that would be updated result["changes"] = hub.tool.vault.secret.calculate_changes( old_state=result["old_state"], new_state=result["new_state"] ) return result write_ret = await hub.exec.hvac.client.secrets.kv.v1.create_or_update_secret( ctx, path=resource_id, secret=data ) if not write_ret["result"]: result["result"] = False result["comment"] = write_ret["comment"] return result result["new_state"] = {"name": name, "path": resource_id, "data": data} # calculating changes here to show keys that are updated result["changes"] = hub.tool.vault.secret.calculate_changes( old_state=result["old_state"], new_state=result["new_state"] ) if result["old_state"] is None: result["comment"] = (f"Created vault.secrets.kv_v1.secret '{name}'.",) else: result["comment"] = (f"Updated vault.secrets.kv_v1.secret '{name}'.",) return result
[docs]async def absent( hub, ctx, name: str, *, resource_id: (str, "alias=path"), ) -> Dict: """ Deletes a secret stored with Vault KV_v1 secret engine. Args: name (str): An Idem name of the resource. resource_id (str): The full logical path to write the data. This argument can also be specified using the alias "path." This should be prefixed with 'secret/'. Request Syntax: .. code-block:: sls [vault-secret-name]: vault.secrets.kv_v1.secret.absent: - resource_id: 'string' # Can also be specified as "path" Returns: Dict[str, Any] Examples: .. code-block:: sls my-secret: vault.secrets.kv_v1.secret.absent: - resource_id: secret/test # an also be specified as "path" """ result = { "name": name, "result": True, "old_state": None, "new_state": None, "comment": (), } read_ret = await hub.exec.vault.secrets.kv_v1.secret.get(ctx, path=resource_id) if not read_ret["result"]: if "InvalidPath" in str(read_ret["comment"]): result["comment"] = ( f"vault.secrets.kv_v1.secret '{name}' is already absent.", ) else: result["result"] = False result["comment"] = read_ret["comment"] return result # "data" is not populated to reduce data exposure. result["old_state"] = {"name": name, "path": resource_id} if ctx.get("test", False): result["comment"] = (f"Would delete vault.secrets.kv_v1.secret '{name}'.",) return result delete_ret = await hub.exec.hvac.client.secrets.kv.v1.delete_secret( ctx, path=resource_id ) if not delete_ret["result"]: result["result"] = False result["comment"] = read_ret["comment"] else: result["comment"] = (f"Deleted vault.secrets.kv_v1.secret '{name}'.",) return result
[docs]async def describe(hub, ctx): """ Vault doesn't allow enumeration of secrets """ return {}