Source code for idem_vra.states.vra.catalog.catalogsources

from typing import Any

from idem_vra.helpers.mapper import add_properties
from idem_vra.helpers.mapper import omit_properties
from idem_vra.helpers.models import StateReturn


# Contract names declared by this module.
__contracts__ = ["resource"]

# Requisites keyed by state function name: "present" lists
# "vra.iaas.project.present" under "require"; "absent" lists nothing.
# NOTE(review): presumably consumed by idem's requisite handling - confirm.
TREQ = {
    "present": {
        "require": ["vra.iaas.project.present"],
    },
    "absent": {
        "require": [],
    },
}


async def present(hub, ctx, name: str, config: Any, id: Any, typeId: Any, **kwargs):
    """
    Enforce the "present" state for a catalogsources resource.

    Builds a ``CatalogsourcesState`` and delegates to its ``present`` method.
    Any exception is logged through ``hub.log.error`` and then re-raised.

    :param object config: (required in body) Source custom configuration
    :param string id: (required in body) Catalog Source id
    :param string name: (required in body) Catalog Source name
    :param string typeId: (required in body) Type of source, e.g. blueprint,
      CFT... etc
    :param string apiVersion: (optional in query) The version of the API in
      yyyy-MM-dd format (UTC). If you do not specify explicitly an exact
      version, you will be calling the latest supported API version.
    :param boolean validationOnly: (optional in query) If true, the source
      will not be created. It returns the number of items belonging to the
      source. The request will still return an error code if the source is
      invalid.
    :param string createdAt: (optional in body) Creation time
    :param string createdBy: (optional in body) Created By
    :param string description: (optional in body) Catalog Source description
    :param boolean global: (optional in body) Global flag indicating that all
      the items can be requested across all projects.
    :param string iconId: (optional in body) Default Icon Id
    :param integer itemsFound: (optional in body) Number of items found
    :param integer itemsImported: (optional in body) Number of items imported.
    :param string lastImportCompletedAt: (optional in body) Last import
      completion time
    :param array lastImportErrors: (optional in body) Last import error(s)
    :param string lastImportStartedAt: (optional in body) Last import start
      time
    :param string lastUpdatedAt: (optional in body) Update time
    :param string lastUpdatedBy: (optional in body) Updated By
    :param string projectId: (optional in body) Project id where the source
      belongs
    """
    try:
        return await CatalogsourcesState(hub, ctx).present(
            hub, ctx, name, config, id, typeId, **kwargs
        )
    except Exception as error:
        # Log at this boundary, then hand the same exception back to the caller.
        hub.log.error("Error during enforcing present state: catalogsources")
        hub.log.error(str(error))
        raise error
async def absent(hub, ctx, name: str, **kwargs):
    """
    Enforce the "absent" state for a catalogsources resource.

    Builds a ``CatalogsourcesState`` and delegates to its ``absent`` method.
    Any exception is logged through ``hub.log.error`` and then re-raised.

    :param string name: (required) name of the resource
    :param string p_sourceId: (required in path) Catalog source ID
    :param string apiVersion: (optional in query) The version of the API in
      yyyy-MM-dd format (UTC). If you do not specify explicitly an exact
      version, you will be calling the latest supported API version.
    """
    try:
        handler = CatalogsourcesState(hub, ctx)
        return await handler.absent(hub, ctx, name, **kwargs)
    except Exception as error:
        # Log at this boundary, then hand the same exception back to the caller.
        hub.log.error("Error during enforcing absent state: catalogsources")
        hub.log.error(str(error))
        raise error
async def describe(hub, ctx):
    """
    Describe the catalogsources resources.

    Builds a ``CatalogsourcesState`` and returns whatever its ``describe``
    method returns. Any exception is logged through ``hub.log.error`` and
    then re-raised.
    """
    try:
        return await CatalogsourcesState(hub, ctx).describe(hub, ctx)
    except Exception as error:
        # Log at this boundary, then hand the same exception back to the caller.
        hub.log.error("Error during describe: catalogsources")
        hub.log.error(str(error))
        raise error
def is_pending(hub, ret: dict, state: str = None, **pending_kwargs):
    """
    Report whether the resource described by ``ret`` is still pending.

    Builds a ``CatalogsourcesState`` (with no ``ctx``) and delegates to its
    ``is_pending`` method. Any exception is logged through ``hub.log.error``
    and then re-raised.

    :param dict ret: return data of the state run being checked
    :param str state: state value forwarded unchanged to the state object
    :param pending_kwargs: extra keyword arguments forwarded unchanged
    :return: whatever ``CatalogsourcesState.is_pending`` returns
    """
    try:
        # Keep the state object under its own name: rebinding ``state`` here
        # would forward the object itself instead of the caller's argument.
        handler = CatalogsourcesState(hub, None)
        return handler.is_pending(hub, ret, state, **pending_kwargs)
    except Exception as error:
        hub.log.error("Error during is_pending: catalogsources")
        hub.log.error(str(error))
        raise error
class CatalogsourcesState:
    """
    State logic for the ``catalogsources`` resource.

    Every method receives ``hub`` and ``ctx`` explicitly; the copies stored
    on the instance are not read by the methods of this class.
    """

    def __init__(self, hub, ctx):
        self.hub = hub
        self.ctx = ctx

    async def present(
        self, hub, ctx, name: str, config: Any, id: Any, typeId: Any, **kwargs
    ):
        """
        Return the existing resource called ``name``, or create it.

        All pages of existing resources are searched by exact name. The first
        match is returned (remapped) as both ``old`` and ``new``. When nothing
        matches, ``post_using_post2`` is called and its remapped ``ret`` is
        returned as ``new``.

        :return: a ``StateReturn`` with ``result=True``
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]

        # Mapping-style access, consistent with describe() and paginate_find().
        for candidate in search_result["content"]:
            if name != candidate["name"]:
                continue
            hub.log.info(
                f'Returning resource catalogsources "{candidate["name"]}" due to existing resource "{name}"'
            )
            existing = await self.remap_resource_structure(hub, ctx, candidate)
            return StateReturn(
                result=True,
                comment=f"Resource catalogsources {name} already exists.",
                old=existing,
                new=existing,
            )

        created = (
            await hub.exec.vra.catalog.catalogsources.post_using_post2(
                ctx, config, id, name, typeId, **kwargs
            )
        )["ret"]
        created = await self.remap_resource_structure(hub, ctx, created)

        return StateReturn(
            result=True,
            comment=f"Creation of catalogsources {name} success.",
            old=None,
            new=created,
        )

    async def absent(self, hub, ctx, name: str, **kwargs):
        """
        Delete the resource called ``name`` if it exists.

        All pages of existing resources are searched by exact name; when
        several match, the last one found is the one deleted. ``kwargs`` are
        accepted but not forwarded to the delete call.

        :return: a ``StateReturn`` with ``result=True``; ``old`` holds the
          remapped resource when one was deleted, otherwise ``None``
        """
        search_result = (await self.paginate_find(hub, ctx))["ret"]

        resource = None
        # Mapping-style access, consistent with describe() and paginate_find().
        for candidate in search_result["content"]:
            if name == candidate["name"]:
                hub.log.info(
                    f'Found resource catalogsources "{candidate["name"]}" due to existing resource "{name}"'
                )
                resource = await self.remap_resource_structure(hub, ctx, candidate)

        if not resource:
            return StateReturn(
                result=True,
                comment=f"Resource with name = {name} is already absent.",
                old=None,
                new=None,
            )

        # The remap step adds no properties, so a "sourceId" key is only
        # present if the API itself returns one; fall back to "id" so the
        # delete is not issued with p_sourceId=None.
        delete_kwargs = {
            "p_sourceId": resource.get("sourceId") or resource.get("id"),
        }

        hub.log.debug(
            f"catalogsources with name = {resource.get('name')} already exists"
        )
        await hub.exec.vra.catalog.catalogsources.delete_using_delete4(
            ctx, **delete_kwargs
        )

        return StateReturn(
            result=True,
            comment=f"Resource with name = {resource.get('name')} deleted.",
            old=resource,
            new=None,
        )

    async def describe(self, hub, ctx):
        """
        Build a mapping of state ids to ``present`` state definitions.

        Each key is ``"<name>-<last dash-separated segment of id>"`` and each
        value maps ``"vra.catalog.catalogsources.present"`` to a list of
        single-entry ``{property: value}`` dicts taken from the remapped
        resource.
        """
        result = {}

        res = await self.paginate_find(hub, ctx)
        for obj in res.get("ret", {}).get("content", []):
            # Capture name and id before remapping, as they may get remapped.
            obj_name = obj.get("name", "unknown")
            obj_id = obj.get("id", "unknown")

            obj = await self.remap_resource_structure(hub, ctx, obj)

            props = [{key: value} for key, value in obj.items()]
            result[f"{obj_name}-{obj_id.split('-')[-1]}"] = {
                "vra.catalog.catalogsources.present": props
            }

        return result

    async def paginate_find(self, hub, ctx, **kwargs):
        """
        Paginate through all resources using their 'find' method.

        The first page is requested with ``kwargs``. While fewer elements
        than ``totalElements`` have been collected, further pages are
        requested with the same ``kwargs`` plus ``skip`` set to the number of
        elements collected so far, and their ``content`` is appended to the
        first response.

        :return: the first response, with ``ret["content"]`` and
          ``ret["numberOfElements"]`` updated to cover every fetched page
        """
        res = await hub.exec.vra.catalog.catalogsources.get_page_using_get2(
            ctx, **kwargs
        )

        first_page = res.get("ret", {})
        fetched = first_page.get("numberOfElements", 0)
        total = first_page.get("totalElements", 0)

        while fetched < total:
            hub.log.debug(
                f"Requesting catalogsources with offset={fetched} out of {total}"
            )

            # Re-send the caller's arguments so later pages match the first
            # request; only the offset changes.
            page_kwargs = {**kwargs, "skip": fetched}
            pres = await hub.exec.vra.catalog.catalogsources.get_page_using_get2(
                ctx, **page_kwargs
            )
            page = pres.get("ret", {})

            page_count = page.get("numberOfElements", 0)
            if page_count <= 0:
                # No progress is possible; stop instead of looping forever.
                hub.log.debug(
                    f"Stopping catalogsources pagination at offset={fetched}: page reported no elements"
                )
                break

            fetched += page_count
            res["ret"]["content"] = [
                *res["ret"].get("content", []),
                *page.get("content", []),
            ]
            res["ret"]["numberOfElements"] = fetched

        return res

    def is_pending(self, hub, ret: dict, state: str = None, **pending_kwargs):
        """
        State reconciliation

        Always reports ``False``; ``state`` and ``pending_kwargs`` are
        accepted but not used.
        """
        hub.log.debug(f'Running is_pending for resource: {ret.get("__id__", None)}...')

        is_pending_result = False

        hub.log.debug(
            f'is_pending_result for resource "{ret.get("__id__", None)}": {is_pending_result}'
        )
        return is_pending_result

    async def remap_resource_structure(self, hub, ctx, obj: dict) -> dict:
        """
        Apply the resource's property mapping to ``obj`` and return the result.

        Calls ``add_properties`` with an empty list, then ``omit_properties``
        with the names listed under ``"omit"``.
        """
        schema_mapper = {
            "add": [],
            "omit": [
                "createdAt",
                "createdBy",
                "lastImportCompletedAt",
                "lastImportErrors",
                "lastImportStartedAt",
                "lastUpdatedAt",
                "lastUpdatedBy",
            ],
        }

        # Perform resource mapping by adding properties and omitting properties.
        # Property renaming is addition followed by omission.
        resource_name = "catalogsources"
        hub.log.debug(f"Remapping resource {resource_name}...")

        obj = await add_properties(obj, schema_mapper.get("add", []))
        obj = omit_properties(obj, schema_mapper.get("omit", []))

        return obj