"""\
Copyright (c) 2022, Flagstaff Solutions, LLC
All rights reserved.
"""
# pylint: disable=cyclic-import, no-member, global-statement, protected-access
# pylint: disable=too-many-locals
import io
import os
import pickle
import sys
import traceback
from functools import wraps
from pathlib import Path
from uuid import UUID
import gofigr
from gofigr import GoFigr, API_URL
from gofigr.annotators import NotebookMetadataAnnotator, NOTEBOOK_NAME, NOTEBOOK_PATH, IPythonAnnotator
from gofigr.publisher import Publisher, DEFAULT_ANNOTATORS, DEFAULT_BACKENDS, _mark_as_published, is_published, \
is_suppressed
from gofigr.proxy import run_proxy_async, get_javascript_loader
from gofigr.profile import MeasureExecution
from gofigr.trap import GfDisplayPublisher, SuppressDisplayTrap
from gofigr.utils import from_config_or_env
from gofigr.widget import DetailedWidget, StartupWidget
try:
from IPython.core.display_functions import display
except ModuleNotFoundError:
from IPython.core.display import display
# pylint: disable=too-many-instance-attributes
class _GoFigrExtension:
"""\
Implements the main Jupyter extension functionality. You will not want to instantiate this class directly.
Instead, please call get_extension().
"""
def __init__(self, ip,
auto_publish=False,
notebook_metadata=None,
configured=False,
loader_shown=False,
asset_log=None):
"""\
:param ip: iPython shell instance
:param auto_publish: whether to auto-publish figures
:param pre_run_hook: function to use as a pre-run hook
:param post_execute_hook: function to use as a post-execute hook
:param notebook_metadata: information about the running notebook, as a key-value dictionary
"""
self.shell = ip
self.auto_publish = auto_publish
self.cell = None
self.proxy = None
self.loader_shown = loader_shown
self.configured = configured
if notebook_metadata is None:
self.notebook_metadata = NotebookMetadataAnnotator().try_get_metadata()
else:
self.notebook_metadata = notebook_metadata
self.gf = None # active GF object
self.publisher = None # current Publisher instance
self.wait_for_metadata = None # callable which waits for metadata to become available
self.asset_log = asset_log if asset_log is not None else {}
self.deferred_revisions = []
@property
def is_ready(self):
"""True if the extension has been configured and ready for use."""
return self.configured and self.notebook_metadata is not None
def resolve_analysis(self):
"""Gets the current analysis"""
if isinstance(self.publisher.analysis, gofigr.NotebookName):
meta = NotebookMetadataAnnotator().try_get_metadata()
if meta is not None:
self.publisher.analysis = self.publisher.workspace.get_analysis(name=Path(meta[NOTEBOOK_NAME]).stem,
create=True)
self.publisher.analysis.fetch()
return self.publisher.analysis
def display_trap(self, data, suppress_display):
"""\
Called whenever *any* code inside the Jupyter session calls display().
:param data: dictionary of MIME types
:param suppress_display: callable with no arguments. Call to prevent the originating figure from being shown.
:return: None
"""
if self.auto_publish:
self.publisher.auto_publish_hook(self, data, suppress_display)
def add_to_deferred(self, rev):
"""\
Adds a revision to a list of deferred revisions. Such revisions will be annotated in the post_run_cell
hook, and re-saved.
This functionality exists because it's possible to load the GoFigr extension and publish figures in the same
cell, in which case GoFigr will not receive the pre_run_cell hook and will not have access to cell information
when the figure is published. This functionality allows us to obtain the cell information after it's run
(in the post_run_cell hook), re-run annotators, and update the figure with full annotations.
:param rev: revision to defer
:return: None
"""
if rev not in self.deferred_revisions:
self.deferred_revisions.append(rev)
def check_config(self):
"""Ensures the plugin has been configured for use"""
if not self.configured:
raise RuntimeError("GoFigr not configured. Please call configure() first.")
def pre_run_cell(self, info):
"""\
Default pre-run cell hook.
:param info: Cell object
:return:None
"""
self.cell = info
def _get_metadata_from_proxy(self, result):
if self.configured and not self.loader_shown and "_VSCODE" not in result.info.raw_cell:
self.proxy, self.wait_for_metadata = run_proxy_async(self.gf, proxy_callback)
with SuppressDisplayTrap():
display(get_javascript_loader(self.gf, self.proxy))
self.loader_shown = True
if self.notebook_metadata is None and self.wait_for_metadata is not None:
self.wait_for_metadata()
self.wait_for_metadata = None
def post_run_cell(self, result):
"""Post run cell hook.
:param result: ExecutionResult
:return: None
"""
self.cell = result.info
if self.notebook_metadata is None:
self._get_metadata_from_proxy(result)
while len(self.deferred_revisions) > 0:
rev = self.deferred_revisions.pop(0)
rev = self.publisher.annotate(rev)
rev.save(silent=True)
self.cell = None
def _register_handler(self, event_name, handler):
"""Inserts a handler at the beginning of the list while avoiding double-insertions"""
handlers = [handler]
for hnd in self.shell.events.callbacks[event_name]:
self.shell.events.unregister(event_name, hnd)
if hnd != handler: # in case it's already registered, skip it
handlers.append(hnd)
for hnd in handlers:
self.shell.events.register(event_name, hnd)
def unregister(self):
"""\
Unregisters all hooks, effectively disabling the plugin.
"""
try:
self.shell.events.unregister('pre_run_cell', self.pre_run_cell)
except ValueError:
pass
try:
self.shell.events.unregister('post_run_cell', self.post_run_cell)
except ValueError:
pass
def register_hooks(self):
"""\
Register all hooks with Jupyter.
:return: None
"""
self._register_handler('pre_run_cell', self.pre_run_cell)
self._register_handler('post_run_cell', self.post_run_cell)
native_display_publisher = self.shell.display_pub
if not isinstance(native_display_publisher, GfDisplayPublisher):
self.shell.display_pub = GfDisplayPublisher(native_display_publisher, display_trap=self.display_trap)
_GF_EXTENSION = None # GoFigrExtension global
[docs]
@require_configured
def get_extension():
"""Returns the GoFigr Jupyter extension instance"""
return _GF_EXTENSION
def _load_ipython_extension(ip):
"""\
Loads the Jupyter extension. Aliased to "load_ipython_extension" (no leading underscore) in the main init.py file.
:param ip: IPython shell
:return: None
"""
global _GF_EXTENSION
if _GF_EXTENSION is not None:
_GF_EXTENSION.unregister()
_GF_EXTENSION = _GoFigrExtension(ip)
_GF_EXTENSION.register_hooks()
try:
configure()
except Exception as e: # pylint: disable=broad-exception-caught
if "auth" in str(e).lower() and "failed" in str(e).lower():
print("GoFigr authentication failed. Please manually call configure(api_key=<YOUR API KEY>).",
file=sys.stderr)
else:
print(traceback.format_exc(), file=sys.stderr)
print(f"Could not automatically configure GoFigr. Please call configure() manually. Error: {e}",
file=sys.stderr)
ip.user_ns["configure"] = configure
ip.user_ns["get_extension"] = get_extension
ip.user_ns["publish"] = publish
# These have to use the fully qualified reference or isinstance won't work correctly
# after the extension is re-loaded. That's because gofigr.FindByName and gofigr.jupyter.FindByName (imported scope)
# will contain different instantiations of the type.
ip.user_ns["FindByName"] = gofigr.FindByName
ip.user_ns["ApiId"] = gofigr.ApiId
ip.user_ns["NotebookName"] = gofigr.NotebookName
[docs]
def parse_uuid(val):
"""\
Attempts to parse a UUID, returning None if input is not a valid UUID.
:param val: value to parse
:return: UUID (as a string) or None
"""
try:
return str(UUID(val))
except ValueError:
return None
[docs]
def proxy_callback(result):
"""Proxy callback"""
if result is not None and hasattr(result, 'metadata'):
get_extension().notebook_metadata = result.metadata
get_extension().resolve_analysis()
if get_extension().is_ready:
StartupWidget(get_extension()).show()
[docs]
class JupyterPublisher(Publisher):
"""\
Adds Jupyter-specific functionality to GoFigr's base Publisher class.
"""
def __init__(self, *args, clear=True, **kwargs):
super().__init__(*args, clear=clear, **kwargs)
@property
def _non_ipython_annotators(self):
return [ann for ann in self.annotators if not isinstance(ann, IPythonAnnotator)]
[docs]
def auto_publish_hook(self, extension, data, suppress_display=None):
"""\
Hook for automatically publishing figures without an explicit call to publish().
:param extension: GoFigrExtension instance
:param data: data being published. This will usually be a dictionary of mime formats.
:param suppress_display: if used in an auto-publish hook, this will contain a callable which will
suppress the display of this figure using the native IPython backend.
:return: None
"""
for backend in self.backends:
compatible_figures = list(backend.find_figures(extension.shell, data))
for fig in compatible_figures:
if not is_published(fig) and not is_suppressed(fig):
self.publish(fig=fig, backend=backend, suppress_display=suppress_display)
if len(compatible_figures) > 0:
break
[docs]
def publish(self, *args, **kwargs):
with SuppressDisplayTrap():
ext = get_extension()
if ext.cell is None:
revision = super().publish(*args, annotators=self._non_ipython_annotators, **kwargs)
ext.add_to_deferred(revision)
else:
revision = super().publish(*args, **kwargs)
return revision
# pylint: disable=too-many-arguments, too-many-locals
[docs]
@require_configured
def publish(fig=None, backend=None, **kwargs):
"""\
Publishes a figure. See :func:`gofigr.jupyter.Publisher.publish` for a list of arguments. If figure and backend
are both None, will publish default figures across all available backends.
:param fig: figure to publish
:param backend: backend to use
:param kwargs:
:return:
"""
ext = get_extension()
if fig is None and backend is None:
# If no figure and no backend supplied, publish default figures across all available backends
for available_backend in ext.publisher.backends:
fig = available_backend.get_default_figure(silent=True)
if fig is not None:
return ext.publisher.publish(fig=fig, backend=available_backend, **kwargs)
return None
else:
return ext.publisher.publish(fig=fig, backend=backend, **kwargs)
[docs]
@require_configured
def get_gofigr():
"""Gets the active GoFigr object."""
return get_extension().gf
[docs]
@require_configured
def download_file(api_id, file_name, path):
"""\
Downloads a file and saves it.
:param api_id: API ID of the revision containing the file
:param file_name: name of the file to download
:param path: where to save the file (either existing directory or full path with the file name)
:return: number of bytes written
"""
if os.path.exists(path) and os.path.isdir(path):
path = os.path.join(path, file_name)
gf = get_gofigr()
rev = gf.Revision(api_id=api_id).fetch(fetch_data=False)
for data in rev.file_data:
if data.name == file_name:
data.fetch()
data.write(path)
return len(data.data)
raise RuntimeError(f"Could not find file \"{file_name}\" in revision {api_id}.")
[docs]
@require_configured
def download_all(api_id, path):
"""\
Downloads all files attached to a revision
:param api_id: API ID of the revision containing the files
:param path: directory where to save the files
:return: number of bytes written
"""
if not os.path.exists(path) or not os.path.isdir(path):
raise RuntimeError(f"{path} does not exist or is not a directory")
gf = get_gofigr()
rev = gf.Revision(api_id=api_id).fetch(fetch_data=False)
num_bytes = 0
for data in rev.file_data:
data.fetch()
data.write(os.path.join(path, data.name))
num_bytes += len(data.data)
return num_bytes