Source code for idem_aws.states.aws.sns.topic

"""State module for managing SNS Topics."""
import copy
from dataclasses import field
from dataclasses import make_dataclass
from typing import Any
from typing import Dict
from typing import List

__contracts__ = ["resource"]


[docs]async def present( hub, ctx, name: str, resource_id: str = None, attributes: Dict[str, str] = None, tags: Dict[str, Any] or List[ make_dataclass("Tag", [("Key", str), ("Value", str, field(default=None))]) ] = None, ) -> Dict[str, Any]: """Creates a topic to which notifications can be published. Users can create at most 100,000 standard topics (at most 1,000 FIFO topics). For more information, see Creating an Amazon SNS topic in the Amazon SNS Developer Guide. This action is idempotent, so if the requester already owns a topic with the specified name, that topic's ARN is returned without creating a new topic. Args: name(str): The idem name for the resource resource_id(str, Optional): The AWS resource identifier, here it is resource arn attributes(dict, Optional): Attributes of topic tags(dict or list, Optional): Dict in the format of ``{tag-key: tag-value}`` or list of tags in the format of ``[{"Key": tag-key, "Value": tag-value}]`` to associate with the topic. Each tag consists of a key name and an associated value. Defaults to None. * Key (str): The key of the tag. Constraints: Tag keys are case-sensitive and accept a maximum of 127 Unicode characters. May not begin with aws:. * Value(str, Optional): The value of the tag. Constraints: Tag values are case-sensitive and accept a maximum of 256 Unicode characters. Request Syntax: .. code-block:: yaml [topic-name]: aws.sns.topic.present: - resource_id: 'string' - attributes: 'Dict' - tags: - Key: 'string' Value: 'string' Returns: dict[str, Any] Examples: .. code-block:: yaml new-topic: aws.sns.topic.present: - attributes: DeliveryPolicy: '{"http":{"defaultHealthyRetryPolicy":{"minDelayTarget":10,"maxDelayTarget":30,"numRetries":10,"numMaxDelayRetries":7,"numNoDelayRetries":0,"numMinDelayRetries":3,"backoffFunction":"linear"}}}' - tags: - Key: Name Value: new-topic """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) before = None resource_updated = False plan_state = None if resource_id: resource = await hub.tool.boto3.resource.create( ctx, "sns", "Topic", arn=resource_id ) before = await hub.tool.boto3.resource.describe(resource) if isinstance(tags, List): tags = hub.tool.aws.tag_utils.convert_tag_list_to_dict(tags) if before: try: ret_tags = await hub.exec.boto3.client.sns.list_tags_for_resource( ctx, ResourceArn=resource_id ) if not ret_tags["result"]: result["comment"] = ret_tags["comment"] result["result"] = False return result result[ "old_state" ] = hub.tool.aws.sns.conversion_utils.convert_raw_topic_to_present( raw_resource=before, raw_resource_tags=ret_tags, idem_resource_name=name ) plan_state = copy.deepcopy(result["old_state"]) old_attributes = plan_state.get("attributes") old_tags = plan_state.get("tags") if tags is not None and old_tags != tags: # Check and update tags update_tags_ret = await hub.tool.aws.sns.sns_utils.update_tags( ctx=ctx, resource_arn=resource_id, old_tags=old_tags, new_tags=tags, ) if not update_tags_ret["result"]: result["comment"] = update_tags_ret["comment"] result["result"] = False return result resource_updated = bool(update_tags_ret["ret"]) result["comment"] = update_tags_ret["comment"] if ctx.get("test", False) and update_tags_ret["ret"]: plan_state["tags"] = update_tags_ret["ret"] if attributes is not None: # Check and update Attributes update_attributes_ret = ( await hub.tool.aws.sns.sns_utils.update_topic_attributes( ctx=ctx, resource_arn=resource_id, old_attributes=old_attributes, new_attributes=attributes, ) ) if not update_attributes_ret["result"]: result["comment"] = update_attributes_ret["comment"] result["result"] = False return result resource_updated = resource_updated or bool( update_attributes_ret["ret"] ) result["comment"] += update_attributes_ret["comment"] if ctx.get("test", False) and update_attributes_ret["ret"]: plan_state["attributes"] = update_attributes_ret["ret"].get( "updated_attributes" ) if not resource_updated: result["comment"] += [f"aws.sns.topic '{name}' already exists"] except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False else: if ctx.get("test", False): result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "attributes": attributes, "tags": tags, }, ) result["comment"] = hub.tool.aws.comment_utils.would_create_comment( resource_type="aws.sns.topic", name=name ) return result try: ret = await hub.exec.boto3.client.sns.create_topic( ctx, Name=name, Attributes=attributes, Tags=hub.tool.aws.tag_utils.convert_tag_dict_to_list(tags) if tags else None, ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] return result resource_id = ret["ret"].get("TopicArn") result["comment"] = hub.tool.aws.comment_utils.create_comment( "aws.sns.topic", name ) except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] result["result"] = False try: if ctx.get("test", False): result["new_state"] = plan_state elif (not before) or resource_updated: resource = await hub.tool.boto3.resource.create( ctx, "sns", "Topic", arn=resource_id ) after = await hub.tool.boto3.resource.describe(resource) ret_tags = await hub.exec.boto3.client.sns.list_tags_for_resource( ctx, ResourceArn=resource_id ) if not ret_tags["result"]: result["comment"] = ret_tags["comment"] result["result"] = False return result result[ "new_state" ] = hub.tool.aws.sns.conversion_utils.convert_raw_topic_to_present( raw_resource=after, raw_resource_tags=ret_tags, idem_resource_name=name ) else: result["new_state"] = copy.deepcopy(result["old_state"]) except Exception as e: result["comment"] += [str(e)] result["result"] = False return result
[docs]async def absent(hub, ctx, name: str, resource_id: str = None) -> Dict[str, Any]: """Deletes a topic and all its subscriptions. Deleting a topic might prevent some messages previously sent to the topic from being delivered to subscribers. This action is idempotent, so deleting a topic that does not exist does not result in an error. Args: name(str): The name of the state. resource_id(str, Optional): The AWS resource identifier, here it is resource arn Request Syntax: .. code-block:: yaml [topic-name]: aws.sns.topic.absent: - resource_id: 'string' Returns: dict[str, Any] Examples: .. code-block:: yaml test-topic: aws.sns.topic.absent: - resource_id: arn:aws:sns:eu-west-3:537227425989:test-topic """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not resource_id: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.sns.topic", name=name ) return result resource = await hub.tool.boto3.resource.create(ctx, "sns", "Topic", resource_id) before = await hub.tool.boto3.resource.describe(resource) if not before: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( "aws.sns.topic", name ) else: ret_tags = await hub.exec.boto3.client.sns.list_tags_for_resource( ctx, ResourceArn=resource_id ) if not ret_tags["result"]: result["comment"] = ret_tags["comment"] result["result"] = False return result result[ "old_state" ] = hub.tool.aws.sns.conversion_utils.convert_raw_topic_to_present( raw_resource=before, raw_resource_tags=ret_tags, idem_resource_name=name ) if ctx.get("test", False): result["comment"] = hub.tool.aws.comment_utils.would_delete_comment( "aws.sns.topic", name ) return result try: ret = await hub.exec.boto3.client.sns.delete_topic( ctx, TopicArn=resource_id ) result["result"] = ret["result"] if not result["result"]: result["comment"] = ret["comment"] result["result"] = False return result result["comment"] = hub.tool.aws.comment_utils.delete_comment( "aws.sns.topic", name ) except hub.tool.boto3.exception.ClientError as e: result["comment"] += [f"{e.__class__.__name__}: {e}"] return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Describe the resource in a way that can be recreated/managed with the corresponding "present" function. Returns a list of the requester's topics. Each call returns a limited list of topics, up to 100. If there are more topics, a NextToken is also returned. Use the NextToken parameter in a new ListTopics call to get further results. This action is throttled at 30 transactions per second (TPS). Returns: dict[str, Any] Examples: .. code-block:: bash $ idem describe aws.sns.topic """ result = {} ret = await hub.exec.boto3.client.sns.list_topics(ctx) if not ret["result"]: hub.log.warning(f"Could not describe topic {ret['comment']}") return {} for topic in ret["ret"]["Topics"]: # Including fields to match the 'present' function parameters resource_id = topic["TopicArn"] idem_resource_name = resource_id.split(":")[-1] resource = await hub.tool.boto3.resource.create( ctx, "sns", "Topic", arn=resource_id ) raw_resource = await hub.tool.boto3.resource.describe(resource) ret_tags = await hub.exec.boto3.client.sns.list_tags_for_resource( ctx, ResourceArn=resource_id ) if not ret_tags["result"]: hub.log.warning(f"Could not describe topic tag {ret_tags['comment']}") return {} resource_translated = ( hub.tool.aws.sns.conversion_utils.convert_raw_topic_to_present( raw_resource=raw_resource, raw_resource_tags=ret_tags, idem_resource_name=idem_resource_name, ) ) result[idem_resource_name] = { "aws.sns.topic.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in resource_translated.items() ] } return result