Source code for idem_aws.exec.aws.sns.subscription
"""Exec module for managing SNS Subscription."""
from typing import Any
from typing import Dict

# ``list_`` is defined with a trailing underscore in this module but is reached
# as ``hub.exec.aws.sns.subscription.list`` (see ``get``); this alias table is
# what maps the defined name to the exposed one.
__func_alias__ = {"list_": "list"}
async def get(
    hub,
    ctx,
    name: str,
    topic_arn: str = None,
    resource_id: str = None,
) -> Dict[str, Any]:
    """Get Subscription from either topic arn or resource id.

    The lookup is delegated to ``hub.exec.aws.sns.subscription.list``; the first
    subscription it returns is used. When more than one subscription matches, a
    comment naming the chosen resource is added.

    Args:
        name(str):
            An Idem name of the state for logging.

        topic_arn(str, Optional):
            Topic ARN

        resource_id(str, Optional):
            Subscription ARN

    Returns:
        Dict[str, Any]:
            ``result`` is False when the underlying list call failed, True
            otherwise. ``ret`` is the first matching subscription, or None when
            nothing matched or the call failed. ``comment`` is a list holding the
            comments of the list call plus any comment added here.
    """
    result = dict(comment=[], result=True, ret=None)

    ret = await hub.exec.aws.sns.subscription.list(
        ctx, name=name, topic_arn=topic_arn, resource_id=resource_id
    )

    # Copy the comments on every path. Aliasing ret["comment"] and appending to it
    # would mutate the callee's data and would fail outright if it were a tuple.
    result["comment"] += list(ret["comment"] or ())

    if not ret["result"]:
        result["result"] = False
        return result

    matches = ret["ret"]
    if matches:
        result["ret"] = matches[0]
        if len(matches) > 1:
            result["comment"].append(
                f"More than one aws.sns.subscription resource was found. "
                f"Use resource {matches[0].get('resource_id')}"
            )
    return result
async def list_(
    hub, ctx, name: str = None, topic_arn: str = None, resource_id: str = None
) -> Dict:
    """Use an un-managed Subscription as a data-source.

    Subscriptions are listed for ``topic_arn`` when it is given, otherwise for the
    whole account. Every page of the listing is fetched by following ``NextToken``.
    The collected entries are optionally narrowed to ``resource_id``, and each
    remaining subscription is described and converted to its "present" form.
    Entries whose ARN is the placeholder ``"Deleted"`` or ``"PendingConfirmation"``
    are skipped.

    Args:
        name(str):
            The name of the Idem state.

        topic_arn(str, Optional):
            Topic ARN

        resource_id(str, Optional):
            Subscription ARN

    Returns:
        Dict:
            ``result`` is False when a list call failed for a reason other than
            the topic not being found. ``ret`` is the list of converted
            subscriptions (possibly empty). ``comment`` is a list of messages,
            including an "empty" comment when nothing matched.
    """
    result = dict(comment=[], result=True, ret=[])

    if topic_arn:
        fetch_page = hub.exec.boto3.client.sns.list_subscriptions_by_topic
        request = {"TopicArn": topic_arn}
    else:
        fetch_page = hub.exec.boto3.client.sns.list_subscriptions
        request = {}

    # Both SNS list operations are paginated; keep requesting pages until the
    # response no longer carries a NextToken, otherwise results are truncated.
    subscriptions = []
    while True:
        ret = await fetch_page(ctx, **request)
        if not ret["result"]:
            if topic_arn and "NotFoundException" in str(ret["comment"]):
                # A missing topic is reported as an empty result, not a failure.
                result["comment"].append(
                    hub.tool.aws.comment_utils.get_empty_comment(
                        resource_type="aws.sns.subscription", name=name
                    )
                )
            else:
                result["result"] = False
            result["comment"] += list(ret["comment"])
            return result

        page = ret["ret"] or {}
        subscriptions.extend(page.get("Subscriptions") or [])
        next_token = page.get("NextToken")
        if not next_token:
            break
        request["NextToken"] = next_token

    if resource_id:
        # .get() keeps this consistent with the loop below and tolerates entries
        # that lack a SubscriptionArn key.
        subscriptions = [
            entry
            for entry in subscriptions
            if entry.get("SubscriptionArn") == resource_id
        ]

    if not subscriptions:
        result["comment"].append(
            hub.tool.aws.comment_utils.get_empty_comment(
                resource_type="aws.sns.subscription", name=name
            )
        )
        return result

    for subscription in subscriptions:
        subscription_id = subscription.get("SubscriptionArn")
        # These placeholder values are not real ARNs, so there is nothing to describe.
        if subscription_id in ("Deleted", "PendingConfirmation"):
            continue
        resource = await hub.tool.boto3.resource.create(
            ctx, "sns", "Subscription", arn=subscription_id
        )
        raw_resource = await hub.tool.boto3.resource.describe(resource)
        result["ret"].append(
            hub.tool.aws.sns.conversion_utils.convert_raw_subscription_to_present(
                raw_resource=raw_resource, idem_resource_name=subscription_id
            )
        )
    return result