Source code for ampapi.base

from __future__ import annotations

import asyncio
import copy
import functools
import json
import logging
import re
from dataclasses import fields, is_dataclass
from datetime import datetime
from pprint import pformat
from typing import TYPE_CHECKING, Any, ClassVar, Optional, ParamSpec, Union, overload

import aiohttp
from aiohttp import ClientResponse
from dataclass_wizard import fromdict
from pyotp import TOTP

from .backoff import ExponentialBackoff
from .bridge import Bridge
from .modules import ActionResult, ActionResultError, APISession, BuildInfo, Diagnostics, LoginResults, Status

if TYPE_CHECKING:
    from collections.abc import Callable, Coroutine, Iterable
    from datetime import timedelta
    from typing import Concatenate

    from _typeshed import DataclassInstance
    from typing_extensions import ParamSpec, Self, TypeVar

    from .modules import APIResponseDataTableAlias, Controller, Instance, InstanceStatus, Updates

    D = TypeVar("D", bound="Base")
    T = ParamSpec("T")
    F = TypeVar("F")
    X = TypeVar("X", bound=DataclassInstance)

__all__ = ("Base",)

FORMAT_DATA: bool = True


APIReturnTypeAlias = Union[LoginResults, ActionResultError, ActionResult]


[docs] class Base: """ Contains the base functions for all AMP API endpoints and handles the parsing of Bridge data. .. warning:: Do not overwrite or alter the :attr:`instance_id`. .. note:: A session expires after 240sec of inactivity per Cube Coders AMP. If for any reason you want to change that value; set the attribute :attr:`session_ttl`.\n Attributes ----------- api_url: :class:`str` The URL to access the Web Panel. This comes from your :class:`APIParams`. session_ttl: :class:`int` How long before the session id expires in seconds, default is 240 seconds. instance_id: :class:`str` The Instance id is a string determined by AMP. \n This attribute will be set automatically after making a :meth:`login` request, default is "O". session: :class:`aiohttp.ClientSession` A static Session to use, otherwise the class will generate it's own as needed. """ # Private Attributes logger: ClassVar[logging.Logger] = logging.getLogger(__name__) _bridge: Bridge _backoff: ExponentialBackoff # Public Attributes url: str instance_id: str session_ttl: ClassVar[int] = 240 module: str # TODO - make this var unchangeable via private attr in future release. # Error response strings. _ads_only: ClassVar[str] = "This API call is only available to <class:`ADSModule`> type classes." _failed_api: ClassVar[str] = "The API call returned a malformed response." _minecraft_only: ClassVar[str] = "This API call is only available on Minecraft type instances." _no_bridge: ClassVar[str] = "Failed to setup connection. You need to initiate `<class Bridge>` first." _no_controller: ClassVar[str] = ( "The function failed as the <class:`AMPControllerInstance`> was not properly initialized and set." ) _no_data: ClassVar[str] = "Failed to receive any data from post request." _unauthorized_access: ClassVar[str] = "The user does not have the required permissions to interact with this instance." _instance_offline: ClassVar[str] = "The requested Instance is not available at this time. | URL: %s" _version_unavailable: ClassVar[str] = "The API call %s is no longer available at this version of AMP %s" # These are used to handle JSON keys that cannot be parsed properly via regex. # See :func:`camel_to_snake_re` json_key_mapping: ClassVar[dict[str, str]] = { "ContainerCPUs": "container_cpus", "InstalledRAMMB": "installed_ram_mb", "FreeRAMMB": "free_ram_mb", "AvailableIPs": "available_ips", "SecurityandPrivacy": "security_and_privacy", } def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: self.url = "" self.instance_id = "0" bridge: Bridge = Bridge._get_bridge() self._backoff = ExponentialBackoff() # Validate the bridge object is at the same memory address. self.logger.debug("DEBUG %s __init__ %s", type(self).__name__, id(self)) self.logger.debug("bridge object -> %s", pformat(bridge)) self.session: aiohttp.ClientSession | None = session if isinstance(bridge, Bridge): self.parse_bridge(bridge=bridge) # def __del__(self) -> None: # try: # asyncio.run(self.__adel__()) # self.logger.debug("Closing the open `aiohttp.ClientSession`| Session: %s", self.session) # except RuntimeError: # self.logger.error("Failed to close our `aiohttp.ClientSession`") # async def __adel__(self) -> None: # if self.session is not None: # await self.session.close() @property def format_data(self) -> bool: """ Controls whether the data returned from an API endpoint is formatted or not.\n Default is ``True`` which comes from the global parameter ``FORMAT_DATA``. .. note:: ``True`` = formatted \n ``False`` = unformatted Returns -------- :class:`bool` Returns True or False. """ global FORMAT_DATA return FORMAT_DATA @format_data.setter def format_data(self, value: bool) -> None: global FORMAT_DATA FORMAT_DATA = value
[docs] @staticmethod def ads_only( func: Callable[Concatenate[D, T], Coroutine[None, None, F]], ) -> Callable[Concatenate[D, T], Coroutine[None, None, F]]: """ Checks the class attribute ``.module`` and is equal to ``ADS`` or if the type of Instance using the function is AMPADSInstance. Parameters ---------- func : Callable[Concatenate[D, T], Coroutine[None, None, F]] The function the decorator is wrapping. Returns ------- Callable[Concatenate[D, T], Coroutine[None, None, F]] The function the decorator is wrapping. Raises ------ RuntimeError The API call is only allowed to be run on a type(:py:class:`AMPADSInstance`). """ @functools.wraps(wrapped=func) def wrapper_ads_only(self: D, *args: T.args, **kwargs: T.kwargs) -> Coroutine[None, None, F]: from .adsmodule import ADSModule from .instance import AMPADSInstance if self.module == "ADS" or type(self) is AMPADSInstance or isinstance(self, ADSModule): return func(self, *args, **kwargs) else: raise RuntimeError(self._ads_only) return wrapper_ads_only
async def _call_api( self, api: str, parameters: Union[None, dict[str, Any]] = None, format_data: Union[bool, None] = None, format_: Union[type[X], type[APIResponseDataTableAlias], None] = None, sanitize_json: bool = True, _use_from_dict: bool = True, _auto_unpack: bool = True, _no_data: bool = False, ) -> Any: """|coro| Uses :class:`aiohttp.ClientSession` post request to access the AMP API endpoints. \n .. note:: Will populate the ``SESSIONID`` key for :param:`parameters` if it is not provided. This is the default behavior. .. warning:: Will return an :class:`ActionResultError` class if any errors happen when attempting to call the API. Parameters ----------- api: :class:`str` The API endpoint to call, eg ``Core/GetModuleInfo``. parameters: Union[None, dict[:class:`str`, Any]], optional The parameters to pass to the API endpoint, by default None. format_data: Union[:class:`bool`, None], optional Format the JSON response data. (Uses ``FORMAT_DATA`` global constant if None), by default None. format_: :py:class:`DataclassInstance`, optional The dataclass the JSON response will formatted to, by default None. sanitize_json: :class:`bool`, optional Replaces invalid characters in our JSON responses, by default True. _use_from_dict: :class:`bool`, optional Controls whether the data will use :meth:`fromdict` of dataclass wizard to unpack the data. Typical usage case is to handle nested :class:`DataclassInstance`, by default True. _auto_unpack: :class:`bool`, optional Controls whether the data will be unpacked automatically via ``(**data)``, by default True. _no_data: :class:`bool`, optional Informs the connection that the API does not have a JSON response, by default False. Returns -------- Any Typical returns are of the same type that is passed in to ``format_``, either in an :class:`Iterable` or not depending on the data, otherwise returns an unformatted JSON response if :attr:`format_data` or ``FORMAT_DATA`` is False. """ # Old Docstring Content # Raises # ------ # :exc:`ValueError` # When a JSON response :attr:`ClientSession.content_length` == 0 or :class:`aiohttp.ClientSession` raises an Exception.\n # When the API endpoint returns a malformed JSON response. # :exc:`ConnectionError` # When an JSON response status code is not 200.\n # When an JSON response has a dict key value of "Instance Unavailable. # :exc:`PermissionError` # When the JSON response has a dict key value of "Unauthorized Access" or permission related error. global FORMAT_DATA header: dict = {"Accept": "text/javascript"} post_req: ClientResponse | None self.logger.debug("_call_api -> %s was called with %s", api, parameters) # This should save us some boiler plate code throughout our API calls. if parameters is None: parameters = {} api_session: APISession = self._bridge._sessions.get(self.instance_id, APISession(id="0", ttl=datetime.now())) if isinstance(api_session, APISession): parameters["SESSIONID"] = api_session.id json_data: str = json.dumps(obj=parameters) _url: str = self.url + "/API/" + api self.logger.debug( "SESSION GET %s | API CALL: %s | API URL: %s | DATA: %s", self.instance_id, api, _url, pformat(json_data) ) if self.session is None: self.session = aiohttp.ClientSession() try: post_req = await self.session.post(url=_url, headers=header, data=json_data) # We have a dynamic backoff function to prevent reconnect attempts to frequently. except RuntimeError as e: # ? Suggestion # Attempting to re-open the session if it is somehow closed during usage. if isinstance(e.args[0], str) and "session is closed" in e.args[0].lower(): self.session = aiohttp.ClientSession() retry: float = self._backoff.delay() self.logger.error( "<Base._call_api> encountered a <RuntimeError> and will retry in %s. | Exception: %s", retry, e ) await asyncio.sleep(delay=retry) return await self._call_api( api=api, parameters=parameters, format_data=format_data, format_=format_, sanitize_json=sanitize_json, _use_from_dict=_use_from_dict, _auto_unpack=_auto_unpack, _no_data=_no_data, ) except Exception as e: retry = self._backoff.delay() self.logger.error("<Base._call_api> encountered an Exception and will retry in %s. | Exception: %s", retry, e) await asyncio.sleep(delay=retry) return ActionResultError(status=False, reason="UNK", result=ValueError(e)) if post_req.content_length == 0: return ActionResultError(status=False, reason="Content Length is 0", result=ValueError(self._no_data)) # raise ValueError(self._no_data) if post_req.status != 200: return ActionResultError( status=False, reason="Status Code not equal to 200", result=ConnectionError(self._no_data) ) # raise ConnectionError(self._no_data) post_req_json: Any = await post_req.json() if post_req_json is None and _no_data is False: return ActionResultError( status=False, reason="JSON is None and Data is None", result=ConnectionError(self._no_data) ) # raise ConnectionError(self._no_data) elif _no_data is True: return None # They removed "result" from all replies thus breaking most if not all future code. # This was an old example from pre 2.3 AMP API that could have the following return: # `{'resultReason': 'Internal Auth - No reason given', 'success': False, 'result': 0}` self.logger.debug( "URL: %s | aiohttp.ClientResponse.json() type: %s | _call_api parameters: %s", api, type(post_req_json), parameters, ) self.logger.debug("aiohttp.ClientResponse.json() formatted: %s", pformat(post_req_json)) if sanitize_json is True: post_req_json = self.sanitize_json(post_req_json) self.logger.debug("Sanitize json: %s | Sanitized data: %s", sanitize_json, pformat(post_req_json)) if isinstance(post_req_json, dict): if "title" in post_req_json: post_req_json = post_req_json["title"] if isinstance(post_req_json, str) and ( post_req_json == "Unauthorized Access" or post_req_json == "Instance Unavailable" ): self.logger.error("%s failed because of %s", api, post_req_json) api_session = APISession(id="0", ttl=datetime.now()) self._bridge._sessions.update({self.instance_id: api_session}) if post_req_json == "Unauthorized Access": return ActionResultError( status=False, reason="Unauthorized Access", result=PermissionError(self._unauthorized_access) ) # raise PermissionError(self._unauthorized_access) elif post_req_json == "Instance Unavailable": return ActionResultError( status=False, reason="Instance Unavailable", result=ConnectionError(self._instance_offline, self.url), ) # raise ConnectionError(self._instance_offline, self.url) elif api == "Core/Login": return LoginResults(**post_req_json) # ? Suggestion # This is breaking newer code as it's grabbing the inner key versus older version of AMP having two `result` keys. # We may need to re-add this fuctionality in a different manner going forward. # elif "result" in post_req_json: # post_req_json = post_req_json["result"] # if isinstance(post_req_json, bool) and post_req_json is False: # self.logger.error("%s failed because of %s", api, post_req_json) # raise ValueError(self._failed_api) elif isinstance(post_req_json, dict) and "status" in post_req_json and post_req_json["status"] is False: self.logger.error("%s failed because of Status: %s", api, post_req_json) return ActionResultError(status=False, reason="Status is False", result=ValueError(self._failed_api)) # return ValueError(self._failed_api) self.logger.debug( "DEBUG: FORMAT DATA | local Format Data: %s | global Format Data: %s | format_: %s | POST REQ TYPE: %s", format_data, FORMAT_DATA, format_, type(post_req_json), ) if (format_ is None or format_data is False) or (format_data is None and FORMAT_DATA is False): return post_req_json elif isinstance(post_req_json, (dict, list)) and ( (format_data is True) or (format_data is None and FORMAT_DATA is True) ): return self.json_to_dataclass( json=post_req_json, format_=format_, _use_from_dict=_use_from_dict, _auto_unpack=_auto_unpack ) else: return post_req_json async def _connect(self) -> LoginResults | None: """|coro| Logs into AMP via "API/Core/Login" endpoint using your :class:`Bridge` object. .. note:: If Applicable handles your 2FA using :class:`TOTP` \n Stores the ``SESSIONID`` via :class:`APISession` dataclass for future usage inside the :class:`Bridge` object. Returns -------- :class:`LoginResults` | None The results from ``API/Core/Login`` as a dataclass. Raises ------- :exc:`ValueError` If the 2 Factor Authentication code is not a formatted properly aka the :attr:`~Bridge.token` when making the :py:class:`Bridge` object. """ code: Union[str, TOTP] = "" # get our InstanceID and use it to key for session_id session: APISession = self._bridge._sessions.get(self.instance_id, APISession(id="0", ttl=datetime.now())) if isinstance(session, APISession): ttl: timedelta = datetime.now() - session.ttl if ttl.seconds > self.session_ttl: sessionID = "0" else: sessionID: str = session.id if sessionID == "0": if self._bridge.use_2fa is True: try: # Handles time based 2Factory Auth Key/Code code = TOTP(self._bridge.token).now() except AttributeError: raise ValueError( "Please check your 2 Factor Code, should not contain spaces, escape characters and it must be enclosed in quotes!" ) try: parameters: dict[str, Any] = { "username": self._bridge.user, "password": self._bridge.password, "token": code, "rememberMe": True, } result: Any = await self._call_api( api="Core/Login", parameters=parameters, format_data=True, format_=LoginResults ) if isinstance(result, LoginResults): # This is our new sessions table to correlate InstanceID to a sessionID. api_session = APISession(id=result.session_id, ttl=datetime.now()) self._bridge._sessions.update({self.instance_id: api_session}) return result else: self.logger.warning(msg="Failed response from 'API/Core/Login' in <Base>._connect()") return result except Exception as e: self.logger.warning("Core/Login Exception:", exc_info=e) else: return
[docs] async def call_end_point(self, api: str, parameters: None | dict[str, Any] = None) -> dict[str, Any]: """|coro| Universal API function for calling any API endpoint. Some API endpoints require the Instance module type to be ADS. \n See `/api_spec_sheets/ADS_api_spec.md` or `/api_spec_sheets/Minecraft_api_spec.md` for full API endpoints and parameter information. .. note:: Parameter key "SESSIONID" is handled for you. .. warning:: DO NOT include the full URL - *Exclude: www.yourAMPURL.com/API/*\n Parameters ----------- api: :class:`str` The AMP API endpoint to call. eg "Core/GetModuleInfo" parameters : None | dict[:class:`str`, Any], optional The parameters to pass to the API endpoint, by default is None Returns -------- dict[:class:`str`, Any] : The JSON response from the API endpoint. """ await self._connect() result: Any = await self._call_api(api=api, parameters=parameters) return result
[docs] @staticmethod def camel_to_snake_re(data: str) -> str: """ A simple regex pattern applied to a string to remove Camel Casing and apply snake_case. .. note:: This will fail on entries with an underscore between to capital characters. | *eg (Tool_Version = tool__version)*\n Will also not format properly when handling strings that have a multiple uppercase followed by a lowercase. | *eg (ContainerCPUs = container_cp_us)* Parameters ----------- data: :class:`str` The string to be converted. Returns ------- :class:`str` The converted string from CamelCase to snake_case. """ data = re.sub(pattern="(.)([A-Z][a-z]+)", repl=r"\1_\2", string=data) return re.sub(pattern="([a-z0-9])([A-Z])", repl=r"\1_\2", string=data).lower()
[docs] @staticmethod def camel_case_data(data: dict[str, Any]) -> dict[str, Any]: """ Calls the :meth:`title` on every dict key. Parameters ----------- data: dict[:class:`str`, Any] The dictionary to camel case. Returns -------- dict[:class:`str`, Any] The camel cased dict. """ res: dict[str, str | bool | int] = {} for key, value in data.items(): if value is not None: res[key.title()] = value return res
[docs] @staticmethod def dataclass_to_dict(dataclass_: DataclassInstance) -> dict[str, Any]: """ Convert a dataclass to a dictionary. Parameters ----------- dataclass_: :class:`DataclassInstance` The dataclass to convert. Returns -------- dict[:class:`str`, Any] The converted dataclass as a dict. Raises ------- :exc:`TypeError` When the object passed in is not of type(:class:`DataclassInstance`). """ parameters: dict[Any, Any] = {} if is_dataclass(dataclass_) is False: raise TypeError(f"The object {dataclass_} is not of the same type as <dataclass>.") for field in fields(class_or_instance=dataclass_): value: Any = getattr(dataclass_, field.name) if value is None: continue parameters[field.name] = value return parameters
[docs] @staticmethod def json_to_dataclass( json: Iterable[Any], format_: Union[type[X], type[APIResponseDataTableAlias]], _use_from_dict: bool, _auto_unpack: bool, ) -> X | list[APIResponseDataTableAlias | X] | APIResponseDataTableAlias | Iterable[Any]: """ Format the JSON response data to a dataclass. .. note:: All JSON response data will be sanitized before it is turned into a dataclass. See :meth:`sanitize_json`. Parameters ----------- json: Any JSON response data to format. format: Union[:class:`DataclassInstance`, class:`DeploymentTemplate`] Must be of type :class:`DataclassInstance` or similar to unpack the JSON response data. _use_from_dict: :class:`bool` Use :meth:`fromdict` from dataclass_wizard to unpack the JSON response data. - Typically this is used to handle nested :class:`DataclassInstance`. _auto_unpack: :class:`bool` Use ``**data`` to unpack the JSON response data. Returns -------- X | list[:class:`DeploymentTemplate` | X] | :class:`DeploymentTemplate` | None Either a list or single entry of :class:`DataclassInstance`. """ if isinstance(json, list): # _use_from_dict is to handle nested Dataclasses. if _use_from_dict is True: return [fromdict(format_, data) for data in json] # Self explanatory; uses the `**` annotation to unpack our data. elif _auto_unpack is True: return [format_(**data) for data in json] else: return [format_(data) for data in json] # type: ignore elif isinstance(json, dict): # _use_from_dict is to handle nested Dataclasses. if _use_from_dict is True: return fromdict(format_, json) elif _auto_unpack is True: return format_(**json) else: return format_(json) # type: ignore else: return json
[docs] def parse_bridge(self, bridge: Bridge) -> None: """ Takes the :class:`Bridge` object and set's the :attr:`~Base.url` and sets :attr:`_bridge` to our Bridge object. .. note:: Also validates the 2FA token. Parameters ----------- bridge: :class:`Bridge` The :class:`Bridge` object to parse. Raises ------- :exc:`ValueError` If 2FA Token is not provided and :attr:`_use_2fa` == True.\n If 2FA Token is not enclosed in single(',') or double(",") quotes. """ # We use this later on in _connect to update `_session_id`; # so all connections will use the same session id (if possible) self._bridge = bridge self.url = bridge.url if bridge.use_2fa is True: if bridge.token == "": raise ValueError("You must provide a 2FA Token if you are using 2FA.") # elif bridge.token.startswith(("'", '"')) is False or bridge.token.endswith(("'", '"')) is False: # raise ValueError("2FA Token must be enclosed in quotes.") # Removed starting and ending quotes elif len(bridge.token) < 8: raise ValueError( "Your 2FA token appears to be too short (<8 characters). Please use the code that generates the timed based tokens." )
[docs] def parse_data(self, data: Union[Controller, Instance, Status, Updates]) -> Self: """ Takes in a :class:`DataclassInstance` and iterates through it's :meth:`fields` and set's the values as attributes of the :class:`DataclassInstance` that called this function. Parameters ----------- data: Union[:class:`Controller`, :class:`Instance`, :class:`Status`, :class:`Updates`] The :class:`DataclassInstance` to parse. Returns ------- :class:`Self` Returns the class that called this function. """ for field in fields(class_or_instance=data): setattr(self, field.name, getattr(data, field.name)) return self
@overload @classmethod def sanitize_json(cls, json: str) -> str: ... @overload @classmethod def sanitize_json(cls, json: Iterable) -> Iterable[Any]: ...
[docs] @classmethod def sanitize_json(cls, json: Iterable | str) -> Iterable[Any] | str: """|classmethod| Replaces spaces and underscores in the JSON response dict keys while also formatting keys to ``snake_case``. Also supports a single string and will replace any of these chars ``_ ' ( )`` with nothing. Parameters ---------- json: Any The JSON response data to be sanitize. Returns ------- Iterable[Any] The JSON response data cleaned up. """ if isinstance(json, list): # print("JSON is a list") _new_data = copy.copy(x=json) for i in range(0, len(json), 1): # If any of our entries are a dictionary; let's go through them. if isinstance(json[i], dict): # print("List New Data", json[i]) _new_data[i] = cls.sanitize_json(json=json[i]) # print("Sanitized New Data", _new_data[i]) # _new_data[entry] = cls.sanitize_json(json=entry) return _new_data if isinstance(json, dict): # print("JSON is a dict") _new_data = copy.copy(x=json) for key, value in json.items(): # print("DICT Not cleaned up", key, value) _new_data.pop(key) # To handle keys with spaces and to remove underscores that exist already. key: str = key.replace(" ", "") key: str = key.replace("_", "") if key in cls.json_key_mapping: # print("KEY MAPPING OVERWRITE", key, cls.json_key_mapping[key]) key = cls.json_key_mapping[key] key = cls.camel_to_snake_re(data=key) if isinstance(value, (list, dict)): value = cls.sanitize_json(json=value) _new_data[key] = value return _new_data if isinstance(json, str): # Typical use is to make attributes PEP8 compliant for a class. _new_data = copy.copy(json) _new_data = _new_data.replace(" ", "_") _new_data = _new_data.replace("(", "").replace(")", "") _new_data = _new_data.replace("'", "") if _new_data.endswith("."): _new_data = _new_data[:-1].lower() return _new_data.lower() return json
[docs] @staticmethod def sanitize_path(path: str) -> str: """|classmethod| The path is relative to the Instances home directory. eg "/myInstanceName/" \n You do not need to include "." to specify the current directory as all path's start from root/home. .. note:: Example `await Instance.copyFile("eula.txt", "test")` would move `./eula.txt` to `./test/eula.txt` Parameters ----------- path: str The path to be sanitized. Raises ------- :exc:`ValueError` If the path string contains a underscore. Returns -------- :class:`str` Return the sanitized path string. """ if "_" in path: raise ValueError("You cannot use '_' in path strings.") path = path.replace("//", "/") path = path.replace("\\", "/") path = path.replace("..", ".") # Remove starting periods as they are un-needed when being passed into an API if path.startswith("."): path = path[1:] # Remove starting slashes, all paths start relative to Instance root. if path.startswith("/"): path = path[1:] return path
[docs] @staticmethod def to_snake_case(data: str, /) -> str: """ Quick function to return snake_case from camelCase. Parameters ---------- data: :class:`str` The string to convert. Returns -------- :class:`str` The camelCase string. """ fmt: list[str] = [] for character in data: if character.isupper(): fmt.append(f"_{character.lower()}") continue fmt.append(character) return "".join(fmt)
[docs] async def version_validation(self, version: BuildInfo) -> None: """ Compares the Version of the application/Instance against the version that is passed in. Parameters ----------- version: :class:`VersionInfo` The version to compare against the application. Raises ------- :exc:`RuntimeError` The application version no longer supports this API call.. """ result: Any = await self._call_api( api="Core/GetDiagnosticsInfo", format_data=True, format_=Diagnostics, _use_from_dict=False, _auto_unpack=True, ) if isinstance(result, Diagnostics) and isinstance(result.application_version, BuildInfo): _version: BuildInfo = result.application_version if result.application_version < version: raise RuntimeError(self._version_unavailable, "`Core/GetWebserverMetrics`", _version) else: self.logger.warning( "Unable to validate version Info, the API call %s may raise an error", "`Core/GetDiagnosticsInfo`" )