Source code for idem_aws.states.aws.lambda_aws.function_permission

"""State module for managing Lambda function permission"""
import copy
import json
from typing import Any
from typing import Dict

__contracts__ = ["resource"]
# if no permission exists it throws below error message.
PERMISSION_DOES_NOT_EXISTS = "ResourceNotFoundException: An error occurred (ResourceNotFoundException) when calling the GetPolicy operation"


[docs]async def present( hub, ctx, name: str, action: str, function_name: str, principal: str, source_arn: str = None, source_account: str = None, resource_id: str = None, event_source_token: str = None, qualifier: str = None, revision_id: str = None, principal_org_id: str = None, function_url_auth_type: str = None, ) -> Dict[str, Any]: """Add specified permission to the lambda function. Args: name(str): Name/Id of the statement/permission action(str): The action that the principal can use on the function. For example, ``lambda:InvokeFunction`` or ``lambda:GetFunction``. function_name(str): The name of the Lambda function, version, or alias. Name formats * Function name - my-function (name-only), my-function:v1 (with alias). * Function ARN - ``arn:aws:lambda:us-west-2:123456789012:function:my-function`` . * Partial ARN - ``123456789012:function:my-function`` principal(str): The Amazon Web Services service or account that invokes the function. If you specify a service, use source_arn or source_account to limit who can invoke the function through that service. source_arn(str, Optional): For Amazon Web Services , the ARN of the Amazon Web Services resource that invokes the function. For example, an Amazon S3 bucket or Amazon SNS topic. source_account(str, Optional): For Amazon S3, the ID of the account that owns the resource. Use this together with SourceArn to ensure that the resource is owned by the specified account. It is possible for an Amazon S3 bucket to be deleted by its owner and recreated by another account. resource_id(str, Optional): Name/Id of the statement/permission event_source_token(str, Optional): For Alexa Smart Home functions, a token that must be supplied by the invoker. qualifier(str, Optional): Specify a version or alias to add permissions to a published version of the function. revision_id(str, Optional): Only update the policy if the revision ID matches the ID that's specified. Use this option to avoid modifying a policy that has changed since you last read it. principal_org_id(str, Optional): The identifier for your organization in Organizations. Use this to grant permissions to all the Amazon Web Services accounts under this organization. function_url_auth_type(str, Optional): The type of authentication that your function URL uses. Set to AWS_IAM if you want to restrict access to authenticated IAM users only. Set to NONE if you want to bypass IAM authentication to create a public endpoint. Request Syntax: .. code-block:: sls [statement_id]: aws.lambda_aws.function_permission.present: - name: 'string' - resource_id: 'string' - action: 'string' - function_name: 'string' - principal: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls test_idem_lambda_function_statement: aws.lambda_aws.function_permission.present: - action: lambda:GetAlias - effect: Allow - function_name: ecd17a181d6588e27c976cdeff501e90750b0dcafebba907cc4aab3c - name: test-001 - principal: "AWS: AIDAX2FJ77DC2AS7BAPBU" - resource_id: test-001 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) # check if lambda function exists ret = await hub.exec.boto3.client["lambda"].get_function( ctx, FunctionName=function_name ) if not ret["result"]: result["comment"] = ret["comment"] result["result"] = False return result # get current set of permissions get_policy = await hub.exec.boto3.client["lambda"].get_policy( ctx, FunctionName=function_name ) if not get_policy["result"] and not any( PERMISSION_DOES_NOT_EXISTS in msg for msg in get_policy["comment"] ): result["comment"] = get_policy["comment"] result["result"] = False return result if get_policy["result"]: final_resource_id = resource_id if resource_id else name raw_statements = json.loads(get_policy["ret"]["Policy"])["Statement"] for statement in raw_statements: if statement["Sid"] == final_resource_id: result[ "old_state" ] = hub.tool.aws.lambda_aws.function_permission.convert_raw_lambda_permission_to_present( raw_resource=statement, function_name=function_name, revision_id=get_policy["ret"]["RevisionId"], ) result["comment"] = hub.tool.aws.comment_utils.already_exists_comment( resource_type="aws.lambda_aws.function_permission", name=final_resource_id, ) result["new_state"] = copy.deepcopy(result["old_state"]) return result if ctx.get("test", False): result["comment"] = hub.tool.aws.comment_utils.would_create_comment( resource_type="aws.lambda_aws.function_permission", name=name ) principal_response = {"AWS": principal} result["new_state"] = hub.tool.aws.test_state_utils.generate_test_state( enforced_state={}, desired_state={ "name": name, "resource_id": name, "action": action, "function_name": function_name, "principal": principal_response, "source_arn": source_arn, "source_account": source_account, "event_source_token": event_source_token, "qualifier": qualifier, "revision_id": revision_id, "principal_org_id": principal_org_id, "function_url_auth_type": function_url_auth_type, }, ) return result add_permission = await hub.exec.boto3.client["lambda"].add_permission( ctx, FunctionName=function_name, StatementId=name, Action=action, Principal=principal, SourceArn=source_arn, SourceAccount=source_account, EventSourceToken=event_source_token, Qualifier=qualifier, RevisionId=revision_id, PrincipalOrgID=principal_org_id, FunctionUrlAuthType=function_url_auth_type, ) if not add_permission["result"]: result["comment"] = add_permission["comment"] result["result"] = False return result get_policy = await hub.exec.boto3.client["lambda"].get_policy( ctx, FunctionName=function_name ) if not get_policy["result"]: result["comment"] = get_policy["comment"] result["result"] = False return result raw_statements = json.loads(get_policy["ret"]["Policy"])["Statement"] for statement in raw_statements: if statement["Sid"] == name: result[ "new_state" ] = hub.tool.aws.lambda_aws.function_permission.convert_raw_lambda_permission_to_present( raw_resource=statement, function_name=function_name, revision_id=get_policy["ret"]["RevisionId"], ) break result["comment"] = hub.tool.aws.comment_utils.create_comment( resource_type="aws.lambda_aws.function_permission", name=name ) return result
[docs]async def absent( hub, ctx, name: str, function_name: str = None, resource_id: str = None ) -> Dict[str, Any]: """Remove specified permission from the lambda function. Args: name(str): Name/Id of the statement/permission function_name(str, Optional): The name of the Lambda function, version, or alias. Name formats * Function name - my-function (name-only), my-function:v1 (with alias). * Function ARN - ``arn:aws:lambda:us-west-2:123456789012:function:my-function`` . * Partial ARN - ``123456789012:function:my-function`` . resource_id(str, Optional): Name/Id of the statement/permission Request Syntax: .. code-block:: sls [statement_id]: aws.lambda_aws.function_permission.absent: - name: 'string' Returns: Dict[str, Any] Examples: .. code-block:: sls test_idem_lambda_function_statement: aws.lambda_aws.function_permission.absent: - name: test-001 """ result = dict(comment=[], old_state=None, new_state=None, name=name, result=True) if not resource_id: resource_id = name if not function_name: result["comment"] = hub.tool.aws.comment_utils.missing_args_for_absent_comment( resource_type="aws.lambda_aws.function_permission", name=name, args=["function_name"], ) result["result"] = False return result ret = await hub.exec.boto3.client["lambda"].get_function( ctx, FunctionName=function_name ) if not ret["result"]: result["comment"] = ret["comment"] result["result"] = False return result get_policy = await hub.exec.boto3.client["lambda"].get_policy( ctx, FunctionName=function_name ) if not get_policy["result"]: if any(PERMISSION_DOES_NOT_EXISTS in msg for msg in get_policy["comment"]): result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.lambda_aws.function_permission", name=resource_id ) else: result["comment"] = ret["comment"] result["result"] = False return result raw_statements = json.loads(get_policy["ret"]["Policy"])["Statement"] list_of_statement_ids = [statement["Sid"] for statement in raw_statements] if name not in list_of_statement_ids: result["comment"] = hub.tool.aws.comment_utils.already_absent_comment( resource_type="aws.lambda_aws.function_permission", name=resource_id ) return result for statement in raw_statements: if statement["Sid"] == name: result[ "old_state" ] = hub.tool.aws.lambda_aws.function_permission.convert_raw_lambda_permission_to_present( raw_resource=statement, function_name=function_name, revision_id=get_policy["ret"]["RevisionId"], ) break if ctx.get("test", False): result["comment"] = hub.tool.aws.comment_utils.would_delete_comment( resource_type="aws.lambda_aws.function_permission", name=resource_id ) return result remove_permission = await hub.exec.boto3.client["lambda"].remove_permission( ctx, FunctionName=function_name, StatementId=resource_id, ) if not remove_permission["result"]: result["comment"] = remove_permission["comment"] result["result"] = False result["comment"] = hub.tool.aws.comment_utils.delete_comment( resource_type="aws.lambda_aws.function_permission", name=resource_id, ) return result
[docs]async def describe(hub, ctx) -> Dict[str, Dict[str, Any]]: """Returns permissions associated with each lambda function. Returns: Dict[str, Dict[str, Any]] Examples: .. code-block:: bash $ idem describe aws.lambda_aws.function_permission """ result = {} ret = await hub.exec.boto3.client["lambda"].list_functions(ctx) if not ret["result"]: hub.log.warning(f"Could not describe lambda functions. Error: {ret['comment']}") return result if ret and ret["result"] and ret["ret"]["Functions"]: for function in ret["ret"]["Functions"]: function_name = function.get("FunctionName") get_policy = await hub.exec.boto3.client["lambda"].get_policy( ctx, FunctionName=function_name ) if not get_policy["result"]: if not any( PERMISSION_DOES_NOT_EXISTS in msg for msg in get_policy["comment"] ): hub.log.debug( f"Could not describe lambda functions permission for {function_name}. Error: {get_policy['comment']}" ) continue raw_statements = json.loads(get_policy["ret"]["Policy"])["Statement"] for statement in raw_statements: permission_translated = hub.tool.aws.lambda_aws.function_permission.convert_raw_lambda_permission_to_present( raw_resource=statement, function_name=function_name, revision_id=get_policy["ret"]["RevisionId"], ) result[statement["Sid"]] = { "aws.lambda_aws.function_permission.present": [ {parameter_key: parameter_value} for parameter_key, parameter_value in permission_translated.items() ] } return result