Source code for idem_aws.exec.aws.dynamodb.tag
import copy
from typing import Any
from typing import Dict
async def update_tags(
    hub,
    ctx,
    resource_arn: str,
    old_tags: Dict[str, Any],
    new_tags: Dict[str, Any],
):
    """Update tags of AWS Dynamo DB resources.

    The tags to remove and the tags to add are obtained from
    ``hub.tool.aws.tag_utils.diff_tags_dict``. Unless ``ctx`` carries a truthy
    ``"test"`` entry, tags are first removed with ``untag_resource`` and then
    added with ``tag_resource``; the first failing call aborts the update.

    Args:
        resource_arn:
            Identifies the Amazon DynamoDB resource to which tags should be added.
            This value is an Amazon Resource Name (ARN).

        old_tags:
            Dict of existing tags.

        new_tags:
            Dict of new tags. When ``None``, no diff is computed and the
            existing tags are reported back unchanged.

    Returns:
        {"result": True|False, "comment": ["A list"], "ret": dict of updated tags}

        * Nothing to change: ``result`` is True, ``comment`` is empty and
          ``ret`` is a deep copy of ``old_tags`` (``{}`` if it is falsy).
        * An untag/tag call failed: ``result`` is False, ``comment`` carries
          the failing call's comment and ``ret`` stays ``{}``.
        * Otherwise (including test mode): ``ret`` is a deep copy of
          ``new_tags`` and ``comment`` comes from
          ``hub.tool.aws.comment_utils``.
    """
    tags_to_add = {}
    tags_to_remove = {}
    if new_tags is not None:
        tags_to_remove, tags_to_add = hub.tool.aws.tag_utils.diff_tags_dict(
            old_tags=old_tags, new_tags=new_tags
        )

    result = dict(comment=[], result=True, ret={})

    if not tags_to_remove and not tags_to_add:
        # Nothing to do: hand back a copy so callers cannot mutate old_tags
        # through the result.
        result["ret"] = copy.deepcopy(old_tags if old_tags else {})
        return result

    # Read the test flag once; in test mode no AWS call is made.
    is_test = ctx.get("test", False)

    if tags_to_remove and not is_test:
        delete_ret = await hub.exec.boto3.client.dynamodb.untag_resource(
            ctx,
            ResourceArn=resource_arn,
            TagKeys=list(tags_to_remove),
        )
        if not delete_ret["result"]:
            result["comment"] = delete_ret["comment"]
            result["result"] = False
            return result

    if tags_to_add and not is_test:
        add_ret = await hub.exec.boto3.client.dynamodb.tag_resource(
            ctx,
            ResourceArn=resource_arn,
            Tags=hub.tool.aws.tag_utils.convert_tag_dict_to_list(tags_to_add),
        )
        if not add_ret["result"]:
            # NOTE(review): `+=` extends the list element-wise; this assumes
            # the boto3 wrapper returns a list/tuple of messages - confirm.
            result["comment"] += add_ret["comment"]
            result["result"] = False
            return result

    # Return a copy rather than the caller's own dict, matching the
    # no-change path above which also returns a deep copy.
    result["ret"] = copy.deepcopy(new_tags)

    comment_utils = hub.tool.aws.comment_utils
    if is_test:
        result["comment"] = comment_utils.would_update_tags_comment(
            tags_to_remove=tags_to_remove, tags_to_add=tags_to_add
        )
    else:
        result["comment"] = comment_utils.update_tags_comment(
            tags_to_remove=tags_to_remove, tags_to_add=tags_to_add
        )
    return result