Source code for idem_aws.exec.aws.dynamodb.table

import copy
from typing import Any
from typing import Dict


"""
Exec functions for AWS DynamoDB Table resources.
"""


[docs]async def get(hub, ctx, name, resource_id) -> Dict[str, Any]: """Get a DynamoDB Table resource from AWS. Args: name(str): The name of the Idem state. resource_id(str): The name of the Dynamodb table. Returns: Dict[str, Any] """ result = dict(comment=[], ret=None, result=True) describe_ret = await hub.exec.boto3.client.dynamodb.describe_table( ctx, TableName=resource_id ) if not describe_ret["result"]: if "ResourceNotFoundException" in str(describe_ret["comment"]): result["comment"].append( hub.tool.aws.comment_utils.get_empty_comment( resource_type="aws.dynamodb.table", name=name ) ) result["comment"] += list(describe_ret["comment"]) return result result["result"] = False result["comment"] = describe_ret["comment"] return result table_resource = describe_ret.get("ret").get("Table") tags_ret = await hub.exec.boto3.client.dynamodb.list_tags_of_resource( ctx, ResourceArn=table_resource.get("TableArn") ) if not tags_ret["result"]: result["result"] = False result["comment"] = tags_ret["comment"] return result table_resource.update(copy.deepcopy(tags_ret["ret"])) continuous_backups_ret = ( await hub.exec.boto3.client.dynamodb.describe_continuous_backups( ctx, TableName=table_resource.get("TableName") ) ) if not continuous_backups_ret.get("result"): result["result"] = False result["comment"] = continuous_backups_ret["comment"] return result table_resource.update(copy.deepcopy(continuous_backups_ret.get("ret"))) time_to_live_ret = await hub.exec.boto3.client.dynamodb.describe_time_to_live( ctx, TableName=table_resource.get("TableName") ) if not time_to_live_ret.get("result"): result["result"] = False result["comment"] = time_to_live_ret["comment"] return result table_resource.update(copy.deepcopy(time_to_live_ret.get("ret"))) result["ret"] = hub.tool.aws.dynamodb.table.convert_raw_table_to_present( ctx, raw_resource=table_resource, idem_resource_name=name ) return result