import ast
import io
import inspect
import glob
import os
import re
import six
import sphinxcontrib.napoleon
import sphinxcontrib.napoleon.docstring
import sys
import traceback
from .value import pack, unpack
import numpy
import pandas
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt # pylint: disable=import-error
from .context import Context
# Sentinel meaning "no value available". A dedicated object is used instead of
# `None` so that code whose result really is `None` can still be told apart
# from code that produced no result (see its use in `PythonContext.execute`).
undefined = object()
# Global variables that are always available regardless of what
# modules have been `imported`
#
# See https://stackoverflow.com/a/28050317/4625911 for why
# to use the `builtins` module instead of `__builtin__`
from six.moves import builtins
# Names of every builtin; `PythonContext.compile` never reports a name found
# here as a cell input.
GLOBALS = dir(builtins)
class PythonContext(Context):
    """
    A context that compiles and executes Python cells and functions.

    ``compile`` statically analyses a cell's code to work out its inputs and
    output, ``execute`` runs the code, and ``compile_func`` extracts a
    description of a function (name, parameters, docstring fields) from its
    source or from a callable.

    NOTE(review): ``list`` and ``compile_func`` read/write ``self._scope``,
    which is not assigned in this class; presumably ``Context`` provides it -
    confirm.
    """

    # Matches the Sphinx field lines that napoleon emits, e.g.
    # ``:param who: description`` or ``:rtype: str``.
    _DOCSTRING_FIELD = re.compile(r'^:(param|returns|type|rtype)(\s+(\w+))?:(.*)$')

    def __init__(self, *args, **kwargs):
        """
        Initialise the base ``Context`` and, if it has a directory set
        (``self._dir``), make that the process working directory.
        """
        Context.__init__(self, *args, **kwargs)
        if self._dir:
            os.chdir(self._dir)
        self._variables = {}

    def libraries(self, *args):
        """Return the libraries available in this context (currently none)."""
        return []

    def list(self, types=None):
        """
        List the names registered in this context's scope.

        :param types: Optional collection of type names; when non-empty only
            names whose ``self.type(obj)`` is in it are returned.
        :returns: A list of names
        """
        # `None` instead of a mutable `[]` default; an empty/None filter
        # means "everything".
        wanted = types if types else []
        return [
            name for name, obj in self._scope.items()
            if not wanted or self.type(obj) in wanted
        ]

    def compile(self, cell):
        """
        Compile a cell

        Parses the cell's code, records any syntax error in
        ``cell['messages']``, adds an entry to ``cell['inputs']`` for each
        name that is used but neither declared by the code nor a builtin,
        and, when the last statement is an assignment to a plain name,
        sets ``cell['outputs']`` to that name.

        :param cell: The cell to compile (normalised first by ``Context.compile``)
        :returns: A compiled ``cell``
        """
        cell = Context.compile(self, cell)
        try:
            # Ensure this is a Python cell
            if cell['lang'] and cell['lang'] != 'py':
                raise RuntimeError('Cell code must be Python code')
            cell['lang'] = 'py'

            # If the cell's code is empty, just return
            code = cell['code'].strip()
            if code == '':
                return cell

            # Parse the code and catch any syntax errors
            try:
                tree = ast.parse(code, mode='exec')
            except SyntaxError as exc:
                cell['messages'].append({
                    'type': 'error',
                    'message': str(exc),
                    'line': getattr(exc, 'lineno', 0),
                    'column': getattr(exc, 'offset', 0)
                })
                return cell

            # Walk the AST to determine dependencies
            ast_visitor = CompileAstVisitor(tree)
            known = [item.get('name') for item in cell['inputs']]
            for name in ast_visitor.used:
                if name in ast_visitor.declared or name in GLOBALS or name in known:
                    continue
                cell['inputs'].append({
                    'name': name
                })
                known.append(name)

            # Determine the output from the last statement in the AST.
            # The body is empty when the code consists only of comments, so
            # guard the index instead of reporting a spurious IndexError.
            if tree.body:
                last = tree.body[-1]
                if isinstance(last, ast.Assign):
                    for target in last.targets:
                        if isinstance(target, ast.Name):
                            cell['outputs'] = [{
                                'name': target.id
                            }]
        except Exception as exc:
            cell['messages'].append({
                'type': 'error',
                'message': str(exc),
                'trace': self._get_trace(exc)
            })
        return cell

    def compile_func(self, func=None, file=None, dir=None):
        """
        Compile a ``func`` operation

        Parses the source of the function (either a string or a file
        path) to extract its ``description``, ``param``, ``return`` etc
        properties.

            >>> context.compile_func({
            >>>     'type': 'func',
            >>>     'source': 'def hello(who): return "Hello Hello %s!" % who'
            >>> })
            {
                'type': 'func',
                'name': 'hello',
                'source': 'def hello(who): return "Hello Hello %s!" % who',
                'params': [{
                    'name': 'who'
                }],
                ...
            }

        Parameters
        ----------
        func : dict or string or callable
            A ``func`` operation. If a string is supplied then an operation
            object is created with the ``source`` property set to
            the string. If a callable is supplied its source is obtained
            with ``inspect``.
        file : string
            Path of a file containing the function source; used only when
            ``func`` is ``None``.
        dir : string
            Directory whose ``*.py`` files are each compiled; used only when
            ``func`` and ``file`` are not given.

        Returns
        -------
        func : dict
            The compiled ``func`` operation
        messages : list
            A list of messages (e.g errors)

        When ``dir`` is used the return value is instead the number of
        files compiled.

        Raises
        ------
        RuntimeError
            If nothing was provided to compile, or the operation has no
            source, or the source defines no name.
        """
        messages = []
        if func is None:
            if file:
                with open(file) as file_obj:
                    source = file_obj.read()
                return self.compile_func({
                    'type': 'func',
                    'source': source
                })
            elif dir:
                count = 0
                for path in glob.glob(dir + '/*.py'):
                    self.compile_func(file=path)
                    count += 1
                return count
            else:
                raise RuntimeError('No function provided to compile!')
        elif callable(func):
            func_obj = func
            func = {
                'type': 'func',
                # Lines from getsourcelines already end with a newline, so
                # join on '' (joining on '\n' doubled every line break).
                'source': ''.join(inspect.getsourcelines(func_obj)[0]).strip()
            }
            func_name = func_obj.__code__.co_name
        else:
            # If necessary, wrap string arguments into an operation dict
            if isinstance(func, six.string_types + (bytes,)):
                func = {
                    'type': 'func',
                    'source': func
                }

            # Obtain source code
            source = func.get('source')
            if not source:
                raise RuntimeError('Not function source code specified in `source` or `file` properties')

            # Execute the source in an empty namespace to obtain the function
            # object.
            # NOTE(review): this runs arbitrary code; only safe for trusted source.
            scope = {}
            six.exec_(source, scope)

            # Get name of function
            names = [key for key in scope.keys() if not key.startswith('__')]
            if not names:
                raise RuntimeError('No function defined in function source')
            if len(names) > 1:
                messages.append({
                    'type': 'warning',
                    'message': 'More than one function or object defining in function source: %s' % names
                })
            func_name = names[-1]
            func_obj = scope[func_name]

        # Extract parameter specifications
        params = self._build_params(func_obj)

        # Get docstring and parse it for extra parameter specs
        docstring = func_obj.__doc__
        if docstring:
            parsed = self._parse_docstring(docstring)
            if len(parsed['summary']):
                func.update({'summary': parsed['summary']})
            if len(parsed['description']):
                func.update({'description': parsed['description']})
            for name, spec in parsed['params'].items():
                for param in params:
                    if param['name'] == name:
                        param.update(spec)
                        break
            if len(parsed['returns']):
                func.update({'returns': parsed['returns']})

        func.update({
            'name': func_name,
            'params': params
        })

        # Register the function in scope
        self._scope[func_name] = func_obj

        return func, messages

    @staticmethod
    def _build_params(func_obj):
        """Build the list of parameter dicts from a callable's signature."""
        if hasattr(inspect, 'getfullargspec'):
            # `getargspec` was removed in Python 3.11 and rejected functions
            # with annotations or keyword-only arguments.
            spec = inspect.getfullargspec(func_obj)
            positional, varargs, varkw = list(spec.args), spec.varargs, spec.varkw
            defaults = spec.defaults or ()
            kwonly = list(spec.kwonlyargs)
            kwonly_defaults = spec.kwonlydefaults or {}
        else:
            spec = inspect.getargspec(func_obj)
            positional, varargs, varkw = list(spec.args), spec.varargs, spec.keywords
            defaults = spec.defaults or ()
            kwonly, kwonly_defaults = [], {}

        def with_default(param, default):
            param['default'] = {
                'type': Context.type(default),
                'data': default
            }
            return param

        params = []
        # Defaults align with the *last* len(defaults) positional arguments.
        first_default = len(positional) - len(defaults)
        for index, name in enumerate(positional):
            param = {'name': name}
            if index >= first_default:
                with_default(param, defaults[index - first_default])
            params.append(param)
        if varargs:
            params.append({'name': varargs, 'repeat': True})
        for name in kwonly:
            param = {'name': name}
            if name in kwonly_defaults:
                with_default(param, kwonly_defaults[name])
            params.append(param)
        if varkw:
            params.append({'name': varkw, 'extend': True})
        return params

    @classmethod
    def _parse_docstring(cls, docstring):
        """Parse a docstring into summary, description, params and returns."""
        docstring = trim_docstring(docstring)
        config = sphinxcontrib.napoleon.Config(napoleon_use_param=True, napoleon_use_rtype=True)
        # Run through both converters so NumPy and Google style sections
        # are each rewritten as `:param ...:` style field lines.
        lines = sphinxcontrib.napoleon.docstring.NumpyDocstring(docstring, config, what='function').lines()
        lines = sphinxcontrib.napoleon.docstring.GoogleDocstring(lines, config, what='function').lines()

        params = {}
        returns = {}
        prose = []
        for line in lines[1:]:
            match = cls._DOCSTRING_FIELD.match(line)
            if not match:
                prose.append(line)
                continue
            kind = match.group(1)
            name = match.group(3)
            desc = match.group(4).strip()
            if kind == 'param':
                params.setdefault(name, {})['description'] = desc
            elif kind == 'type':
                params.setdefault(name, {})['type'] = desc
            elif kind == 'returns':
                returns['description'] = desc
            elif kind == 'rtype':
                returns['type'] = desc

        return {
            'summary': lines[0] if lines else '',
            'description': '\n'.join(prose).strip(),
            'params': params,
            'returns': returns
        }

    def execute(self, cell):
        """
        Compile and then execute a cell.

        Each input must have a ``name`` and a ``value``; the value used is
        the one in ``self._variables`` if the name is present there,
        otherwise the unpacked ``value``. The code is run with those inputs
        as local variables. The value of the last line (or, failing that,
        of the variable assigned by the last statement) is packed into
        ``cell['outputs'][0]['value']``. Errors are appended to
        ``cell['messages']`` rather than raised.

        :param cell: The cell to execute
        :returns: The executed ``cell``
        """
        cell = self.compile(cell)
        try:
            variables = {}
            for item in cell['inputs']:
                name = item.get('name')
                value = item.get('value')
                if not name:
                    raise RuntimeError('Name is required for input')
                if not value:
                    raise RuntimeError('Value is required for input "%s"' % name)
                if name in self._variables:
                    value = self._variables[name]
                else:
                    value = unpack(value)
                variables[name] = value

            code = cell['code'].strip()
            # NOTE(review): exec/eval of cell code is this context's purpose,
            # but it must only ever be given trusted code.
            try:
                six.exec_(code, {}, variables)
            except RuntimeError as exc:
                cell['messages'].append(self._runtime_error(exc))

            # Evaluate the last line and if no error then make the value output
            # This is inefficient in the sense that the last line is evaluated twice
            # but alternative approaches would appear to require some code parsing
            last = code.split('\n')[-1]
            try:
                output = eval(last, {}, variables)
            except Exception:
                # Best effort: the last line is often a statement (or part of
                # a multi-line one) and so cannot be evaluated.
                output = undefined

            if output is undefined and len(cell['outputs']):
                # If the last statement was an assignment then grab that variable
                name = cell['outputs'][0].get('name')
                if name:
                    output = variables.get(name)

            if output is not undefined:
                packed = pack(output)
                if len(cell['outputs']):
                    cell['outputs'][0]['value'] = packed
                else:
                    cell['outputs'] = [{
                        'value': packed
                    }]

            # Clear the current matplot figure (if any)
            # after any plot has been packed as an output
            plt.clf()
        except Exception as exc:
            cell['messages'].append({
                'type': 'error',
                'message': str(exc),
                'trace': self._get_trace(exc)
            })
        return cell

    def _runtime_error(self, exc=None):
        """
        Build an error message dict for an exception raised by cell code.

        :param exc: The exception; optional, defaults to the exception
            currently being handled (``sys.exc_info()``).
        :returns: A dict with ``type``, ``line``, ``column`` and ``message``
        """
        exc_type, exc_value, exc_traceback = sys.exc_info()
        if exc is not None:
            exc_type, exc_value = type(exc), exc
            exc_traceback = getattr(exc, '__traceback__', exc_traceback)

        # Extract traceback and for compatibility with >=Py3.5 ensure converted to tuple
        frames = [tuple(frame) for frame in traceback.extract_tb(exc_traceback)]
        # Remove first frame: the line in `execute` that runs the code
        frames = frames[1:]
        # If this is Python 2 then also need to remove the frames for the six.exec_ function
        if len(frames) and frames[0][0].endswith(('/six.py', '\\six.py')):
            frames = frames[2:]

        # Frames are (filename, line number, function name, text); the line
        # number of the last remaining frame is where the error occurred.
        # No frames remain when there was a syntax error.
        line = frames[-1][1] if frames else 0

        # Avoid the private `traceback._some_str`, which is not available
        # in all Python versions.
        try:
            text = str(exc_value)
        except Exception:
            text = '<unprintable %s object>' % type(exc_value).__name__

        return {
            'type': 'error',
            'line': line,
            'column': 0,
            'message': exc_type.__name__ + ': ' + text
        }

    def _get_trace(self, exc):
        """
        Return the formatted traceback of the exception currently being
        handled (``exc`` itself is not used).
        """
        return traceback.format_exc()
# Class-level `spec` metadata: the context's name and a `client` identifier.
# NOTE(review): presumably read by the host to describe this context - confirm.
PythonContext.spec = dict(
    name='PythonContext',
    client='ContextHttpClient',
)
class CompileAstVisitor(ast.NodeVisitor):
    """
    Collect the names that a piece of code declares and the names it uses.

    After construction ``declared`` lists names bound by the code (imports,
    assignments, loop/with/comprehension targets, function and class
    definitions, ...) and ``used`` lists names that are read. Both are
    lists in visiting order and may contain duplicates. Function and class
    bodies are not visited.

    Use ast.dump to find out about structure of node types
      > import ast
      > ast.dump(ast.parse('x=1').body[0])
      "Assign(targets=[Name(id='x', ctx=Store())], value=Num(n=1))"
    """

    def __init__(self, tree):
        self.declared = []
        self.used = []
        self.visit(tree)

    def aliases(self, aliases):
        """Record the names bound by an import statement's aliases."""
        for alias in aliases:
            if alias.asname:
                self.declared.append(alias.asname)
            else:
                # `import a.b` binds the top-level name `a`, not `a.b`
                self.declared.append(alias.name.split('.')[0])

    def visit_Import(self, node):
        self.aliases(node.names)

    def visit_ImportFrom(self, node):
        self.aliases(node.names)

    def visit_Global(self, node):
        for name in node.names:
            self.declared.append(name)

    def visit_FunctionDef(self, node):
        # Do not visit function body
        self.declared.append(node.name)

    # `async def` declares a name in exactly the same way
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node):
        # Do not visit class body
        self.declared.append(node.name)

    def visit_AugAssign(self, node):
        # `x += 1` reads `x` before rebinding it, so `x` must already
        # exist: treat a plain-name target as used, not declared.
        if isinstance(node.target, ast.Name):
            self.used.append(node.target.id)
        else:
            self.visit(node.target)
        self.visit(node.value)

    def visit_ExceptHandler(self, node):
        # On Python 3 the `as` name is a plain string rather than a Name node
        if isinstance(node.name, str):
            self.declared.append(node.name)
        self.generic_visit(node)

    def visit_arg(self, node):
        # Only reached for lambda arguments (function bodies and
        # signatures are not visited)
        self.declared.append(node.arg)

    def visit_Name(self, node):
        # A name being read is a use; a name in any other context
        # (assignment, deletion, loop/with/comprehension target) is bound
        # by this code and so is not something the code depends on.
        if isinstance(node.ctx, ast.Load):
            self.used.append(node.id)
        else:
            self.declared.append(node.id)
def trim_docstring(docstring):
    """
    Normalise a docstring's indentation and surrounding blank lines.

    Implements the algorithm from https://www.python.org/dev/peps/pep-0257/

    :param docstring: The raw docstring (may be ``None`` or empty)
    :returns: The trimmed docstring as a single string
    """
    if not docstring:
        return ''

    # Tabs become spaces (normal Python rules) before measuring indentation
    rows = docstring.expandtabs().splitlines()
    head, tail = rows[0], rows[1:]

    # Indentation of every non-blank line after the first
    margins = [len(row) - len(row.lstrip()) for row in tail if row.lstrip()]

    # The first line is special: it is simply stripped
    cleaned = [head.strip()]
    if margins:
        margin = min(margins)
        cleaned.extend(row[margin:].rstrip() for row in tail)

    # Drop blank lines from both ends
    start, stop = 0, len(cleaned)
    while stop > start and not cleaned[stop - 1]:
        stop -= 1
    while start < stop and not cleaned[start]:
        start += 1

    return '\n'.join(cleaned[start:stop])