# Source code for idem_aws.states.aws.sqs.queue

"""State module for managing AWS SQS queue."""
import copy
from dataclasses import make_dataclass
from typing import Any
from typing import Dict

__contracts__ = ["resource"]


async def present(
    hub,
    ctx,
    name: str,
    resource_id: str = None,
    delay_seconds: int = None,
    maximum_message_size: int = None,
    message_retention_period: int = None,
    policy: str or Dict = None,
    receive_message_wait_time_seconds: int = None,
    redrive_policy: make_dataclass(
        "RedrivePolicy", [("deadLetterTargetArn", str), ("maxReceiveCount", int)]
    ) = None,
    visibility_timeout: int = None,
    kms_master_key_id: str = None,
    kms_data_key_reuse_period_seconds: int = None,
    sqs_managed_sse_enabled: bool = None,
    fifo_queue: bool = None,
    content_based_deduplication: bool = None,
    deduplication_scope: str = None,
    fifo_throughput_limit: str = None,
    tags: Dict[str, str] = None,
) -> Dict[str, Any]:
    """Creates a new standard or FIFO queue, or updates an existing one.

    If you don't specify the ``fifo_queue`` attribute, Amazon SQS creates a
    standard queue. The queue type cannot be changed after creation. If you
    don't provide a value for an attribute, the queue is created with the
    default value for that attribute. After deleting a queue you must wait at
    least 60 seconds before creating a queue with the same name.

    Args:
        name(str):
            The name of the new queue. Up to 80 characters; alphanumeric,
            hyphens (-), and underscores (_). A FIFO queue name must end with
            the ``.fifo`` suffix. Case-sensitive.

        resource_id(str, Optional):
            The URL of the Amazon SQS queue. Case-sensitive. Defaults to None.

        delay_seconds(int, Optional):
            Delivery delay for all messages, in seconds (0-900). Default: 0.

        maximum_message_size(int, Optional):
            Maximum message size in bytes (1,024-262,144). Default: 262,144.

        message_retention_period(int, Optional):
            Message retention, in seconds (60-1,209,600). Default: 345,600.

        policy(dict or str, Optional):
            The queue's IAM policy document.

        receive_message_wait_time_seconds(int, Optional):
            ReceiveMessage long-poll wait, in seconds (0-20). Default: 0.

        redrive_policy(dict, Optional):
            Dead-letter queue parameters for the source queue.

            * deadLetterTargetArn (str): ARN of the dead-letter queue to which
              Amazon SQS moves messages after ``maxReceiveCount`` is exceeded.
            * maxReceiveCount (int): Number of deliveries to the source queue
              before a message is moved to the dead-letter queue.

            The dead-letter queue of a FIFO queue must also be a FIFO queue;
            likewise for standard queues.

        visibility_timeout(int, Optional):
            Visibility timeout, in seconds (0-43,200). Default: 30.

        kms_master_key_id(str, Optional):
            ID of an AWS managed CMK for Amazon SQS or a custom CMK
            (e.g. ``alias/aws/sqs`` or ``alias/MyAlias``).

        kms_data_key_reuse_period_seconds(int, Optional):
            Data key reuse period, in seconds (60-86,400). Default: 300.

        sqs_managed_sse_enabled(bool, Optional):
            Enables server-side encryption with SQS-owned keys. Only one
            server-side encryption option is supported per queue.

        fifo_queue(bool, Optional):
            Designates a queue as FIFO. Can only be provided at creation time.

        content_based_deduplication(bool, Optional):
            Enables content-based deduplication for FIFO queues.

        deduplication_scope(str, Optional):
            Whether deduplication occurs at the message group or queue level.
            Valid values: ``messageGroup`` and ``queue``.

        fifo_throughput_limit(str, Optional):
            Whether the FIFO throughput quota applies per queue or per message
            group. Valid values: ``perQueue`` and ``perMessageGroupId``.
            ``perMessageGroupId`` is allowed only when ``deduplication_scope``
            is ``messageGroup``. Defaults to None.

        tags(dict, Optional):
            Cost allocation tags for the queue. Defaults to None.

    Returns:
        dict[str, Any]

    Examples:
        .. code-block:: yaml

            example_fifo_queue.fifo:
              aws.sqs.queue.present:
                - delay_seconds: 35
                - maximum_message_size: 12345
                - message_retention_period: 1759
                - policy:
                    Version: 2022-03-30
                    Id: Example_Queue_Policy
                    Statement:
                      Effect: Deny
                      Principal:
                        AWS:
                          - "000000000000"
                      Action: sqs:SendMessage
                      Resource: arn:aws:sqs:us-west-1:000000000000:example_sqs_queue.fifo
                - receive_message_wait_time_seconds: 3
                - redrive_policy:
                    deadLetterTargetArn: arn:aws:sqs:us-west-1:000000000000:dead_letter_sqs_queue.fifo
                    maxReceiveCount: 5
                - visibility_timeout: 75
                - kms_master_key_id: alias/sample/key
                - kms_data_key_reuse_period_seconds: 735
                - fifo_queue: true
                - content_based_deduplication: true
                - deduplication_scope: message_group
                - fifo_throughput_limit: perMessageGroupId
                - tags:
                    something: this-is-just-some-text
    """
    result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
    resource_updated = False
    before = None
    # resource_id is the queue URL, which is how SQS addresses a queue
    queue_url = resource_id

    # Try to get the current state of the resource if such is available
    if queue_url:
        queue_state_ret = await hub.exec.aws.sqs.queue.get(ctx, queue_url)
        if not queue_state_ret["result"] or not queue_state_ret["ret"]:
            result["result"] = False
            result["comment"] += queue_state_ret["comment"]
            return result
        before = queue_state_ret["ret"]

    present_attributes = {
        "delay_seconds": delay_seconds,
        "maximum_message_size": maximum_message_size,
        "message_retention_period": message_retention_period,
        "policy": policy,
        "receive_message_wait_time_seconds": receive_message_wait_time_seconds,
        "redrive_policy": redrive_policy,
        "visibility_timeout": visibility_timeout,
        "kms_master_key_id": kms_master_key_id,
        "kms_data_key_reuse_period_seconds": kms_data_key_reuse_period_seconds,
        "sqs_managed_sse_enabled": sqs_managed_sse_enabled,
        "fifo_queue": fifo_queue,
        "content_based_deduplication": content_based_deduplication,
        "deduplication_scope": deduplication_scope,
        "fifo_throughput_limit": fifo_throughput_limit,
    }

    # Convert the new state attributes to a dict of string values with raw
    # (AWS-side) attribute names
    plan_state_raw_attributes = (
        hub.tool.aws.sqs.conversion_utils.convert_present_attributes_to_raw(
            **present_attributes
        )
    )

    if before:
        # The queue exists; check whether it needs to be updated
        result["old_state"] = before

        # Convert the old state attributes to a dict of string values with raw
        # attribute names so they can be diffed against the planned state
        old_state_raw_attributes = (
            hub.tool.aws.sqs.conversion_utils.convert_present_attributes_to_raw(
                before.get("delay_seconds"),
                before.get("maximum_message_size"),
                before.get("message_retention_period"),
                before.get("policy"),
                before.get("receive_message_wait_time_seconds"),
                before.get("redrive_policy"),
                before.get("visibility_timeout"),
                before.get("kms_master_key_id"),
                before.get("kms_data_key_reuse_period_seconds"),
                before.get("sqs_managed_sse_enabled"),
                before.get("fifo_queue"),
                before.get("content_based_deduplication"),
                before.get("deduplication_scope"),
                before.get("fifo_throughput_limit"),
            )
        )

        # Get only the attributes that need to be updated
        attributes_to_update = dict(
            plan_state_raw_attributes.items() - old_state_raw_attributes.items()
        )
        tags_to_remove, tags_to_add = hub.tool.aws.tag_utils.diff_tags_dict(
            before.get("tags"), tags
        )
        resource_updated = attributes_to_update or tags_to_add or tags_to_remove

        # Handle the case when updating the resource with --test
        if ctx.get("test", False):
            # Construct the new state in the case of updating with --test
            result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
                enforced_state={},
                desired_state={
                    "name": name,
                    "resource_id": resource_id,
                    **present_attributes,
                    "tags": tags,
                },
            )
            if attributes_to_update:
                # Updated resource
                result["comment"] += hub.tool.aws.comment_utils.would_update_comment(
                    "aws.sqs.queue", name
                )
            if tags_to_add or tags_to_remove:
                # Updated tags
                result[
                    "comment"
                ] += hub.tool.aws.comment_utils.would_update_tags_comment(
                    tags_to_remove, tags_to_add
                )
        else:
            # Update the attributes
            if attributes_to_update:
                updated_attributes_ret = (
                    await hub.exec.boto3.client.sqs.set_queue_attributes(
                        ctx, QueueUrl=queue_url, Attributes=attributes_to_update
                    )
                )
                updated_attributes_result = updated_attributes_ret["result"]
                result["result"] = result["result"] and updated_attributes_result
                result["comment"] += (
                    hub.tool.aws.comment_utils.update_comment("aws.sqs.queue", name)
                    if updated_attributes_result
                    else updated_attributes_ret["comment"]
                )

            # tags_to_remove may contain the tags to be updated. They do not need
            # to be deleted and then added, they can be updated just by calling
            # hub.exec.boto3.client.sqs.tag_queue with those updated tags, so they
            # can be removed from the dict of tags to remove
            tag_keys_to_delete = list(tags_to_remove.keys() - tags_to_add.keys())
            deleted_tags = False
            if tag_keys_to_delete:
                # Delete any tags from the old state which are not in the new state
                deleted_tags_ret = await hub.exec.boto3.client.sqs.untag_queue(
                    ctx, QueueUrl=queue_url, TagKeys=tag_keys_to_delete
                )
                deleted_tags = deleted_tags_ret["result"]
                if not deleted_tags:
                    result["result"] = False
                    result["comment"] += deleted_tags_ret["comment"]

            # Update or add tags
            updated_tags = False
            if tags_to_add:
                updated_tags_ret = await hub.exec.boto3.client.sqs.tag_queue(
                    ctx, QueueUrl=queue_url, Tags=tags_to_add
                )
                updated_tags = updated_tags_ret["result"]
                if not updated_tags:
                    result["result"] = False
                    result["comment"] += updated_tags_ret["comment"]

            if updated_tags or deleted_tags:
                result["comment"] += hub.tool.aws.comment_utils.update_tags_comment(
                    tags_to_remove, tags_to_add
                )
    else:
        # Handle the case when creating the resource with --test
        if ctx.get("test", False):
            # Construct the new state in the case of creating with --test
            result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state(
                enforced_state={},
                desired_state={
                    "name": name,
                    "resource_id": resource_id,
                    **present_attributes,
                    "tags": tags,
                },
            )
            result["comment"] += hub.tool.aws.comment_utils.would_create_comment(
                "aws.sqs.queue", name
            )
        else:
            # Create a new SQS queue
            ret = await hub.exec.boto3.client.sqs.create_queue(
                ctx,
                QueueName=name,
                Attributes=plan_state_raw_attributes,
                tags=tags,
            )
            result["result"] = ret["result"]
            if not result["result"]:
                result["comment"] = ret["comment"]
                return result
            queue_url = ret["ret"].get("QueueUrl")
            result["comment"] += hub.tool.aws.comment_utils.create_comment(
                "aws.sqs.queue", name
            )

    if (not before) or resource_updated:
        if not ctx.get("test", False):
            # Get the new state after creating or updating the resource
            queue_state_after = await hub.exec.aws.sqs.queue.get(
                ctx,
                queue_url,
                expected_attributes=present_attributes,
                expected_tags=tags,
            )
            result["new_state"] = queue_state_after["ret"]
            result["result"] = result["result"] and queue_state_after["result"]
            result["comment"] += queue_state_after["comment"]
    else:
        # Resource already exists and there are no updates to properties
        result["comment"] += hub.tool.aws.comment_utils.already_exists_comment(
            "aws.sqs.queue", name
        )
        result["new_state"] = copy.deepcopy(result["old_state"])
    return result
async def absent(hub, ctx, name: str, resource_id: str = None) -> Dict[str, Any]:
    """Deletes the queue specified by ``resource_id``, regardless of the queue's contents.

    Be careful with the DeleteQueue action: when you delete a queue, any
    messages in the queue are no longer available. The deletion process takes
    up to 60 seconds, and requests involving the queue during that window
    might still succeed. After deleting a queue you must wait at least 60
    seconds before creating a queue with the same name.

    Args:
        name(str):
            The name of the SQS queue to delete.

        resource_id(str, Optional):
            The URL of the Amazon SQS queue to delete. Case-sensitive. Idem
            automatically considers this resource being absent if this field
            is not specified.

    Returns:
        dict[str, Any]

    Examples:
        .. code-block:: yaml

            resource_is_absent:
              aws.sqs.queue.absent:
                - resource_id: value
    """
    result = dict(comment=[], old_state=None, new_state=None, name=name, result=True)
    before = None

    # Try to get the previous state of the resource if such is available
    if resource_id:
        # resource_id is the queue URL
        queue_state_ret = await hub.exec.aws.sqs.queue.get(ctx, resource_id)
        if queue_state_ret["result"]:
            before = queue_state_ret["ret"]
        else:
            result["result"] = False
            result["comment"] += queue_state_ret["comment"]
            return result

    if not before:
        # Resource already absent
        result["comment"] += hub.tool.aws.comment_utils.already_absent_comment(
            "aws.sqs.queue", name
        )
    else:
        # Delete the resource
        result["old_state"] = before
        if ctx.get("test", False):
            result["comment"] += hub.tool.aws.comment_utils.would_delete_comment(
                "aws.sqs.queue", name
            )
            return result
        ret = await hub.exec.boto3.client.sqs.delete_queue(ctx, QueueUrl=resource_id)
        result["result"] = ret["result"]
        if not result["result"]:
            result["comment"] = ret["comment"]
            return result
        result["comment"] += hub.tool.aws.comment_utils.delete_comment(
            "aws.sqs.queue", name
        )
    return result
async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]:
    """Returns a list of your queues in the current region.

    Returns:
        dict[str, Any]

    Examples:
        .. code-block:: bash

            $ idem describe aws.sqs.queue
    """
    result = {}
    queue_urls_ret = await hub.exec.boto3.client.sqs.list_queues(ctx)
    if not queue_urls_ret["result"]:
        hub.log.warning(f"Could not describe aws.sqs.queue {queue_urls_ret['comment']}")
        return {}

    # The resource_id is the queue URL
    for resource_id in queue_urls_ret["ret"]["QueueUrls"]:
        queue_state_ret = await hub.exec.aws.sqs.queue.get(ctx, resource_id)
        if not queue_state_ret["ret"]:
            hub.log.debug(queue_state_ret["comment"])
            continue
        # queue_state_ret["ret"] can contain items of type NamespaceDict
        # which cannot be represented as a valid yaml, so they need to be
        # converted to dict which is done via this copy method
        queue_state = copy.copy(queue_state_ret["ret"])
        queue_state.pop("name")
        queue_state_list = [{k: v} for k, v in queue_state.items()]
        # The queue name is the last part of the queue URL, it is used as the
        # idem resource name in describe
        name = resource_id.split("/")[-1]
        result[name] = {"aws.sqs.queue.present": queue_state_list}
    return result