"""\
Copyright (c) 2022, Flagstaff Solutions, LLC
All rights reserved.
"""
from json import dumps as json_dumps
import logging
import requests
from requests import Session
from gofigr.models import *
LOGGER = logging.getLogger(__name__)
API_URL = "https://api.gofigr.io"
API_VERSION = "v1.1"
APP_URL = "https://app.gofigr.io"
[docs]
def assert_one(elements, error_none=None, error_many=None):
"""\
Asserts that a list/tuple contains only a single element (raising an exception if not), and returns
that element.
:param elements: list/tuple
:param error_none: error message if input is empty
:param error_many: error message if multiple elements are present
:return: the single element in the input
"""
if len(elements) == 0:
raise ValueError(error_none or "Expected exactly one value but got none")
elif len(elements) > 1:
raise ValueError(error_many or f"Expected exactly one value but got n={len(elements)}")
else:
return elements[0]
[docs]
class UnauthorizedError(RuntimeError):
"""\
Thrown if user doesn't have permissions to perform an action.
"""
pass
[docs]
class UserInfo:
"""\
Stores basic information about a user: username, email, etc.
"""
def __init__(self, username, first_name, last_name, email, date_joined, is_active, avatar):
"""\
:param username:
:param first_name:
:param last_name:
:param email:
:param date_joined:
:param is_active:
:param avatar: avatar as a PIL.Image instance
"""
self.username = username
self.first_name, self.last_name = first_name, last_name
self.email = email
self.date_joined = date_joined
self.is_active = is_active
self.avatar = avatar
@staticmethod
def _avatar_to_b64(img):
if not img:
return None
bio = io.BytesIO()
img.save(bio, format="png")
return b64encode(bio.getvalue()).decode('ascii')
@staticmethod
def _avatar_from_b64(data):
if not data:
return None
return PIL.Image.open(io.BytesIO(b64decode(data)))
[docs]
@staticmethod
def from_json(obj):
"""\
Parses a UserInfo object from JSON
:param obj: JSON representation
:return: UserInfo instance
"""
date_joined = obj.get('date_joined')
return UserInfo(username=obj.get('username'),
first_name=obj.get('first_name'),
last_name=obj.get('last_name'),
email=obj.get('email'),
date_joined=dateutil.parser.parse(date_joined) if date_joined is not None else None,
is_active=obj.get('is_active'),
avatar=UserInfo._avatar_from_b64(obj.get('avatar')))
[docs]
def to_json(self):
"""Converts this UserInfo object to json"""
return {'username': self.username,
'first_name': self.first_name,
'last_name': self.last_name,
'email': self.email,
'date_joined': str(self.date_joined) if self.date_joined else None,
'is_active': self.is_active,
'avatar': UserInfo._avatar_to_b64(self.avatar)}
def __str__(self):
return json_dumps(self.to_json())
def __eq__(self, other):
return str(self) == str(other)
[docs]
class GoFigr:
"""\
The GoFigr client. Handles all communication with the API: authentication, figure creation and manipulation,
sharing, retrieval of user information, etc.
"""
def __init__(self, username, password,
url=API_URL,
authenticate=True):
"""\
:param username: username to connect with
:param password: password for authentication
:param url: API URL
:param authenticate: whether to authenticate right away. If False, authentication will happen during
the first request.
"""
self.service_url = url
self.username = username
self.password = password
self._primary_workspace = None
self._access_token = None
self._refresh_token = None
if authenticate:
self.authenticate()
self._bind_models()
@property
def app_url(self):
"""Returns the URL to the GoFigr app"""
return self.service_url.replace("api", "app").replace(":8000", ":3000")
def _bind_models(self):
"""\
Create instance-bound model classes, e.g. Workspace, Figure, etc. Each will internally
store a reference to this GoFigr client -- that way we don't have to pass it around.
:return: None
"""
# pylint: disable=too-few-public-methods,protected-access
for name, obj in globals().items():
if inspect.isclass(obj) and issubclass(obj, ModelMixin):
class _Bound(obj):
_gf = self
_Bound.__qualname__ = f"GoFigr.{name}"
_Bound._gofigr_type_name = name.replace("gf_", "")
setattr(self, name.replace("gf_", ""), _Bound)
elif inspect.isclass(obj) and issubclass(obj, NestedMixin):
# Nested mixins don't reference the GoFigr object, but they're exposed in the same way
# for consistency.
setattr(self, name, obj)
@property
def api_url(self):
"""\
Full URL to the API endpoint.
"""
return f"{self.service_url}/api/{API_VERSION}/"
@property
def jwt_url(self):
"""\
Full URL to the JWT endpoint (for authentication).
"""
return f"{self.service_url}/api/token/"
@staticmethod
def _is_expired_token(response):
"""\
Checks whether a response failed due to an expired auth token.
:param response: Response object
:return: True if failed due to an expired token, False otherwise.
"""
if response.status_code != HTTPStatus.UNAUTHORIZED:
return False
try:
obj = response.json()
return obj.get('code') == 'token_not_valid'
except ValueError:
return False
def _request(self, method, endpoint, throw_exception=True, expected_status=(HTTPStatus.OK, ), **kwargs):
"""\
Convenience function for making HTTP requests.
:param method: one of Session methods: Session.get, Session.post, etc.
:param endpoint: relative API endpoint
:param throw_exception: whether to check response status against expected_status and throw an exception
:param expected_status: list of acceptable response status codes
:param kwargs: extra params passed verbatim to method(...)
:return: Response
"""
url = urljoin(self.api_url, endpoint)
if not hasattr(expected_status, '__iter__'):
expected_status = [expected_status, ]
if self._access_token is None:
raise RuntimeError("Please authenticate first")
rqst = requests.session()
try:
response = method(rqst, url, headers={'Authorization': f'Bearer {self._access_token}'}, **kwargs)
if self._is_expired_token(response):
self._refresh_access_token()
return self._request(method, endpoint,
throw_exception=throw_exception,
expected_status=expected_status, **kwargs)
if throw_exception and response.status_code not in expected_status:
if response.status_code == HTTPStatus.FORBIDDEN:
raise UnauthorizedError(f"Unauthorized: {response.content}")
else:
raise RuntimeError(f"Request to {url} returned {response.status_code}: {response.content}")
return response
finally:
rqst.close()
def _get(self, endpoint, throw_exception=True, **kwargs):
return self._request(Session.get, endpoint, throw_exception=throw_exception, **kwargs)
def _post(self, endpoint, json, throw_exception=True, **kwargs):
return self._request(Session.post, endpoint, json=json, throw_exception=throw_exception, **kwargs)
def _patch(self, endpoint, json, throw_exception=True, **kwargs):
return self._request(Session.patch, endpoint, json=json, throw_exception=throw_exception, **kwargs)
def _put(self, endpoint, json, throw_exception=True, **kwargs):
return self._request(Session.put, endpoint, json=json, throw_exception=throw_exception, **kwargs)
def _delete(self, endpoint, throw_exception=True, **kwargs):
return self._request(Session.delete, endpoint, throw_exception=throw_exception,
expected_status=HTTPStatus.NO_CONTENT, **kwargs)
[docs]
def heartbeat(self, throw_exception=True):
"""\
Checks whether we can communicate with the API. Currently, this works by polling /api/v1/info.
:param throw_exception: throw an exception if response code is not 200
:return: Response
"""
return self._get("info/", throw_exception=throw_exception)
def _refresh_access_token(self):
"""\
Refresh the JWT access token. If a refresh is not possible (e.g. the token has expired), will attempt
to re-authenticate.
:return: True if successful. Exception if not.
"""
rqst = requests.session()
try:
rsp = rqst.post(self.jwt_url + "refresh/",
data={'refresh': self._refresh_token},
allow_redirects=False)
if rsp.status_code == 200:
self._access_token = rsp.json()['access']
return True
else:
return self.authenticate()
finally:
if rqst is not None:
rqst.close()
[docs]
def authenticate(self):
"""\
Authenticates with the API.
:return: True
"""
rqst = requests.session()
try:
rsp = rqst.post(self.jwt_url,
data={'username': self.username, 'password': self.password},
allow_redirects=False)
if rsp.status_code != 200:
raise RuntimeError("Authentication failed")
self._refresh_token = rsp.json()['refresh']
self._access_token = rsp.json()['access']
return True
finally:
if rqst is not None:
rqst.close()
[docs]
def user_info(self, username=None):
"""\
Retrieves information about a user.
:param username: username. Set to None for self.
:return: UserInfo object.
"""
if not username:
return UserInfo.from_json(self._get("user").json()[0])
else:
return UserInfo.from_json(self._get("user/" + username).json())
[docs]
def update_user_info(self, user_info, username=None):
"""\
Updates user information for a user.
:param user_info: UserInfo instance
:param username: optional username. This is for testing only -- you will get an error if attempting \
to update information for anybody other than yourself.
:return: refreshed UserInfo from server
"""
response = self._put("user/" + (username or user_info.username) + "/", user_info.to_json())
return UserInfo.from_json(response.json())
@property
def workspaces(self):
"""Returns a list of all workspaces that the current user is a member of."""
# pylint: disable=no-member
return self.Workspace.list()
@property
def primary_workspace(self):
"""\
Returns the primary workspace for this user.
:return: Workspace instance
"""
if self._primary_workspace is not None:
return self._primary_workspace
primaries = [w for w in self.workspaces if w.workspace_type == "primary"]
pw = assert_one(primaries,
"No primary workspace found. Please contact support.",
"Multiple primary workspaces found. Please contact support.")
self._primary_workspace = pw
return self._primary_workspace
[docs]
def load_ipython_extension(ip):
"""\
Loads the Jupyter extension. Present here so that we can do "%load_ext gofigr" without having to refer
to a subpackage.
:param ip: IPython shell
:return: None
"""
# pylint: disable=import-outside-toplevel
from gofigr.jupyter import _load_ipython_extension
return _load_ipython_extension(ip)