from typing import Any
from idem_vra.helpers.mapper import add_properties
from idem_vra.helpers.mapper import omit_properties
from idem_vra.helpers.models import StateReturn
# Contract names this module declares for itself.
# NOTE(review): presumably consumed by the plugin loader (idem/pop) - confirm.
__contracts__ = ["resource"]
# Requisites keyed by state function name: `present` lists
# "vra.iaas.project.present" under "require"; `absent` lists none.
# NOTE(review): looks like idem "transparent requisites" - confirm.
TREQ = {"present": {"require": ["vra.iaas.project.present"]}, "absent": {"require": []}}
async def present(hub, ctx, name: str, **kwargs):
    """
    Enforce the ``present`` state for a gitwebhooks resource.

    Builds a ``GitwebhooksStateImpl`` and delegates to its ``present`` method.
    Any exception is logged through ``hub.log.error`` and re-raised.

    :param string name: (required in body) A human-friendly name used as an identifier in APIs that support this
      option
    :param string apiVersion: (optional in query) The version of the API in yyyy-MM-dd format (UTC). For versioning
      information please refer to /codestream/api/about
    :param string Authorization: (optional in header) Bearer token
    :param string branchName: (optional in body) Branch name for which the webhook has been configured.
    :param integer delayTimeInMins: (optional in body) Delay time after which the pipeline execution is triggered.
    :param string description: (optional in body) A human-friendly description.
    :param string endpoint: (optional in body) Git endpoint.
    :param string eventCategory: (optional in body) Git event type.
    :param array exclusions: (optional in body) Provide file exclusions as conditions for the trigger.
    :param string externalListenerLink: (optional in body) Git webhook listener link.
    :param array inclusions: (optional in body) Provide file inclusions as conditions for the trigger.
    :param object input: (optional in body) Pipeline Execution input properties.
    :param boolean insecureSsl: (optional in body) verify SSL certificates when delivering payloads
    :param string pipeline: (optional in body) Pipeline name which is meant to be triggered when a git event occur.
    :param boolean prioritizeExclusion: (optional in body) Prioritize Exclusion ensures that pipelines are not
      triggered even if any of the files in a commit match the specified files in the exclusion paths or regex.
    :param string project: (optional in body) The project this entity belongs to.
    :param string refreshToken: (optional in body) Codestream API token.
    :param string repoName: (optional in body) Repo name for which the webhook has been configured.
    :param string secretToken: (optional in body) Secret token to validate received payloads.
    :param string serverType: (optional in body) Git Server Type.
    :param string serverWebhookId: (optional in body) Git webhook id.
    """
    try:
        handler = GitwebhooksStateImpl(hub, ctx)
        outcome = await handler.present(hub, ctx, name, **kwargs)
    except Exception as exc:
        # Log first, then hand the very same exception back to the caller.
        hub.log.error("Error during enforcing present state: gitwebhooks")
        hub.log.error(str(exc))
        raise
    return outcome
async def absent(hub, ctx, name: str, **kwargs):
    """
    Enforce the ``absent`` state for a gitwebhooks resource.

    Builds a ``GitwebhooksStateImpl`` and delegates to its ``absent`` method.
    Any exception is logged through ``hub.log.error`` and re-raised.

    :param string name: (required) name of the resource
    :param string p_id: (required in path) id
    :param string apiVersion: (optional in query) The version of the API in yyyy-MM-dd format (UTC). For versioning
      information please refer to /codestream/api/about
    :param string Authorization: (optional in header) Bearer token
    """
    try:
        handler = GitwebhooksStateImpl(hub, ctx)
        outcome = await handler.absent(hub, ctx, name, **kwargs)
    except Exception as exc:
        # Log first, then hand the very same exception back to the caller.
        hub.log.error("Error during enforcing absent state: gitwebhooks")
        hub.log.error(str(exc))
        raise
    return outcome
async def describe(hub, ctx):
    """
    Describe the existing gitwebhooks resources.

    Builds a ``GitwebhooksStateImpl`` and returns whatever its ``describe``
    method produces. Any exception is logged through ``hub.log.error`` and
    re-raised.
    """
    try:
        handler = GitwebhooksStateImpl(hub, ctx)
        described = await handler.describe(hub, ctx)
    except Exception as exc:
        # Log first, then hand the very same exception back to the caller.
        hub.log.error("Error during describe: gitwebhooks")
        hub.log.error(str(exc))
        raise
    return described
def is_pending(hub, ret: dict, state: str = None, **pending_kwargs):
    """
    Report whether the gitwebhooks resource is still pending reconciliation.

    Builds a ``GitwebhooksStateImpl`` (with no ``ctx``) and delegates to its
    ``is_pending`` method. Any exception is logged through ``hub.log.error``
    and re-raised.

    :param dict ret: state return data to inspect
    :param string state: state name supplied by the caller, forwarded unchanged
    :param pending_kwargs: extra keyword arguments forwarded unchanged
    """
    try:
        # Keep the handler in its own name: rebinding `state` here would make
        # the handler object, not the caller's value, the forwarded argument.
        handler = GitwebhooksStateImpl(hub, None)
        return handler.is_pending(hub, ret, state, **pending_kwargs)
    except Exception as error:
        hub.log.error("Error during is_pending: gitwebhooks")
        hub.log.error(str(error))
        raise error
class GitwebhooksState:
    """
    Default state handlers for the ``gitwebhooks`` resource.

    ``present`` and ``absent`` read the listing through attribute access on
    ``ret.content``; ``describe`` reads the same listing through ``.get``.
    """

    def __init__(self, hub, ctx):
        """Store the ``hub`` and ``ctx`` handed in by the caller."""
        self.hub = hub
        self.ctx = ctx

    async def present(self, hub, ctx, name: str, **kwargs):
        """
        Return the existing webhook named ``name`` or create a new one.

        :param string name: name to match against each listed item's ``name``
        :param kwargs: forwarded to ``create_git_webhook_using_post``
        :return: ``StateReturn`` with ``old == new`` when the resource already
          exists, or ``old=None`` and the created resource as ``new``
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        for s in search_result.content:
            if name == s["name"]:
                hub.log.info(
                    f'Returning resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                return StateReturn(
                    result=True,
                    comment=f"Resource gitwebhooks {name} already exists.",
                    old=s,
                    new=s,
                )
        res = (
            await hub.exec.vra.pipeline.triggers.create_git_webhook_using_post(
                ctx, name, **kwargs
            )
        )["ret"]
        res = await self.remap_resource_structure(hub, ctx, res)
        return StateReturn(
            result=True,
            comment=f"Creation of gitwebhooks {name} success.",
            old=None,
            new=res,
        )

    async def absent(self, hub, ctx, name: str, **kwargs):
        """
        Delete the webhook named ``name`` when the listing contains it.

        The loop does not stop at the first match, so when several items share
        the name the last one listed is the one deleted.

        :param string name: name to match against each listed item's ``name``
        :param kwargs: accepted but not used by this implementation
        :return: ``StateReturn`` with the deleted resource as ``old``, or with
          ``old=None`` when nothing matched
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        resource = None
        for s in search_result.content:
            if name == s["name"]:
                hub.log.info(
                    f'Found resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                resource = s
        if resource:
            # The delete endpoint is addressed by id, passed as ``p_id``.
            delete_kwargs = {"p_id": resource.get("id")}
            hub.log.debug(
                f"gitwebhooks with name = {resource.get('name')} already exists"
            )
            await hub.exec.vra.pipeline.triggers.delete_git_webhook_by_id_using_delete(
                ctx, **delete_kwargs
            )
            return StateReturn(
                result=True,
                comment=f"Resource with name = {resource.get('name')} deleted.",
                old=resource,
                new=None,
            )
        return StateReturn(
            result=True,
            comment=f"Resource with name = {name} is already absent.",
            old=None,
            new=None,
        )

    async def describe(self, hub, ctx):
        """
        Build a mapping describing every listed webhook.

        :return: dict keyed by ``"<name>-<last dash-separated part of id>"``
          whose value maps ``"vra.pipeline.gitwebhooks.present"`` to a list of
          single-entry ``{key: value}`` dicts
        """
        result = {}
        res = await self.paginate_find(hub, ctx)
        for obj in res.get("ret", {}).get("content", []):
            # Keep track of name and id properties as they may get remapped
            obj_name = obj.get("name", "unknown")
            obj_id = obj.get("id", "unknown")
            obj = await self.remap_resource_structure(hub, ctx, obj)
            # Define props
            props = [{key: value} for key, value in obj.items()]
            # Build result
            result[f"{obj_name}-{obj_id.split('-')[-1]}"] = {
                "vra.pipeline.gitwebhooks.present": props
            }
        return result

    async def paginate_find(self, hub, ctx, **kwargs):
        """
        Paginate through all resources using their 'find' method.

        The first page is requested with ``kwargs``. While the first page's
        ``totalElements`` exceeds the number of elements fetched so far,
        further pages are requested with the same ``kwargs`` plus
        ``skip=<elements fetched so far>`` and their ``content`` is appended
        to the first response.

        :param kwargs: forwarded to every ``get_all_git_webhooks_using_get`` call
        :return: the first response, with ``ret.content`` and
          ``ret.numberOfElements`` updated to cover all fetched pages
        """
        fetch_page = hub.exec.vra.pipeline.triggers.get_all_git_webhooks_using_get
        res = await fetch_page(ctx, **kwargs)
        fetched = res.get("ret", {}).get("numberOfElements", 0)
        total = res.get("ret", {}).get("totalElements", 0)
        while fetched < total:
            hub.log.debug(
                f"Requesting gitwebhooks with offset={fetched} out of {total}"
            )
            # Reuse the caller's arguments so every page is requested with the
            # same parameters as the first one; only the offset changes.
            page = (await fetch_page(ctx, **{**kwargs, "skip": fetched})).get("ret", {})
            page_size = page.get("numberOfElements", 0)
            if page_size <= 0:
                # A page that adds nothing would never advance the offset;
                # stop instead of requesting the same offset forever.
                hub.log.debug(
                    f"Stopping gitwebhooks pagination at offset={fetched}: page reported no elements"
                )
                break
            fetched += page_size
            res["ret"]["content"] = [
                *res["ret"].get("content", []),
                *page.get("content", []),
            ]
            res["ret"]["numberOfElements"] = fetched
        return res

    def is_pending(self, hub, ret: dict, state: str = None, **pending_kwargs):
        """
        State reconciliation

        Always reports ``False``; ``state`` and ``pending_kwargs`` are not
        consulted. The result is logged at debug level before and after.
        """
        hub.log.debug(f'Running is_pending for resource: {ret.get("__id__", None)}...')
        is_pending_result = False
        hub.log.debug(
            f'is_pending_result for resource "{ret.get("__id__", None)}": {is_pending_result}'
        )
        return is_pending_result

    async def remap_resource_structure(self, hub, ctx, obj: dict) -> dict:
        """
        Apply the add/omit property mapping to ``obj`` and return the result.

        ``add_properties`` is awaited with the (empty) ``add`` list, then
        ``omit_properties`` is called with the ``omit`` list below.
        """
        schema_mapper = {
            "add": [],
            "omit": [
                "_createTimeInMicros",
                "_updateTimeInMicros",
                "_link",
                "updatedAt",
                "updatedBy",
                "_projectId",
                "createdAt",
                "createdBy",
                "version",
            ],
        }
        # Perform resource mapping by adding properties and omitting properties.
        # Property renaming is addition followed by omission.
        if schema_mapper:
            resource_name = "gitwebhooks"
            hub.log.debug(f"Remapping resource {resource_name}...")
            obj = await add_properties(obj, schema_mapper.get("add", []))
            obj = omit_properties(obj, schema_mapper.get("omit", []))
        return obj
# ====================================
# State override
# ====================================
class GitwebhooksStateImpl(GitwebhooksState):
    """
    State override for listings shaped as ``links`` plus ``documents``.

    The listing's ``links`` entry holds keys into its ``documents`` mapping.
    Create and delete go through the same git-webhook endpoints the base class
    uses.
    """

    @staticmethod
    def _linked_documents(listing):
        """Return the documents referenced by ``links``, skipping links with no document."""
        documents = listing.get("documents") or {}
        linked = (documents.get(link) for link in listing.get("links") or [])
        return [doc for doc in linked if doc is not None]

    async def present(
        self, hub, ctx, name: str, type: Any = None, value: Any = None, **kwargs
    ):
        """
        Return the existing webhook named ``name`` or create a new one.

        :param string name: name to match against each listed document's ``name``
        :param type: legacy parameter kept for call compatibility; forwarded to
          the create call only when supplied
        :param value: legacy parameter kept for call compatibility; forwarded to
          the create call only when supplied
        :param kwargs: forwarded to ``create_git_webhook_using_post``
        :return: ``StateReturn`` with ``old == new`` when the resource already
          exists, or ``old=None`` and the created resource as ``new``
        """
        # Neither parameter is required any more: the module-level entry point
        # only forwards what the user supplied, so demanding them made every
        # ordinary call fail before reaching the API.
        if type is not None:
            kwargs["type"] = type
        if value is not None:
            kwargs["value"] = value
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        for s in self._linked_documents(search_result):
            if name == s.get("name", ""):
                hub.log.info(
                    f'Returning resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                return StateReturn(
                    result=True,
                    comment=f"Resource gitwebhooks {name} already exists.",
                    old=s,
                    new=s,
                )
        res = (
            await hub.exec.vra.pipeline.triggers.create_git_webhook_using_post(
                ctx, name, **kwargs
            )
        )["ret"]
        res = await self.remap_resource_structure(hub, ctx, res)
        return StateReturn(
            result=True,
            comment=f"Creation of gitwebhooks {name} success.",
            old=None,
            new=res,
        )

    async def absent(self, hub, ctx, name: str, **kwargs):
        """
        Delete the webhook named ``name`` when the listing contains it.

        The loop does not stop at the first match, so when several documents
        share the name the last one listed is the one deleted.

        :param string name: name to match against each listed document's ``name``
        :param kwargs: accepted but not used by this implementation
        :return: ``StateReturn`` with the deleted resource as ``old``, or with
          ``old=None`` when nothing matched
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        resource = None
        for s in self._linked_documents(search_result):
            if name == s.get("name", ""):
                hub.log.info(
                    f'Found resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                resource = s
        if resource:
            # The delete endpoint is addressed by id, passed as ``p_id``.
            delete_kwargs = {"p_id": resource.get("id")}
            hub.log.debug(
                f"gitwebhooks with name = {resource.get('name')} already exists"
            )
            await hub.exec.vra.pipeline.triggers.delete_git_webhook_by_id_using_delete(
                ctx, **delete_kwargs
            )
            return StateReturn(
                result=True,
                comment=f"Resource with name = {resource.get('name')} deleted.",
                old=resource,
                new=None,
            )
        return StateReturn(
            result=True,
            comment=f"Resource with name = {name} is already absent.",
            old=None,
            new=None,
        )

    async def describe(self, hub, ctx):
        """
        Build a mapping describing every listed webhook.

        :return: dict keyed by ``"<name>-<last dash-separated part of id>"``
          whose value maps ``"vra.pipeline.gitwebhooks.present"`` to a list of
          single-entry ``{key: value}`` dicts
        """
        result = {}
        res = await self.paginate_find(hub, ctx)
        for obj in self._linked_documents(res.get("ret", {})):
            # Keep track of name and id properties as they may get remapped
            obj_name = obj.get("name", "unknown")
            obj_id = obj.get("id", "unknown")
            obj = await self.remap_resource_structure(hub, ctx, obj)
            # Define props
            props = [{key: value} for key, value in obj.items()]
            # Build result under this module's own state name so the
            # description points back at the state that can enforce it.
            result[f"{obj_name}-{obj_id.split('-')[-1]}"] = {
                "vra.pipeline.gitwebhooks.present": props
            }
        return result