# Source code for idem_aws.exec.aws.rds.tag
"""RDS Tag operations"""
import copy
from typing import Any
from typing import Dict
async def update_rds_tags(
    hub,
    ctx,
    resource_arn,
    old_tags: Dict[str, Any],
    new_tags: Dict[str, Any],
):
    """
    Update tags of AWS RDS resources

    The difference between ``old_tags`` and ``new_tags`` is computed with
    ``hub.tool.aws.tag_utils.diff_tags_dict``. Tags to remove are deleted
    first, then tags to add are applied. When ``new_tags`` is ``None`` or
    the diff is empty, nothing is called and a copy of ``old_tags`` is
    returned. When ``ctx["test"]`` is truthy no AWS call is made, but the
    result is still populated as it would be after a successful update.

    Args:
        resource_arn (str):
            AWS resource arn

        old_tags (dict):
            Dict in the format of {tag-key: tag-value}

        new_tags (dict):
            Dict in the format of {tag-key: tag-value}. ``None`` means
            "leave the tags untouched".

    Returns:
        {"result": True|False, "comment": "A message", "ret": dict of updated tags}

        On failure of either AWS call, ``result`` is ``False``, ``comment``
        carries the comment of the failed call and ``ret`` stays ``None``.

    Examples:
        Calling this exec module function from the cli

        .. code-block:: bash

            idem exec aws.rds.tag.update_rds_tags resource_arn="some_arn" old_tags='{"old_tag": "old_val"}' new_tags='{"new_tag": "new_val"}'

        Using in a state:

        .. code-block:: yaml

            my_unmanaged_resource:
              exec.run:
                - path: aws.rds.tag.update_rds_tags
                - kwargs:
                    resource_arn: some_arn
                    old_tags:
                      old_tag: old_val
                    new_tags:
                      new_tag: new_val
    """
    tags_to_add = {}
    tags_to_remove = {}
    if new_tags is not None:
        tags_to_remove, tags_to_add = hub.tool.aws.tag_utils.diff_tags_dict(
            old_tags=old_tags, new_tags=new_tags
        )

    result = dict(comment=[], result=True, ret=None)

    # Nothing to change: report the current tags without touching AWS.
    if not tags_to_remove and not tags_to_add:
        result["ret"] = copy.deepcopy(old_tags if old_tags else {})
        return result

    # Read the test flag once; in test mode both AWS calls are skipped.
    is_test = ctx.get("test", False)

    if tags_to_remove and not is_test:
        # NOTE(review): tags_to_remove is presumably a dict keyed by tag
        # name, so list() yields the keys to delete -- confirm against
        # diff_tags_dict.
        delete_ret = await hub.exec.boto3.client.rds.remove_tags_from_resource(
            ctx,
            ResourceName=resource_arn,
            TagKeys=list(tags_to_remove),
        )
        if not delete_ret["result"]:
            result["comment"] = delete_ret["comment"]
            result["result"] = False
            return result

    if tags_to_add and not is_test:
        add_ret = await hub.exec.boto3.client.rds.add_tags_to_resource(
            ctx,
            ResourceName=resource_arn,
            Tags=hub.tool.aws.tag_utils.convert_tag_dict_to_list(tags_to_add),
        )
        if not add_ret["result"]:
            result["comment"] = add_ret["comment"]
            result["result"] = False
            return result

    # new_tags cannot be None here (that case returned early above). Return a
    # copy, like the no-op path does, so callers mutating "ret" never alter
    # the dict they passed in.
    result["ret"] = copy.deepcopy(new_tags)
    result["comment"] += [f"Update tags: Add [{tags_to_add}] Remove [{tags_to_remove}]"]
    return result