"""\
Copyright (c) 2022, Flagstaff Solutions, LLC
All rights reserved.
"""
# pylint: disable=cyclic-import, no-member, global-statement, protected-access, wrong-import-order, ungrouped-imports
# pylint: disable=too-many-locals
import inspect
import io
import json
import os
import sys
from collections import namedtuple
from functools import wraps
from uuid import UUID
import PIL
import six
from gofigr import GoFigr, API_URL
from gofigr.annotators import NotebookNameAnnotator, CellIdAnnotator, SystemAnnotator, CellCodeAnnotator, \
PipFreezeAnnotator
from gofigr.backends import get_backend
from gofigr.backends.matplotlib import MatplotlibBackend
from gofigr.backends.plotly import PlotlyBackend
from gofigr.listener import run_listener_async
from gofigr.profile import MeasureExecution
from gofigr.watermarks import DefaultWatermark
try:
from IPython.core.display_functions import display
except ModuleNotFoundError:
from IPython.core.display import display
from IPython.core.display import Javascript, HTML
# Module-wide display trap. When not None, GfDisplayPublisher.publish() calls it as
# trap(data, suppress_display=...) before delegating to the wrapped publisher.
# SuppressDisplayTrap clears it temporarily; _GoFigrExtension.register_hooks() installs it.
DISPLAY_TRAP = None
[docs]
class GfDisplayPublisher:
    """\
    Custom IPython DisplayPublisher which traps all calls to publish() (e.g. when display(...) is called).

    Only the ``pub`` attribute lives on this wrapper. Every other attribute read or write is
    forwarded to the wrapped publisher.
    """

    def __init__(self, pub):
        """
        :param pub: Publisher to wrap around. We delegate all calls to this publisher unless trapped.
        """
        self.pub = pub

    def publish(self, data, *args, **kwargs):
        """
        IPython calls this method whenever it needs data displayed. Our function traps the call
        and calls DISPLAY_TRAP instead, giving it an option to suppress the figure from being displayed.

        We use this trap to publish the figure if auto_publish is True. Suppression is useful
        when we want to show a watermarked version of the figure, and prevents it from being shown twice (once
        with the watermark inside the trap, and once without in the originating call).

        :param data: dictionary of mimetypes -> data
        :param args: implementation-dependent
        :param kwargs: implementation-dependent
        :return: None
        """
        suppressed = False

        def suppress_display():
            nonlocal suppressed
            suppressed = True

        if DISPLAY_TRAP is not None:
            trap = DISPLAY_TRAP
            # Disable the trap while it runs so that anything it displays is not trapped again.
            with SuppressDisplayTrap():
                trap(data, suppress_display=suppress_display)

        if not suppressed:
            self.pub.publish(data, *args, **kwargs)

    def __getattr__(self, item):
        """\
        Delegates to self.pub

        :param item: name of the attribute that normal lookup failed to find
        :return: the attribute value read from the wrapped publisher
        :raises AttributeError: if ``pub`` itself has not been set yet, or the wrapped publisher lacks the attribute
        """
        if item == "pub":
            # __getattr__ only runs after normal lookup failed, so ``pub`` is genuinely missing here.
            # Touching self.pub at this point would recurse forever.
            raise AttributeError(item)
        return getattr(self.pub, item)

    def __setattr__(self, key, value):
        """\
        Delegates to self.pub, except for the ``pub`` attribute itself which is stored on the wrapper.

        :param key: attribute name
        :param value: attribute value
        :return: None
        """
        if key == "pub":
            super().__setattr__(key, value)
            return None  # do not also write a "pub" attribute onto the wrapped publisher
        return setattr(self.pub, key, value)

    def clear_output(self, *args, **kwargs):
        """IPython's clear_output. Defers to self.pub"""
        return self.pub.clear_output(*args, **kwargs)
[docs]
class SuppressDisplayTrap:
    """\
    Context manager that switches the module-level DISPLAY_TRAP off for the duration of a ``with`` block.

    The trap that was active on entry is remembered on the instance and put back on exit.
    """

    def __init__(self):
        # Trap saved by __enter__; None whenever we are outside of a ``with`` block.
        self.trap = None

    def __enter__(self):
        global DISPLAY_TRAP
        # Stash the active trap and clear the global in one step.
        self.trap, DISPLAY_TRAP = DISPLAY_TRAP, None

    def __exit__(self, exc_type, exc_val, exc_tb):
        global DISPLAY_TRAP
        # Restore the stashed trap and forget it. Returns None, so exceptions propagate.
        DISPLAY_TRAP, self.trap = self.trap, None
class _GoFigrExtension:
"""\
Implements the main Jupyter extension functionality. You will not want to instantiate this class directly.
Instead, please call get_extension().
"""
def __init__(self, ip,
auto_publish=False,
notebook_metadata=None):
"""\
:param ip: iPython shell instance
:param auto_publish: whether to auto-publish figures
:param pre_run_hook: function to use as a pre-run hook
:param post_execute_hook: function to use as a post-execute hook
:param notebook_metadata: information about the running notebook, as a key-value dictionary
"""
self.shell = ip
self.auto_publish = auto_publish
self.cell = None
self.notebook_metadata = notebook_metadata
self.gf = None # active GF object
self.workspace = None # current workspace
self.analysis = None # current analysis
self.publisher = None # current Publisher instance
self.deferred_revisions = []
def display_trap(self, data, suppress_display):
"""\
Called whenever *any* code inside the Jupyter session calls display().
:param data: dictionary of MIME types
:param suppress_display: callable with no arguments. Call to prevent the originating figure from being shown.
:return: None
"""
if self.auto_publish:
self.publisher.auto_publish_hook(self, data, suppress_display)
def add_to_deferred(self, rev):
"""\
Adds a revision to a list of deferred revisions. Such revisions will be annotated in the post_run_cell
hook, and re-saved.
This functionality exists because it's possible to load the GoFigr extension and publish figures in the same
cell, in which case GoFigr will not receive the pre_run_cell hook and will not have access to cell information
when the figure is published. This functionality allows us to obtain the cell information after it's run
(in the post_run_cell hook), re-run annotators, and update the figure with full annotations.
:param rev: revision to defer
:return: None
"""
if rev not in self.deferred_revisions:
self.deferred_revisions.append(rev)
def check_config(self):
"""Ensures the plugin has been configured for use"""
props = ["gf", "workspace", "analysis", "publisher"]
for prop in props:
if getattr(self, prop, None) is None:
raise RuntimeError("GoFigr not configured. Please call configure() first.")
def pre_run_cell(self, info):
"""\
Default pre-run cell hook.
:param info: Cell object
:return:None
"""
self.cell = info
def post_run_cell(self, result):
"""Post run cell hook.
:param result: ExecutionResult
:return: None
"""
self.cell = result.info
while len(self.deferred_revisions) > 0:
rev = self.deferred_revisions.pop(0)
rev = self.publisher.annotate(rev)
rev.save(silent=True)
self.cell = None
def _register_handler(self, event_name, handler):
"""Inserts a handler at the beginning of the list while avoiding double-insertions"""
handlers = [handler]
for hnd in self.shell.events.callbacks[event_name]:
self.shell.events.unregister(event_name, hnd)
if hnd != handler: # in case it's already registered, skip it
handlers.append(hnd)
for hnd in handlers:
self.shell.events.register(event_name, hnd)
def unregister(self):
"""\
Unregisters all hooks, effectively disabling the plugin.
"""
try:
self.shell.events.unregister('pre_run_cell', self.pre_run_cell)
except ValueError:
pass
try:
self.shell.events.unregister('post_run_cell', self.post_run_cell)
except ValueError:
pass
def register_hooks(self):
"""\
Register all hooks with Jupyter.
:return: None
"""
global DISPLAY_TRAP
DISPLAY_TRAP = self.display_trap
self._register_handler('pre_run_cell', self.pre_run_cell)
self._register_handler('post_run_cell', self.post_run_cell)
native_display_publisher = self.shell.display_pub
if not isinstance(native_display_publisher, GfDisplayPublisher):
self.shell.display_pub = GfDisplayPublisher(native_display_publisher)
# Module-wide _GoFigrExtension instance. None until _load_ipython_extension() creates one.
_GF_EXTENSION = None # GoFigrExtension global
[docs]
@require_configured
def get_extension():
    """\
    Gives access to the module-wide GoFigr Jupyter extension object.

    :return: current value of the module-level ``_GF_EXTENSION``
    """
    extension = _GF_EXTENSION
    return extension
def _load_ipython_extension(ip):
"""\
Loads the Jupyter extension. Aliased to "load_ipython_extension" (no leading underscore) in the main init.py file.
:param ip: IPython shell
:return: None
"""
global _GF_EXTENSION
if _GF_EXTENSION is not None:
_GF_EXTENSION.unregister()
_GF_EXTENSION = _GoFigrExtension(ip)
_GF_EXTENSION.register_hooks()
[docs]
def parse_uuid(val):
    """\
    Attempts to parse a UUID, returning None if input is not a valid UUID.

    :param val: value to parse. Normally a string; an existing UUID instance is accepted as well.
    :return: UUID (as a string) or None
    """
    if isinstance(val, UUID):
        return str(val)

    try:
        return str(UUID(val))
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed string. TypeError: None. AttributeError: other non-string input.
        return None
# Single-field wrapper marking a value as an API ID; parse_model_instance() reads its ``api_id`` field.
ApiId = namedtuple("ApiId", ["api_id"])
[docs]
class FindByName:
    """\
    Lookup specification passed to configure() when an analysis/workspace should be located
    by its name rather than by an API ID.
    """

    def __init__(self, name, description=None, create=False):
        """\

        :param name: name to search for
        :param description: description stored on the specification
        :param create: flag stored on the specification; callers read it to decide whether to create a missing object
        """
        self.create = create
        self.description = description
        self.name = name

    def __repr__(self):
        name, description, create = self.name, self.description, self.create
        return f"FindByName(name={name}, description={description}, create={create})"
[docs]
def parse_model_instance(model_class, value, find_by_name):
    """\
    Parses a model instance from a value, e.g. the API ID or a name.

    :param model_class: class of the model, e.g. gf.Workspace
    :param value: value to parse into a model instance: an existing instance, an API ID string, \
    an ApiId, or a FindByName
    :param find_by_name: callable to find the model instance by name
    :return: model instance
    :raises ValueError: if value is none of the supported specifications
    """
    if isinstance(value, model_class):
        return value
    if isinstance(value, str):
        return model_class(api_id=value)
    if isinstance(value, ApiId):
        return model_class(api_id=value.api_id)
    if isinstance(value, FindByName):
        return find_by_name(value)

    # This used to *return* the exception object, which callers then mistook for a model instance.
    raise ValueError(f"Unsupported target specification: {value}. Please specify an API ID, or use FindByName.")
# Annotator classes offered as the module's default set.
DEFAULT_ANNOTATORS = (NotebookNameAnnotator, CellIdAnnotator, CellCodeAnnotator, SystemAnnotator,
PipFreezeAnnotator)
# Figure backend classes offered as the module's default set.
DEFAULT_BACKENDS = (MatplotlibBackend, PlotlyBackend)
[docs]
class Publisher:
    """\
    Publishes revisions to the GoFigr server.
    """

    def __init__(self,
                 gf,
                 annotators,
                 backends,
                 watermark=None,
                 image_formats=("png", "eps", "svg"),
                 interactive=True,
                 default_metadata=None,
                 clear=True):
        """

        :param gf: GoFigr instance
        :param annotators: revision annotators
        :param backends: figure backends, e.g. MatplotlibBackend
        :param watermark: watermark generator, e.g. QRWatermark(). Falls back to DefaultWatermark()
        :param image_formats: image formats to save by default
        :param interactive: whether to publish figure HTML if available
        :param default_metadata: metadata dictionary attached to every revision; never modified by publish()
        :param clear: whether to close the original figures after publication. If False, Jupyter will display
               both the input figure and the watermarked output. Default behavior is to close figures.
        """
        self.gf = gf
        self.watermark = watermark or DefaultWatermark()
        self.annotators = annotators
        self.backends = backends
        self.image_formats = image_formats
        self.interactive = interactive
        self.clear = clear
        self.default_metadata = default_metadata

    def auto_publish_hook(self, extension, data, suppress_display=None):
        """\
        Hook for automatically publishing figures without an explicit call to publish().

        :param extension: GoFigrExtension instance
        :param data: data being published. This will usually be a dictionary of mime formats.
        :param suppress_display: callable which prevents the native display of the figure; \
        forwarded to publish()
        :return: None
        """
        for backend in self.backends:
            # Materialize first: publish() may close figures while we are still iterating.
            compatible_figures = list(backend.find_figures(extension.shell, data))
            for fig in compatible_figures:
                if not getattr(fig, '_gf_is_published', False):
                    self.publish(fig=fig, backend=backend, suppress_display=suppress_display)

    @staticmethod
    def _resolve_target(gf, fig, target, backend):
        """\
        Resolves the GoFigr figure under which a revision will be published.

        :param gf: GoFigr instance
        :param fig: figure object being published
        :param target: gf.Figure instance, API ID, ApiId, FindByName, or None to use the figure's title
        :param backend: backend used to read the figure's title
        :return: target figure object
        """
        ext = get_extension()

        if target is not None:
            return parse_model_instance(gf.Figure,
                                        target,
                                        lambda search: ext.analysis.get_figure(name=search.name,
                                                                               description=search.description,
                                                                               create=search.create))

        # No explicit target: fall back to the figure's title
        fig_name = backend.get_title(fig)
        if fig_name is None:
            print("Your figure doesn't have a title and will be published as 'Anonymous Figure'. "
                  "To avoid this warning, set a figure title or manually call publish() with a target figure. "
                  "See https://gofigr.io/docs/gofigr-python/latest/start.html#publishing-your-first-figure for "
                  "an example.", file=sys.stderr)
            fig_name = "Anonymous Figure"

        sys.stdout.flush()
        return ext.analysis.get_figure(fig_name, create=True)

    def _combine_metadata(self, metadata):
        """\
        Merges the publisher's default metadata with per-call metadata.

        :param metadata: per-call metadata dictionary, or None
        :return: a new dictionary; self.default_metadata is never modified
        """
        combined = dict(self.default_metadata) if self.default_metadata is not None else {}
        if metadata is not None:
            combined.update(metadata)
        return combined

    def _get_image_data(self, gf, backend, fig, rev, image_options):
        """\
        Extracts ImageData in various formats.

        :param gf: GoFigr instance
        :param backend: backend to use
        :param fig: figure object
        :param rev: Revision object
        :param image_options: backend-specific parameters
        :return: tuple of: list of ImageData objects, watermarked image to display
        """
        if image_options is None:
            image_options = {}

        image_to_display = None
        image_data = []
        for fmt in self.image_formats:
            # Render each format exactly once, and first save the image without the watermark.
            # A format the backend cannot produce is reported and skipped.
            try:
                raw_bytes = backend.figure_to_bytes(fig, fmt, image_options)
                image_data.append(gf.ImageData(name="figure",
                                               format=fmt,
                                               data=raw_bytes,
                                               is_watermarked=False))
            except Exception as e:  # pylint: disable=broad-exception-caught
                print(f"WARNING: We could not obtain the figure in {fmt.upper()} format: {e}", file=sys.stderr)
                continue

            # Now, save the watermarked version. Only PNG images are watermarked.
            if fmt.lower() != "png":
                continue

            img = PIL.Image.open(io.BytesIO(raw_bytes))
            img.load()
            watermarked_img = self.watermark.apply(img, rev)
            if watermarked_img is None:
                continue

            bio = io.BytesIO()
            watermarked_img.save(bio, format=fmt)
            img_data = gf.ImageData(name="figure", format=fmt, data=bio.getvalue(),
                                    is_watermarked=True)
            image_data.append(img_data)
            image_to_display = img_data

        if self.interactive and backend.is_interactive(fig):
            image_data.append(gf.ImageData(name="figure", format="html",
                                           data=backend.figure_to_html(fig).encode('utf-8'),
                                           is_watermarked=False))

            wfig = backend.add_interactive_watermark(fig, rev, self.watermark)
            html_with_watermark = gf.ImageData(name="figure", format="html",
                                               data=backend.figure_to_html(wfig).encode('utf-8'),
                                               is_watermarked=True)
            image_data.append(html_with_watermark)
            image_to_display = wfig  # display the native Figure

        return image_data, image_to_display

    def annotate(self, rev):
        """
        Annotates a FigureRevision using self.annotators.

        :param rev: revision to annotate
        :return: annotated revision
        """
        for annotator in self.annotators:
            with MeasureExecution(annotator.__class__.__name__):
                annotator.annotate(rev)

        return rev

    def _infer_figure_and_backend(self, fig, backend):
        """\
        Given a figure and a backend where one of the values could be null, returns a complete set
        of a figure to publish and a matching backend.

        :param fig: figure to publish. None to publish the default for the backend
        :param backend: backend to use. If None, will infer from figure
        :return: tuple of figure and backend
        :raises ValueError: if neither is given, or the backend has no default figure
        """
        if fig is None and backend is None:
            raise ValueError("You did not specify a figure to publish.")

        if fig is None:
            fig = backend.get_default_figure()
            if fig is None:
                raise ValueError("You did not specify a figure to publish, and the backend does not have "
                                 "a default.")
        elif backend is None:
            backend = get_backend(fig, self.backends)

        return fig, backend

    def publish(self, fig=None, target=None, gf=None, dataframes=None, metadata=None,
                backend=None, image_options=None, suppress_display=None):
        """\
        Publishes a revision to the server.

        :param fig: figure to publish. If None, the backend's default figure is used
        :param target: Target figure to publish this revision under. Can be a gf.Figure instance, an API ID, \
        or a FindByName instance.
        :param gf: GoFigure instance
        :param dataframes: dictionary of dataframes to associate & publish with the figure
        :param metadata: metadata (JSON) to attach to this revision
        :param backend: backend to use, e.g. MatplotlibBackend. If None it will be inferred automatically based on \
        figure type
        :param image_options: backend-specific params passed to backend.figure_to_bytes
        :param suppress_display: if used in an auto-publish hook, this will contain a callable which will
               suppress the display of this figure using the native IPython backend.

        :return: FigureRevision instance
        """
        # pylint: disable=too-many-branches
        ext = get_extension()
        gf = gf if gf is not None else ext.gf
        fig, backend = self._infer_figure_and_backend(fig, backend)

        with MeasureExecution("Resolve target"):
            target = self._resolve_target(gf, fig, target, backend)
            if getattr(target, 'revisions', None) is None:
                target.fetch()

        # Always a fresh dictionary: per-call metadata must not leak into later publishes.
        combined_meta = self._combine_metadata(metadata)

        with MeasureExecution("Bare revision"):
            # Create a bare revision first to get the API ID
            rev = gf.Revision(figure=target, metadata=combined_meta)
            target.revisions.create(rev)

            # No cell information yet (extension loaded in this very cell): annotate later,
            # in the post_run_cell hook.
            deferred = ext.cell is None
            if deferred:
                ext.add_to_deferred(rev)

        with MeasureExecution("Image data"):
            rev.image_data, image_to_display = self._get_image_data(gf, backend, fig, rev, image_options)

        if image_to_display is not None:
            with SuppressDisplayTrap():
                if isinstance(image_to_display, gf.ImageData):
                    display(image_to_display.image)
                else:
                    display(image_to_display)

            # The watermarked version has been shown; stop the original from being shown as well.
            if suppress_display is not None:
                suppress_display()

        if dataframes is not None:
            rev.table_data = [gf.TableData(name=name, dataframe=frame)
                              for name, frame in dataframes.items()]

        if not deferred:
            with MeasureExecution("Annotators"):
                # Annotate the revision
                self.annotate(rev)

        with MeasureExecution("Final save"):
            rev.save(silent=True)

        # Marker read by auto_publish_hook() to avoid publishing the same figure twice.
        fig._gf_is_published = True

        if self.clear:
            backend.close(fig)

        with SuppressDisplayTrap():
            display(HTML(f"""
            <div style='margin-top: 1em; margin-bottom: 1em; margin-left: auto; margin-right: auto;'>
                <a href='{rev.revision_url}'>View on GoFigr</a>
            </div>"""))

        return rev
[docs]
def from_config_or_env(env_prefix, config_path):
    """\
    Decorator that binds function arguments in order of priority (most important first):
    1. args/kwargs
    2. environment variables
    3. config file
    4. function defaults

    Variadic parameters (``*args`` / ``**kwargs``) are passed through untouched and are never
    looked up in the environment or the config file.

    :param env_prefix: prefix for environment variables. Variables are assumed to be named \
    `<prefix> + <name of function argument in all caps>`, e.g. if prefix is ``MYAPP`` and function argument \
    is called host_name, we'll look for an \
    environment variable named ``MYAPP_HOST_NAME``.
    :param config_path: path to the JSON config file. Function arguments will be looked up using their verbatim names.
    :return: decorated function
    """
    variadic_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)

    def decorator(func):
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Read config file, if it exists
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    try:
                        config_file = json.load(f)
                    except Exception as e:
                        raise RuntimeError(f"Error parsing configuration file {config_path}") from e
            except FileNotFoundError:
                config_file = {}

            bound = sig.bind_partial(*args, **kwargs)
            param_values = bound.arguments
            for param_name, param in sig.parameters.items():
                if param_name in param_values:
                    continue  # value supplied through args/kwargs: ignore env variables and the config file.
                if param.kind in variadic_kinds:
                    continue  # *args / **kwargs cannot be configured externally

                env_name = f'{env_prefix}{param_name.upper()}'
                if env_name in os.environ:
                    param_values[param_name] = os.environ[env_name]
                elif param_name in config_file:
                    param_values[param_name] = config_file[param_name]

            # Let BoundArguments rebuild the call so *args, **kwargs and positional-only
            # parameters reach func in their proper form.
            return func(*bound.args, **bound.kwargs)

        return wrapper

    return decorator
[docs]
def find_workspace_by_name(gf, search):
    """\
    Looks up the single workspace whose name equals ``search.name``.

    When nothing matches and ``search.create`` is set, a new workspace is created from the
    name and description in ``search`` and returned.

    :param gf: GoFigr client
    :param search: FindByName instance
    :return: a Workspace object
    :raises RuntimeError: if no workspace matches (and creation was not requested), or if several match
    """
    candidates = [workspace for workspace in gf.workspaces if workspace.name == search.name]
    n_found = len(candidates)

    if n_found == 1:
        return candidates[0]

    if n_found > 1:
        raise RuntimeError(f'Multiple (n={n_found}) workspaces match name "{search.name}". '
                           f'Please use an API ID instead.')

    # Nothing matched
    if not search.create:
        raise RuntimeError(f'Could not find workspace named "{search.name}"')

    created = gf.Workspace(name=search.name, description=search.description)
    created.create()
    print(f"Created a new workspace: {created.api_id}")
    return created
[docs]
def listener_callback(result):
    """\
    WebSocket callback. Stores messages of type "metadata" as the extension's notebook metadata
    and ignores everything else.

    :param result: message received from the listener. Only dictionaries are considered.
    :return: None
    """
    # .get(): a dictionary without a "message_type" key is ignored rather than raising KeyError.
    if isinstance(result, dict) and result.get('message_type') == "metadata":
        _GF_EXTENSION.notebook_metadata = result
# pylint: disable=too-many-arguments, too-many-locals
[docs]
@require_configured
def publish(fig=None, backend=None, **kwargs):
    """\
    Publishes a figure through the extension's Publisher. See :func:`gofigr.jupyter.Publisher.publish`
    for the accepted keyword arguments.

    With neither a figure nor a backend given, each available backend is asked for its default
    figure and every figure found is published.

    :param fig: figure to publish
    :param backend: backend to use
    :param kwargs: forwarded unchanged to Publisher.publish
    :return: None
    """
    publisher = get_extension().publisher

    if fig is not None or backend is not None:
        # Explicit request: hand it over as-is
        publisher.publish(fig=fig, backend=backend, **kwargs)
        return

    # Nothing specified: sweep the default figure of every available backend
    for candidate_backend in publisher.backends:
        default_fig = candidate_backend.get_default_figure(silent=True)
        if default_fig is None:
            continue
        publisher.publish(fig=default_fig, backend=candidate_backend, **kwargs)
[docs]
@require_configured
def get_gofigr():
    """\
    Gives access to the GoFigr object held by the extension.

    :return: the ``gf`` attribute of the extension returned by get_extension()
    """
    extension = get_extension()
    return extension.gf