Source code for terminusgps.wialon.session
import logging
import os
import typing
import wialon.api
logger = logging.getLogger(__name__)
UNKNOWN_ERROR = 6
[docs]
class WialonAPIError(Exception):
"""Raised when a Wialon API call fails."""
def __init__(self, message, *args, **kwargs) -> None:
self.message = message
if not hasattr(message, "_code"):
self._code = UNKNOWN_ERROR
else:
try:
self._code = int(message._code)
except ValueError:
self._code = UNKNOWN_ERROR
return super().__init__(message, *args, **kwargs)
@property
def code(self) -> int:
"""Wialon API error code."""
return self._code
class Wialon(wialon.api.Wialon):
def call(self, action_name, *argc, **kwargs) -> dict[str, typing.Any]:
try:
return super().call(action_name, *argc, **kwargs)
except wialon.api.WialonError as e:
logger.warning(f"Failed to execute '{action_name}': '{e}'")
raise WialonAPIError(e)
[docs]
class WialonSession:
def __init__(
self,
scheme: str = "https",
host: str = "hst-api.wialon.com",
port: int = 443,
sid: str | None = None,
token: str | None = None,
auth_hash: str | None = None,
username: str | None = None,
check_service: str | None = None,
) -> None:
"""
Starts or continues a Wialon API session.
:param scheme: HTTP request scheme to use. Default is ``"https"``.
:type scheme: str
:param host: Wialon API host url. Default is ``"hst-api.wialon.com"``.
:type host: str
:param port: Wialon API port. Default is ``443``.
:type port: int
:param sid: A Wialon API session id. Default is :py:obj:`None`.
:type sid: str | None
:param token: A Wialon API token. Default is environment variable ``"WIALON_TOKEN"``.
:type token: str | None
:param auth_hash: A Wialon API authentication hash. Default is :py:obj:`None`.
:type auth_hash: str | None
:param username: A Wialon user to operate as during the session.
:type username: str | None
:param check_service: A Wialon service name to check before calling the Wialon API. Default is :py:obj:`None`.
:type check_service: str | None
:returns: Nothing.
:rtype: None
"""
self._uid = None
self._wialon_api = Wialon(scheme=scheme, host=host, port=port, sid=sid)
self._token = token if token else os.getenv("WIALON_TOKEN")
self._username = username
self._auth_hash = auth_hash
self._check_service = check_service
def __str__(self) -> str:
return f"Session #{self.id}"
def __repr__(self) -> str:
return f"{self.__class__}(sid={self.id})"
def __enter__(self) -> "WialonSession":
"""Logs into the Wialon API session if it wasn't already active before returning it."""
if self.id is None:
if self._token:
self.token_login(token=self._token, username=self._username)
elif self._auth_hash and self._username:
self.auth_hash_login(
auth_hash=self._auth_hash,
username=self._username,
check_service=self._check_service,
)
else:
raise WialonAPIError("Failed to login to the Wialon API")
return self
def __exit__(self, *args, **kwargs) -> None:
"""Logs out of the Wialon API session if :py:attr:`id` was set."""
if self.id is not None:
self.logout()
[docs]
def token_login(self, token: str, username: str | None = None) -> None:
"""
Logs in to a Wialon API session using a token.
:param token: A Wialon API token.
:type token: str
:param username: Wialon user to operate as during the Wialon API session. Default is :py:obj:`None`.
:type username: str
:returns: Nothing.
:rtype: None
"""
params = {"token": token, "flags": 0x3 if username else 0x1}
if username is not None:
params.update({"operateAs": username})
response = self.wialon_api.token_login(**params)
self.wialon_api.sid = response.get("eid")
self._username = response.get("au")
self._uid = response.get("user", {}).get("id")
self._gis_sid = response.get("gis_sid")
[docs]
def auth_hash_login(
self, auth_hash: str, username: str, check_service: str | None = None
) -> None:
"""
Logs in to a Wialon API session using an auth hash.
:param auth_hash: An authorization hash.
:type auth_hash: str
:param username: Wialon user to operate as during the Wialon API session.
:type username: str
:param check_service: Name of a Wialon service to check if the user has access to. Default is :py:obj:`None` (no service check).
:type check_service: str | None
:returns: Nothing.
:rtype: None
"""
params = {"authHash": auth_hash, "operateAs": username}
if check_service is not None:
params.update({"checkService": check_service})
response = self.wialon_api.core_use_auth_hash(**params)
self.wialon_api.sid = response.get("eid")
self._username = response.get("au")
self._uid = response.get("user", {}).get("id")
[docs]
def logout(self) -> None:
"""
Logs out of the Wialon API session.
:raises WialonAPIError: If the Wialon API session logout failed.
:returns: Nothing.
:rtype: None
"""
session_id = self.wialon_api.sid
if session_id is not None:
response = self.wialon_api.core_logout({})
if not int(response.get("error")) == 0:
raise WialonAPIError(
message=f"Failed to logout of the Wialon API session #{session_id}",
code=int(response.get("error")),
)
self.wialon_api.sid = None
@property
def wialon_api(self) -> Wialon:
return self._wialon_api
@property
def uid(self) -> str | None:
"""
User id of the session's authenticated Wialon user.
:type: str | None
:value: None
"""
return self._uid
@property
def username(self) -> str | None:
"""
Username of the session's authenticated Wialon user.
:type: str | None
:value: None
"""
return self._username
@property
def gis_sid(self) -> str | None:
"""
GIS session id for the Wialon API session.
:type: str | None
:value: None
"""
return self._gis_sid
@property
def id(self) -> str | None:
"""
Wialon API session id.
Shortcut property for :py:attr:`WialonSession.wialon_api.sid`.
Returns :py:obj:`None` if the session wasn't logged in.
:type: str | None
:value: None
"""
return self.wialon_api.sid