Source code for idem_vra.states.vra.pipeline.gitwebhooks

from typing import Any

from idem_vra.helpers.mapper import add_properties
from idem_vra.helpers.mapper import omit_properties
from idem_vra.helpers.models import StateReturn


# Contracts declared by this state module.
# NOTE(review): presumably tells idem to apply its "resource" contract
# (present/absent/describe) to this module — confirm against idem docs.
__contracts__ = ["resource"]

# Requisites per state function: `present` requires "vra.iaas.project.present",
# `absent` requires nothing.
# NOTE(review): presumably consumed by idem's transparent-requisite handling — confirm.
TREQ = {"present": {"require": ["vra.iaas.project.present"]}, "absent": {"require": []}}


async def present(hub, ctx, name: str, **kwargs):
    """
    Enforce the "present" state for a gitwebhooks resource.

    Builds a ``GitwebhooksStateImpl`` and delegates to its ``present`` method.
    Any exception is logged through ``hub.log.error`` and then re-raised.

    :param string name: (required in body) A human-friendly name used as an
        identifier in APIs that support this option
    :param string apiVersion: (optional in query) The version of the API in
        yyyy-MM-dd format (UTC). For versioning information please refer to
        /codestream/api/about
    :param string Authorization: (optional in header) Bearer token
    :param string branchName: (optional in body) Branch name for which the
        webhook has been configured.
    :param integer delayTimeInMins: (optional in body) Delay time after which
        the pipeline execution is triggered.
    :param string description: (optional in body) A human-friendly description.
    :param string endpoint: (optional in body) Git endpoint.
    :param string eventCategory: (optional in body) Git event type.
    :param array exclusions: (optional in body) Provide file exclusions as
        conditions for the trigger.
    :param string externalListenerLink: (optional in body) Git webhook listener
        link.
    :param array inclusions: (optional in body) Provide file inclusions as
        conditions for the trigger.
    :param object input: (optional in body) Pipeline Execution input properties.
    :param boolean insecureSsl: (optional in body) verify SSL certificates when
        delivering payloads
    :param string pipeline: (optional in body) Pipeline name which is meant to
        be triggered when a git event occur.
    :param boolean prioritizeExclusion: (optional in body) Prioritize Exclusion
        ensures that pipelines are not triggered even if any of the files in a
        commit match the specified files in the exclusion paths or regex.
    :param string project: (optional in body) The project this entity belongs
        to.
    :param string refreshToken: (optional in body) Codestream API token.
    :param string repoName: (optional in body) Repo name for which the webhook
        has been configured.
    :param string secretToken: (optional in body) Secret token to validate
        received payloads.
    :param string serverType: (optional in body) Git Server Type.
    :param string serverWebhookId: (optional in body) Git webhook id.
    """
    try:
        impl = GitwebhooksStateImpl(hub, ctx)
        return await impl.present(hub, ctx, name, **kwargs)
    except Exception as exc:
        # Log a fixed marker first, then the exception text, and propagate.
        hub.log.error("Error during enforcing present state: gitwebhooks")
        hub.log.error(str(exc))
        raise
async def absent(hub, ctx, name: str, **kwargs):
    """
    Enforce the "absent" state for a gitwebhooks resource.

    Builds a ``GitwebhooksStateImpl`` and delegates to its ``absent`` method.
    Any exception is logged through ``hub.log.error`` and then re-raised.

    :param string name: (required) name of the resource
    :param string p_id: (required in path) id
    :param string apiVersion: (optional in query) The version of the API in
        yyyy-MM-dd format (UTC). For versioning information please refer to
        /codestream/api/about
    :param string Authorization: (optional in header) Bearer token
    """
    try:
        impl = GitwebhooksStateImpl(hub, ctx)
        return await impl.absent(hub, ctx, name, **kwargs)
    except Exception as exc:
        # Log a fixed marker first, then the exception text, and propagate.
        hub.log.error("Error during enforcing absent state: gitwebhooks")
        hub.log.error(str(exc))
        raise
async def describe(hub, ctx):
    """
    Describe the existing gitwebhooks resources.

    Builds a ``GitwebhooksStateImpl`` and returns whatever its ``describe``
    method produces. Any exception is logged through ``hub.log.error`` and
    then re-raised.
    """
    try:
        impl = GitwebhooksStateImpl(hub, ctx)
        return await impl.describe(hub, ctx)
    except Exception as exc:
        # Log a fixed marker first, then the exception text, and propagate.
        hub.log.error("Error during describe: gitwebhooks")
        hub.log.error(str(exc))
        raise
def is_pending(hub, ret: dict, state: str = None, **pending_kwargs):
    """
    Report whether a gitwebhooks state run still needs reconciliation.

    Builds a ``GitwebhooksStateImpl`` (with no ``ctx``) and delegates to its
    ``is_pending`` method. Any exception is logged through ``hub.log.error``
    and then re-raised.

    :param dict ret: the state return to inspect
    :param str state: the state value supplied by the caller, forwarded as-is
    :param pending_kwargs: extra keyword arguments forwarded to the
        implementation
    :return: the result of ``GitwebhooksStateImpl.is_pending``
    """
    try:
        # Keep the implementation object in its own local so that the caller's
        # `state` value (not the implementation instance) is what is forwarded.
        impl = GitwebhooksStateImpl(hub, None)
        return impl.is_pending(hub, ret, state, **pending_kwargs)
    except Exception as error:
        hub.log.error("Error during is_pending: gitwebhooks")
        hub.log.error(str(error))
        raise
class GitwebhooksState:
    """
    Base state logic for the gitwebhooks resource.

    Every method takes ``hub`` and ``ctx`` explicitly; the copies stored on the
    instance by ``__init__`` are not read by the methods of this class.
    """

    def __init__(self, hub, ctx):
        self.hub = hub
        self.ctx = ctx

    async def present(self, hub, ctx, name: str, **kwargs):
        """
        Ensure a git webhook called ``name`` exists.

        If a resource with that name is found, it is returned unchanged as both
        ``old`` and ``new``. Otherwise one is created through
        ``create_git_webhook_using_post`` with ``name`` and ``kwargs``.

        :return: a ``StateReturn`` describing the existing or created resource
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]

        for s in search_result.content:
            if name == s["name"]:
                hub.log.info(
                    f'Returning resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                return StateReturn(
                    result=True,
                    comment=f"Resource gitwebhooks {name} already exists.",
                    old=s,
                    new=s,
                )

        res = (
            await hub.exec.vra.pipeline.triggers.create_git_webhook_using_post(
                ctx, name, **kwargs
            )
        )["ret"]
        res = await self.remap_resource_structure(hub, ctx, res)

        return StateReturn(
            result=True,
            comment=f"Creation of gitwebhooks {name} success.",
            old=None,
            new=res,
        )

    async def absent(self, hub, ctx, name: str, **kwargs):
        """
        Ensure no git webhook called ``name`` exists.

        The loop does not stop at the first match, so when several resources
        share the name the last one found is the one deleted. ``kwargs`` is
        accepted but not used.

        :return: a ``StateReturn`` with the deleted resource in ``old``, or with
            ``old=None`` when nothing matched
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]

        resource = None
        for s in search_result.content:
            if name == s["name"]:
                hub.log.info(
                    f'Found resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                resource = s

        if resource:
            # it exists!
            delete_kwargs = {}
            delete_kwargs["p_id"] = resource.get("id")

            hub.log.debug(
                f"gitwebhooks with name = {resource.get('name')} already exists"
            )
            await hub.exec.vra.pipeline.triggers.delete_git_webhook_by_id_using_delete(
                ctx, **delete_kwargs
            )

            return StateReturn(
                result=True,
                comment=f"Resource with name = {resource.get('name')} deleted.",
                old=resource,
                new=None,
            )

        return StateReturn(
            result=True,
            comment=f"Resource with name = {name} is already absent.",
            old=None,
            new=None,
        )

    async def describe(self, hub, ctx):
        """
        Build a mapping of all found git webhooks.

        Each key is ``"<name>-<last dash-separated segment of id>"`` and each
        value maps ``"vra.pipeline.gitwebhooks.present"`` to a list of
        single-entry ``{property: value}`` dicts.
        """
        result = {}

        res = await self.paginate_find(hub, ctx)
        for obj in res.get("ret", {}).get("content", []):
            # Keep track of name and id properties as they may get remapped
            obj_name = obj.get("name", "unknown")
            obj_id = obj.get("id", "unknown")

            obj = await self.remap_resource_structure(hub, ctx, obj)

            # Define props
            props = [{key: value} for key, value in obj.items()]

            # Build result
            result[f"{obj_name}-{obj_id.split('-')[-1]}"] = {
                "vra.pipeline.gitwebhooks.present": props
            }

        return result

    async def paginate_find(self, hub, ctx, **kwargs):
        """
        Paginate through all resources using their 'find' method.

        The first request is issued with ``kwargs``. While the accumulated
        ``numberOfElements`` is below ``totalElements``, further pages are
        requested with the same ``kwargs`` plus ``skip=<elements so far>`` and
        their ``content`` is appended to the first response.

        Pagination stops early if a follow-up page reports no elements, so a
        backend whose ``totalElements`` can never be reached does not cause an
        endless loop.

        :return: the first response, with ``ret["content"]`` and
            ``ret["numberOfElements"]`` updated to cover all fetched pages
        """
        find = hub.exec.vra.pipeline.triggers.get_all_git_webhooks_using_get

        res = await find(ctx, **kwargs)

        first_page = res.get("ret", {})
        fetched = first_page.get("numberOfElements", 0)
        total = first_page.get("totalElements", 0)

        if fetched == total or total == 0:
            return res

        while fetched < total:
            hub.log.debug(
                f"Requesting gitwebhooks with offset={fetched} out of {total}"
            )
            # Re-send the caller's filters on every page; `skip` is always
            # overridden with the running offset.
            page = await find(ctx, **{**kwargs, "skip": fetched})
            page_ret = page.get("ret", {})
            page_count = page_ret.get("numberOfElements", 0)

            if not page_count:
                # No progress is possible; stop instead of looping forever.
                hub.log.debug(
                    f"Received an empty gitwebhooks page at offset={fetched}; stopping pagination"
                )
                break

            fetched += page_count
            res["ret"]["content"] = [
                *res.get("ret", {}).get("content", []),
                *page_ret.get("content", []),
            ]
            res["ret"]["numberOfElements"] = fetched

        return res

    def is_pending(self, hub, ret: dict, state: str = None, **pending_kwargs):
        """
        State reconciliation

        Always reports ``False``; ``state`` and ``pending_kwargs`` are not
        used. Only the ``__id__`` entry of ``ret`` is read, for logging.
        """
        hub.log.debug(f'Running is_pending for resource: {ret.get("__id__", None)}...')
        is_pending_result = False

        hub.log.debug(
            f'is_pending_result for resource "{ret.get("__id__", None)}": {is_pending_result}'
        )
        return is_pending_result

    async def remap_resource_structure(self, hub, ctx, obj: dict) -> dict:
        """
        Apply the add/omit property mapping to ``obj`` and return the result.

        No properties are added; the listed bookkeeping properties are passed
        to ``omit_properties``.
        """
        schema_mapper = {
            "add": [],
            "omit": [
                "_createTimeInMicros",
                "_updateTimeInMicros",
                "_link",
                "updatedAt",
                "updatedBy",
                "_projectId",
                "createdAt",
                "createdBy",
                "version",
            ],
        }

        # Perform resource mapping by adding properties and omitting properties.
        # Property renaming is addition followed by omission.
        if schema_mapper:
            resource_name = "gitwebhooks"
            hub.log.debug(f"Remapping resource {resource_name}...")

            obj = await add_properties(obj, schema_mapper.get("add", []))
            obj = omit_properties(obj, schema_mapper.get("omit", []))

        return obj
# ==================================== # State override # ====================================
class GitwebhooksStateImpl(GitwebhooksState):
    """
    State override for gitwebhooks.

    Unlike the base class, which reads a ``content`` list, this override reads
    the search result as a ``links`` list of keys into a ``documents`` mapping.
    Create and delete go through the same git-webhook exec functions the base
    class uses.
    """

    async def present(
        self, hub, ctx, name: str, type: Any = None, value: Any = None, **kwargs
    ):
        """
        Ensure a git webhook called ``name`` exists.

        If a document with that name is found, it is returned unchanged as both
        ``old`` and ``new``. Otherwise one is created through
        ``create_git_webhook_using_post``.

        :param string name: name of the webhook to look up or create
        :param type: optional; kept for backward compatibility and forwarded to
            the create call only when supplied
        :param value: optional; kept for backward compatibility and forwarded
            to the create call only when supplied
        :return: a ``StateReturn`` describing the existing or created resource
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        documents = search_result.get("documents", {}) or {}

        for itm in search_result.get("links", []):
            s = documents.get(itm)
            if not s:
                # A link without a matching document cannot be compared.
                continue

            if name == s.get("name", ""):
                hub.log.info(
                    f'Returning resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                return StateReturn(
                    result=True,
                    comment=f"Resource gitwebhooks {name} already exists.",
                    old=s,
                    new=s,
                )

        # NOTE(review): `type`/`value` are not listed among the documented git
        # webhook properties; they are passed through only if the caller set
        # them — confirm whether they should be dropped instead.
        create_kwargs = dict(kwargs)
        if type is not None:
            create_kwargs["type"] = type
        if value is not None:
            create_kwargs["value"] = value

        res = (
            await hub.exec.vra.pipeline.triggers.create_git_webhook_using_post(
                ctx, name, **create_kwargs
            )
        )["ret"]
        res = await self.remap_resource_structure(hub, ctx, res)

        return StateReturn(
            result=True,
            comment=f"Creation of gitwebhooks {name} success.",
            old=None,
            new=res,
        )

    async def absent(self, hub, ctx, name: str, **kwargs):
        """
        Ensure no git webhook called ``name`` exists.

        The loop does not stop at the first match, so when several documents
        share the name the last one found is the one deleted, by its ``id``,
        through ``delete_git_webhook_by_id_using_delete``. ``kwargs`` is
        accepted but not used.

        :return: a ``StateReturn`` with the deleted resource in ``old``, or with
            ``old=None`` when nothing matched
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]
        documents = search_result.get("documents", {}) or {}

        resource = None
        for itm in search_result.get("links", []):
            s = documents.get(itm)
            if not s:
                # A link without a matching document cannot be compared.
                continue

            if name == s.get("name", ""):
                hub.log.info(
                    f'Found resource gitwebhooks "{s["name"]}" due to existing resource "{name}"'
                )
                s = await self.remap_resource_structure(hub, ctx, s)
                resource = s

        if resource:
            # it exists!
            delete_kwargs = {}
            delete_kwargs["p_id"] = resource.get("id")

            hub.log.debug(
                f"gitwebhooks with name = {resource.get('name')} already exists"
            )
            await hub.exec.vra.pipeline.triggers.delete_git_webhook_by_id_using_delete(
                ctx, **delete_kwargs
            )

            return StateReturn(
                result=True,
                comment=f"Resource with name = {resource.get('name')} deleted.",
                old=resource,
                new=None,
            )

        return StateReturn(
            result=True,
            comment=f"Resource with name = {name} is already absent.",
            old=None,
            new=None,
        )

    async def describe(self, hub, ctx):
        """
        Build a mapping of all found git webhooks.

        Each key is ``"<name>-<last dash-separated segment of id>"`` and each
        value maps ``"vra.pipeline.gitwebhooks.present"`` to a list of
        single-entry ``{property: value}`` dicts. Links with no matching
        document are skipped.
        """
        result = {}

        res = await self.paginate_find(hub, ctx)
        ret = res.get("ret", {})
        documents = ret.get("documents", {}) or {}

        for itm in ret.get("links", []):
            obj = documents.get(itm)
            if not obj:
                continue

            # Keep track of name and id properties as they may get remapped
            obj_name = obj.get("name", "unknown")
            obj_id = obj.get("id", "unknown")

            obj = await self.remap_resource_structure(hub, ctx, obj)

            # Define props
            props = [{key: value} for key, value in obj.items()]

            # Build result
            result[f"{obj_name}-{obj_id.split('-')[-1]}"] = {
                "vra.pipeline.gitwebhooks.present": props
            }

        return result