Source code for idem_aws.exec.aws.cloudfront.cache_policy

"""Exec module for managing Amazon Cloudfront Cache Policy."""

__func_alias__ = {"list_": "list"}

from typing import Dict


[docs]async def get( hub, ctx, name, resource_id: str = None, ) -> Dict: """Gets Cloudfront cache policy from AWS account. Args: name(str): The name of the Idem state. resource_id(str, Optional): AWS cloudfront cache policy id to identify the resource. Returns: Dict[str, Any] Examples: Calling this exec module function from the cli with resource_id .. code-block:: bash idem exec aws.cloudfront.cache_policy.get name="idem_name" resource_id="resource_id" Using in a state: .. code-block:: yaml my_unmanaged_resource: exec.run: - path: aws.cloudfront.cache_policy.get - kwargs: name: my_resource resource_id: resource_id """ result = dict(comment=[], ret=None, result=True) ret_list = await hub.exec.aws.cloudfront.cache_policy.list(ctx) if not ret_list["result"]: result["comment"] += list(ret_list["comment"]) result["result"] = False hub.log.debug(f"Could not list cache policies {ret_list['comment']}") return result if not ret_list.get("ret"): result["comment"] += ["No cache policy present"] return result for policy in ret_list["ret"]: if (resource_id and policy.get("resource_id") == resource_id) or policy.get( "name" ) == name: result["ret"] = policy break if not result["ret"]: result["comment"] += [f"Cache policy '{name}' not present"] return result
[docs]async def list_( hub, ctx, ) -> Dict: """Lists all Cloudfront cache policies. Returns: Dict[str, Any] Examples: Calling this exec module function from the cli with resource_id .. code-block:: bash idem exec aws.cloudfront.cache_policy.list Using in a state: .. code-block:: yaml my_unmanaged_resource: exec.run: - path: aws.cloudfront.cache_policy.get """ result = dict(comment=[], ret=[], result=True) ret_list = await hub.exec.boto3.client.cloudfront.list_cache_policies(ctx) if not ret_list["result"]: result["comment"] += list(ret_list["comment"]) result["result"] = False hub.log.debug(f"Could not list cache policies {ret_list['comment']}") return result if not ret_list.get("ret"): result["comment"] += ["No cache policy present"] return result items = ret_list["ret"]["CachePolicyList"]["Items"] for item in items: policy_id = item["CachePolicy"]["Id"] name = item["CachePolicy"]["CachePolicyConfig"]["Name"] ret = await hub.exec.boto3.client.cloudfront.get_cache_policy(ctx, Id=policy_id) if not ret["result"]: if "NoSuchCachePolicy" in str(ret["comment"]): result["comment"].append( hub.tool.aws.comment_utils.get_empty_comment( resource_type="aws.cloudfront.cache_policy", name=name ) ) result["comment"] += list(ret["comment"]) result["result"] = False continue if not ret["ret"]["CachePolicy"]: result["comment"].append( hub.tool.aws.comment_utils.get_empty_comment( resource_type="aws.cloudfront.cache_policy", name=name ) ) continue ret["ret"]["CachePolicy"]["ETag"] = ret["ret"]["ETag"] result["ret"].append( hub.tool.aws.cloudfront.conversion_utils.convert_raw_cache_policy_to_present( ctx, raw_resource=ret["ret"]["CachePolicy"], idem_resource_name=name ) ) return result