Source code for gofigr.annotators
"""\
Copyright (c) 2023, Flagstaff Solutions, LLC
All rights reserved.
"""
import json
import os
import re
import subprocess
import sys
from abc import ABC
from urllib.parse import unquote, urlparse
from gofigr import CodeLanguage
from gofigr.context import RevisionContext
from gofigr.databricks import get_dbutils
PATH_WARNING = "To fix this warning, you can manually specify the notebook name & path in the call to configure(). " \
"Please see https://gofigr.io/docs/gofigr-python/latest/customization.html#notebook-name-path " \
"for details."
[docs]
class Annotator(ABC):
"""\
Annotates figure revisions with pertinent information, such as cell code, variable values, etc.
"""
def __init__(self, extension):
self.extension = extension
[docs]
def annotate(self, revision):
"""
Annotates the figure revision.
:param revision: FigureRevision
:return: annotated FigureRevision
"""
return revision
[docs]
class CellIdAnnotator(Annotator):
"""Annotates revisions with the ID of the Jupyter cell"""
[docs]
def annotate(self, revision):
if revision.metadata is None:
revision.metadata = {}
try:
cell_id = self.extension.cell.cell_id
except AttributeError:
cell_id = None
revision.metadata['cell_id'] = cell_id
return revision
[docs]
class CellCodeAnnotator(Annotator):
""""Annotates revisions with cell contents"""
[docs]
def annotate(self, revision):
if self.extension.cell is not None:
code = self.extension.cell.raw_cell
else:
code = "N/A"
revision.data.append(revision.client.CodeData(name="Jupyter Cell",
language=CodeLanguage.PYTHON,
contents=code))
return revision
[docs]
class PipFreezeAnnotator(Annotator):
"""Annotates revisions with the output of pip freeze"""
def __init__(self, extension, cache=True):
"""\
:param extension: the GoFigr Jupyter extension
:param cache: if True, will only run pip freeze once and cache the output
"""
super().__init__(extension)
self.cache = cache
self.cached_output = None
[docs]
def annotate(self, revision):
if self.cache and self.cached_output:
output = self.cached_output
else:
try:
output = subprocess.check_output(["pip", "freeze"]).decode('ascii')
self.cached_output = output
except subprocess.CalledProcessError as e:
output = e.output
revision.data.append(revision.client.TextData(name="pip freeze", contents=output))
return revision
[docs]
class SystemAnnotator(Annotator):
"""Annotates revisions with the OS version"""
[docs]
def annotate(self, revision):
try:
output = subprocess.check_output(["uname", "-a"]).decode('ascii')
except subprocess.CalledProcessError as e:
output = e.output
revision.data.append(revision.client.TextData(name="System Info", contents=output))
return revision
NOTEBOOK_PATH = "notebook_path"
NOTEBOOK_NAME = "notebook_name"
NOTEBOOK_URL = "url"
NOTEBOOK_KERNEL = "kernel"
PYTHON_VERSION = "python_version"
BACKEND_NAME = "backend"
_ACTIVE_TAB_TITLE = "active_tab_title"
def _parse_path_from_tab_title(title):
"""Parses out the notebook path from the tab/widget title"""
for line in title.splitlines(keepends=False):
m = re.match(r'Path:\s*(.*)\s*', line)
if m:
return m.group(1)
return None
[docs]
class NotebookMetadataAnnotator(Annotator):
""""Annotates revisions with notebook metadata, including filename & path, as well as the full URL"""
[docs]
def parse_from_databricks(self):
"""Returns notebook path if running in Databricks"""
try:
# pylint: disable=undefined-variable
context = get_dbutils(self.extension.shell).notebook.entry_point.getDbutils().notebook().getContext()
nb = context.notebookPath().get()
return {NOTEBOOK_PATH: nb, NOTEBOOK_NAME: os.path.basename(nb)}
except Exception: # pylint: disable=broad-exception-caught
return None
[docs]
def parse_from_vscode(self):
"""Returns notebook path if running in VSCode"""
if self.extension.cell is None or self.extension.cell.cell_id is None:
return None
elif "vscode-notebook-cell:" not in self.extension.cell.cell_id:
return None
m = re.match(r'^vscode-notebook-cell:(.*)#.*$', unquote(self.extension.cell.cell_id))
if m is None:
return None
notebook_path = m.group(1)
notebook_name = os.path.basename(notebook_path)
return {NOTEBOOK_PATH: notebook_path,
NOTEBOOK_NAME: notebook_name}
[docs]
def try_get_metadata(self):
"""Infers the notebook path & name using currently available metadata if possible, returning None otherwise"""
try:
return self.parse_metadata(error=False)
except Exception: # pylint: disable=broad-exception-caught
return None
def _parse_from_proxy(self, meta, error):
if 'url' not in meta and _ACTIVE_TAB_TITLE not in meta:
if error:
raise RuntimeError("No URL found in Notebook metadata")
else:
return None
notebook_name = None
# Try parsing the name from the title first
if _ACTIVE_TAB_TITLE in meta and meta[_ACTIVE_TAB_TITLE] is not None:
notebook_name = _parse_path_from_tab_title(meta[_ACTIVE_TAB_TITLE])
# If that doesn't work, try the URL
if notebook_name is None:
notebook_name = unquote(urlparse(meta['url']).path.rsplit('/', 1)[-1])
notebook_dir = self.extension.shell.starting_dir
full_path = None
for candidate_path in [os.path.join(notebook_dir, notebook_name),
os.path.join(notebook_dir, os.path.basename(notebook_name)),
os.path.join(os.path.dirname(notebook_dir), notebook_name),
os.path.join(os.path.dirname(notebook_dir), os.path.basename(notebook_name))]:
if os.path.exists(candidate_path):
full_path = candidate_path
break
if full_path is None:
full_path = os.path.join(notebook_dir, notebook_name) # might still be helpful, even if slightly incorrect
print(f"The inferred path for the notebook does not exist: {full_path}. {PATH_WARNING}", file=sys.stderr)
return {NOTEBOOK_PATH: full_path,
NOTEBOOK_NAME: notebook_name,
NOTEBOOK_URL: meta.get('url')}
[docs]
def parse_metadata(self, error=True):
"""
Infers the notebook path & name from metadata passed through the WebSocket (if available)
:param error: if True, will raise an error if metadata is not available
"""
vsc_meta = self.parse_from_vscode()
if vsc_meta is not None:
return vsc_meta
db_meta = self.parse_from_databricks()
if db_meta is not None:
return db_meta
# At this point the metadata needs to come from the JavaScript proxy
meta = self.extension.notebook_metadata
if meta is None and error:
raise RuntimeError("No Notebook metadata available")
elif meta is None:
return None
return self._parse_from_proxy(meta, error)
[docs]
def annotate(self, revision):
if revision.metadata is None:
revision.metadata = {}
try:
if NOTEBOOK_NAME not in revision.metadata or NOTEBOOK_PATH not in revision.metadata:
revision.metadata.update(self.parse_metadata())
full_path = revision.metadata.get(NOTEBOOK_PATH)
if full_path and os.path.exists(full_path):
revision.data += [revision.client.FileData.read(full_path)]
except Exception as e: # pylint: disable=broad-exception-caught
print(f"GoFigr could not automatically obtain the name of the currently"
f" running notebook. {PATH_WARNING} Details: {e}",
file=sys.stderr)
revision.metadata[NOTEBOOK_NAME] = "N/A"
revision.metadata[NOTEBOOK_PATH] = "N/A"
return revision
[docs]
class NotebookNameAnnotator(NotebookMetadataAnnotator):
"""(Deprecated) Annotates revisions with notebook name & path"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
print("NotebookNameAnnotator is deprecated. Please use NotebookMetadataAnnotator", file=sys.stderr)
[docs]
class EnvironmentAnnotator(Annotator):
"""Annotates revisions with the python version & the kernel info"""
[docs]
def annotate(self, revision):
if revision.metadata is None:
revision.metadata = {}
revision.metadata[NOTEBOOK_KERNEL] = sys.executable
revision.metadata[PYTHON_VERSION] = sys.version
return revision
[docs]
class BackendAnnotator(Annotator):
"""Annotates revisions with the python version & the kernel info"""
[docs]
def annotate(self, revision):
if revision.metadata is None:
revision.metadata = {}
context = RevisionContext.get(revision)
revision.metadata[BACKEND_NAME] = context.backend.get_backend_name() if context and context.backend else "N/A"
return revision
[docs]
class HistoryAnnotator(Annotator):
"""Annotates revisions with IPython execution history"""
[docs]
def annotate(self, revision):
context = RevisionContext.get(revision)
if not hasattr(context.extension.shell, 'history_manager'):
return revision
hist = context.extension.shell.history_manager
if hist is None:
return revision
revision.data.append(revision.client.CodeData(name="IPython history",
language="python",
format="jupyter-history/json",
contents=json.dumps(hist.input_hist_raw)))
return revision