Source code for idem_aws.exec.aws.s3.bucket_replication
"""Exec functions for S3 bucket's replication configurations."""
from typing import Dict
[docs]async def get(hub, ctx, name: str, resource_id: str) -> Dict:
"""Returns the replication configuration of a bucket.
Args:
name(str):
The name of the Idem state.
resource_id(str):
AWS S3 bucket name.
Examples:
Calling from the CLI:
.. code-block:: bash
$ idem exec aws.s3.bucket.get name="bucket-replication-name" resource_id="bucket-replication-id"
Using in a state:
.. code-block:: yaml
get_a_bucket_replication:
exec.run:
- path: aws.s3.bucket_replication.get
- kwargs:
name: bucket-replication-name
resource_id: bucket-replication-id
"""
result = dict(comment=[], ret=None, result=True)
ret = await hub.exec.boto3.client.s3.get_bucket_replication(ctx, Bucket=resource_id)
if not ret["result"]:
if "ReplicationConfigurationNotFoundError" in str(ret["comment"]):
result["comment"].append(
hub.tool.aws.comment_utils.get_empty_comment(
resource_type="aws.s3.bucket_replication", name=name
)
)
result["comment"] += list(ret["comment"])
return result
result["comment"] += list(ret["comment"])
result["result"] = False
return result
if "ReplicationConfiguration" not in ret["ret"]:
result["comment"].append(
hub.tool.aws.comment_utils.get_empty_comment(
resource_type="aws.s3.bucket_replication", name=name
)
)
result["comment"] += list(ret["comment"])
return result
result[
"ret"
] = hub.tool.aws.s3.conversion_utils.convert_raw_bucket_replication_to_present(
ctx=ctx, raw_resource=ret["ret"], bucket_name=resource_id
)
return result