Source code for idem_aws.exec.aws.es.elasticsearch_domain

"""Exec module for managing Amazon Elasticsearch Service Domain"""


[docs]async def get( hub, ctx, name: str, resource_id: str, ): """Get the domain configuration information about the specified Elasticsearch domain Args: name(str): An Idem name of the resource. resource_id(str): The name of the Elasticsearch domain. Returns: .. code-block:: python {"result": True|False, "comment": A message List, "ret": None|Dict} Examples: Using in a state: .. code-block:: yaml my_unmanaged_resource: exec.run: - path: aws.es.elasticsearch_domain.get - kwargs: name: my_resource resource_id: resource_id Calling this exec function from the cli with resource_id: .. code-block:: bash idem exec aws.es.elasticsearch_domain.get resource_id="resource_id" name="name" """ result = dict(comment=[], ret=None, result=True) get_domain_ret = await hub.exec.boto3.client.es.describe_elasticsearch_domain( ctx, DomainName=resource_id ) if not get_domain_ret["result"]: if "ResourceNotFoundException" in str(get_domain_ret["comment"]): result["comment"].append( hub.tool.aws.comment_utils.get_empty_comment( resource_type="aws.es.elasticsearch_domain", name=name ) ) result["comment"] += list(get_domain_ret["comment"]) return result result["result"] = False result["comment"] = list(get_domain_ret["comment"]) return result elasticsearch_domain = get_domain_ret["ret"].get("DomainStatus") arn = elasticsearch_domain.get("ARN") tags_ret = await hub.exec.boto3.client.es.list_tags(ctx, ARN=arn) tags = [] if not tags_ret["result"]: result["comment"] = list(tags_ret["comment"]) result["result"] = False return result else: if tags_ret["ret"] and tags_ret.get("ret").get("TagList"): tags = tags_ret.get("ret").get("TagList") result[ "ret" ] = hub.tool.aws.es.elasticsearch_domain.convert_raw_elasticsearch_domain_to_present( elasticsearch_domain, tags=hub.tool.aws.tag_utils.convert_tag_list_to_dict(tags), ) return result