# Source code for stencila.sqlite_context

import os
import re
import sqlite3
import string

import pandas

from .value import pack, unpack

# Sentinel marking that no output value was produced. It is compared by
# identity (``is``) in ``SqliteContext.runCode``, which reports ``None`` as the
# output when the sentinel is still in place.
undefined = object()


class SqliteContext(object):
    """Context that executes SQL code against a SQLite database.

    The database is either the first (alphabetically) SQLite file found in
    the directory given at construction time, or an in-memory database when
    no directory is given or it contains no matching file.
    """

    def __init__(self, dir=None):
        """Open the SQLite connection used by this context.

        Args:
            dir: Optional directory to search for a database file whose name
                ends in ``.sqlite``, ``.sqlite3``, ``.db`` or ``.db3``. When
                falsy, or when no such file exists, an in-memory database is
                used instead.
        """
        self._dir = dir
        db = ':memory:'
        if dir:
            # ``os.listdir`` returns entries in arbitrary order; sort so the
            # same database is picked every time when several are present.
            dbs = sorted(
                name for name in os.listdir(dir)
                if re.match(r'^.*\.(sqlite(3?)|db(3?))$', name)
            )
            if dbs:
                db = os.path.join(dir, dbs[0])
        self._connection = sqlite3.connect(db)

    def runCode(self, code):
        """Execute a SQL statement and report its result.

        Statements whose first keyword is ``SELECT`` (case-insensitive) are
        read into a ``pandas.DataFrame`` and returned as output; any other
        statement is executed for its side effects only.

        Args:
            code: The SQL text to execute.

        Returns:
            A dict with two keys:

            * ``'errors'``: ``None`` on success, otherwise a one-element list
              holding a dict with ``'line'`` (0), ``'column'`` (0) and
              ``'message'`` (the exception text).
            * ``'output'``: ``None`` when no output was produced, otherwise
              the query result passed through ``pack``.
        """
        errors = None
        output = undefined
        try:
            # Match on the keyword boundary rather than a literal trailing
            # space so that "SELECT\n  a FROM t" or "SELECT(1)" are still
            # recognised as queries.
            if re.match(r'\s*SELECT\b', code, re.IGNORECASE):
                output = pandas.read_sql_query(code, self._connection)
            else:
                self._connection.execute(code)
        except Exception as exc:
            # Any failure is reported to the caller rather than raised; no
            # position information is available so line/column are zero.
            errors = [{
                'line': 0,
                'column': 0,
                'message': str(exc)
            }]
        return {
            'errors': errors,
            'output': None if output is undefined else pack(output)
        }

    def callCode(self, code, inputs=None):
        """Execute SQL code after making the given inputs available to it.

        Each input is first passed through ``unpack``. ``pandas.DataFrame``
        inputs are written to the database as tables named after the input
        (replacing any existing table of that name). All other inputs become
        ``string.Template`` substitution variables (``$name`` / ``${name}``)
        in ``code``; lists are converted to tuples before substitution.

        Args:
            code: The SQL text, optionally containing ``$name`` placeholders.
            inputs: Optional mapping of input name to packed value. Defaults
                to no inputs.

        Returns:
            The result dict of :meth:`runCode` for the substituted code.

        Raises:
            KeyError: If ``code`` references a placeholder that has no
                corresponding non-table input.
            ValueError: If ``code`` contains an invalid placeholder.
        """
        variables = {}
        for name, package in (inputs or {}).items():
            value = unpack(package)
            if isinstance(value, pandas.DataFrame):
                # The ``flavor`` keyword was deprecated and later removed
                # from ``DataFrame.to_sql``; a sqlite3 connection is handled
                # without it.
                value.to_sql(name, self._connection,
                             if_exists='replace', index=False)
            elif isinstance(value, list):
                variables[name] = tuple(value)
            else:
                variables[name] = value
        # Non-table inputs are available as text substitution variables.
        # NOTE: this is plain text substitution with no SQL escaping.
        code_ = string.Template(code).substitute(variables)
        return self.runCode(code_)
# Static descriptor attached to the class: its name, the name of its base
# type, and the aliases it is also known by.
SqliteContext.spec = {
    'name': 'SqliteContext',
    'base': 'Context',
    'aliases': [
        'sql',
        'sqlite',
    ],
}