1167 lines
42 KiB
Python
Executable file
1167 lines
42 KiB
Python
Executable file
#! /usr/bin/python3
|
|
# Since peter uses emacs, remind it that this is a -*- python -*- file.
|
|
|
|
import sys
|
|
sys.path.insert(0, sys.path[0] + "/lib")
|
|
|
|
import argparse
|
|
import collections
|
|
import errno
|
|
import fcntl
|
|
import importlib
|
|
import locale
|
|
import os
|
|
import os.path
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
import urwid
|
|
|
|
import newfol.database
|
|
import newfol.exception
|
|
import newfol.filemanip as filemanip
|
|
import newfol.version
|
|
|
|
from newfol.database import DatabaseData, DatabaseVersion
|
|
|
|
__version__ = newfol.version.version()
|
|
|
|
logfd = None
|
|
|
|
def log(*args):
    """Write a debugging line to the newfol debug log.

    Does nothing unless NEWFOL_DEBUG is present in the environment.  The log
    file is opened on first use and kept in the module-level ``logfd``.
    """
    global logfd
    if "NEWFOL_DEBUG" in os.environ:
        if logfd is None:
            logfd = open("/tmp/newfol_debug.log", "w")
        # Flush on every call so the log is current even if we crash.
        print(*args, file=logfd, flush=True)
|
|
|
|
class KeyboardShortcuts(dict):
    """Dictionary mapping urwid key names to newfol command names."""

    # Default bindings, installed in this order on top of any constructor
    # arguments.
    _DEFAULTS = (
        ("Q", "quit"),
        ("q", "quit"),
        ("f1", "previous"),
        ("f2", "sync-database"),
        ("f3", "add"),
        ("f4", "search"),
        ("f5", "next"),
        ("f6", "next-secondary"),
        ("ctrl f6", "next-tertiary"),
        ("f7", "menu"),
        ("ctrl f5", "invert-selection"),
        ("ctrl f", "browse-all"),
        ("ctrl b", "browse"),
        ("ctrl t", "cheat-sheet"),
        ("ctrl x", "about"),
        ("enter", "select"),
        ("meta h", "backspace"),
        ("meta i", "up"),
        ("meta n", "down"),
        ("meta d", "right"),
        ("meta k", "left"),
        ("meta g", "delete"),
        ("meta y", "remove"),
        ("tab", "next-field"),
        ("meta f", "next-field"),
        ("meta j", "previous-field"),
        ("shift tab", "previous-field"),
        ("meta u", "home"),
        ("meta ;", "end"),
    )

    def __init__(self, *args, **kwargs):
        """Create the table from dict-style arguments plus the defaults.

        Every binding of the form "<modifier> <lowercase letter>" also gets
        a twin with the letter in upper case, bound to the same command.
        """
        super().__init__(*args, **kwargs)
        self.update(self._DEFAULTS)
        # Snapshot the keys: the loop adds entries while it runs.
        for key in list(self):
            mobj = re.search(r"^(.*\w+ )([a-z])$", key)
            if mobj is not None:
                self[mobj.group(1) + mobj.group(2).upper()] = self[key]

    def shortcut(self, name):
        """Return a display form of the first key bound to command *name*.

        Single-character keys are returned unchanged; other keys are
        title-cased with spaces replaced by hyphens (e.g. "Ctrl-F").

        Raises:
            KeyError: no key is bound to *name*; the name is the argument.
        """
        for key, command in self.items():
            if command == name:
                return key if len(key) == 1 else key.title().replace(" ", "-")
        raise KeyError(name)
|
|
|
|
class SelectableListBox(urwid.ListBox):
    """List box that reports navigation and selection to a callback object."""

    def __init__(self, body, obj=None):
        """Wrap *body*; *obj*, if given, receives ``callback(type_, arg)``."""
        super().__init__(body)
        self.cb_obj = obj

    def _do_cb(self, type_, arg):
        """Forward an event to the callback object, if one was supplied."""
        if self.cb_obj is not None:
            self.cb_obj.callback(type_, arg)

    def keypress(self, size, key):
        """Handle *key* after translating it through the shortcut table.

        "up"/"down" notify the callback and move the focus by one row when
        that row exists; "select"/"remove" are reported together with the
        focused widget; any other key goes to urwid.ListBox.  Returns the
        (possibly translated) key, or the base class's return value.
        """
        try:
            key = DatabaseData().keys[key]
        except Exception:
            # Not a registered shortcut: fall through with the raw key.
            pass
        if key in ("up", "down"):
            self._do_cb("key", key)
            position = self.get_focus()[1]
            # An empty list has no focus position, so there is nothing to
            # move; without this guard "None + 1" raised TypeError.
            if position is not None:
                target = position + (-1 if key == "up" else 1)
                if 0 <= target < len(self.body):
                    self.set_focus(target)
        elif key in ("select", "remove"):
            self._do_cb(key, self.get_focus()[0])
        else:
            key = super().keypress(size, key)
        return key
|
|
|
|
class Tabbable:
    """Mixin for widgets that take part in tab navigation inside a grid."""

    def set_grid(self, grid):
        """Remember the grid whose cells this widget tabs between."""
        self.grid = grid

    def select_item(self):
        """Hook invoked when focus arrives by tabbing; a no-op here."""
        pass

    def _handle_tab(self, key):
        """Move the grid focus to the next or previous non-empty cell.

        *key* is translated through the shortcut table when possible.  For
        "next-field"/"previous-field" the search starts from the cell that
        contains this widget (cell 0 if none does), wraps around at either
        end, and stops at the first cell with a non-empty widget list; that
        cell gets the focus, its second widget's select_item() is called
        and the key is returned.  Any other key yields None.
        """
        try:
            key = DatabaseData().keys[key]
        except:
            pass
        if key not in ("next-field", "previous-field"):
            return None
        cells = self.grid.cells
        current = next(
            (index for index, cell in enumerate(cells)
             if self in cell.widget_list),
            0)
        step = 1 if key == "next-field" else -1
        while True:
            current += step
            if current >= len(cells):
                current = 0
            elif current < 0:
                current = len(cells) - 1
            target = cells[current]
            if len(target.widget_list) > 0:
                self.grid.set_focus(target)
                target.widget_list[1].select_item()
                return key
|
|
|
|
class FancyCheckBox(urwid.CheckBox, Tabbable):
    """Check box that toggles on any single-character key and can be tabbed."""

    def select_item(self):
        """Toggle the state twice, which leaves it unchanged.

        NOTE(review): presumably done to refresh the widget when focus
        arrives -- confirm.
        """
        for _ in range(2):
            self.toggle_state()

    def keypress(self, size, key):
        """Toggle on one-character keys, tab on field keys, else defer.

        Returns None after a toggle, otherwise whatever the tab handler or
        urwid.CheckBox returns.
        """
        try:
            key = DatabaseData().keys[key]
        except:
            pass
        if len(key) == 1:
            self.toggle_state()
            return None
        if key in ("next-field", "previous-field"):
            return self._handle_tab(key)
        return super().keypress(size, key)
|
|
|
|
class FancyRadioButton(urwid.RadioButton, Tabbable):
    """Radio button that toggles on any one-character key and can be tabbed."""

    def select_item(self):
        """Nothing to do when focus arrives by tabbing."""
        pass

    def keypress(self, size, key):
        """Toggle on one-character keys, tab on field keys, else defer.

        Returns None after a toggle, otherwise whatever the tab handler or
        urwid.RadioButton returns.
        """
        try:
            key = DatabaseData().keys[key]
        except:
            pass
        if len(key) == 1:
            self.toggle_state()
            return None
        if key in ("next-field", "previous-field"):
            return self._handle_tab(key)
        return super().keypress(size, key)
|
|
|
|
class RecordEditBox(urwid.Edit, Tabbable):
    """Edit box for one record field, with tab navigation between fields."""

    def select_item(self):
        """Put the cursor at the start of the text when tabbed into."""
        self.set_edit_pos(0)

    def get_state(self):
        """Return True when the box contains any text."""
        return bool(self.get_edit_text())

    def toggle_state(self):
        """Clear a non-empty box, or fill an empty one with "choisi"."""
        self.set_edit_text("" if self.get_state() else "choisi")

    def keypress(self, size, key):
        """Tab between fields on field keys; otherwise edit as usual.

        Only multi-character keys other than "enter" are translated through
        the shortcut table, so ordinary typing is never remapped.
        """
        try:
            if len(key) > 1 and key != "enter":
                key = DatabaseData().keys[key]
        except:
            pass
        if key in ("next-field", "previous-field"):
            return self._handle_tab(key)
        return super().keypress(size, key)
|
|
|
|
class RecordFormatter:
    """Render selected record fields as aligned, ": "-separated rows."""

    def __init__(self, recs, fields):
        """Store the records and the field selectors to display.

        Each selector is either the string "table" (the record's table
        name) or an index into the record's ``fields`` list.
        """
        self._recs = recs
        self._fields = fields

    @staticmethod
    def longest(x):
        """Return the maximum value length per column of the rows in *x*.

        The column count is taken from the first row.  A value of None
        counts as length 0.  An empty *x* gives an empty list.
        """
        if len(x) == 0:
            return []
        res = [0] * len(x[0])
        for row in x:
            for i, value in enumerate(row):
                res[i] = max(res[i], 0 if value is None else len(value))
        return res

    def format(self):
        """Return one formatted string per record.

        Every column is left-aligned and padded to its widest value; None
        values are rendered as empty strings, matching how the rest of the
        interface displays unset fields.
        """
        rows = []
        for rec in self._recs:
            row = []
            for field in self._fields:
                value = rec.table if field == "table" else rec.fields[field]
                row.append("" if value is None else value)
            rows.append(row)
        widths = self.longest(rows)
        return [
            ": ".join("{0:<{1}}".format(value, width)
                      for value, width in zip(row, widths))
            for row in rows
        ]
|
|
|
|
class View:
    """Base class for screens; every hook is a no-op by default."""

    def render(self, loop):
        """Draw the view for the first time on *loop* (no-op here)."""

    def rerender(self, loop):
        """Draw the view again on *loop* (no-op here)."""

    def callback(self, type_, obj):
        """React to an event of kind *type_* about *obj* (no-op here)."""
|
|
|
|
class IntroView(View):
    """Opening screen showing the version and the main shortcut keys."""

    def render(self, loop):
        """Build the intro widget, show it and push this view on the stack."""
        self.loop = loop
        dbd = DatabaseData()
        # Not displayed anywhere; kept so the schema is still consulted.
        modename = ("enabled." if dbd.schema().execution_allowed()
                    else "disabled.")
        keys = dbd.keys
        lines = [
            "newfol " + __version__,
            "Press %s to quit at any time." % keys.shortcut("quit"),
            "Press %s to get started." % keys.shortcut("menu"),
            "Press %s to see database information." % keys.shortcut("about"),
        ]
        rows = []
        for line in lines:
            centered = urwid.Text(("text", line), align="center")
            rows.append(urwid.AttrMap(centered, "bg"))
        self.widget = urwid.AttrMap(urwid.Filler(urwid.Pile(rows)), "bg")
        loop.widget = self.widget
        dbd.views.append(self)

    def rerender(self, loop):
        """Show the widget built by render() again."""
        loop.widget = self.widget
|
|
|
|
class StandardView(View):
    """Base for views whose body is shown in a frame under a top bar.

    Subclasses provide ``_render_standard(loop)`` and may provide
    ``_rerender_standard(loop)``; each returns the body widget or None.
    """

    def _do_render(self, loop, func):
        """Call *func* for a body and display it framed.

        Returns the frame widget, or None when *func* produced no body (in
        which case nothing is displayed).
        """
        self.loop = loop
        body = func(loop)
        if body is None:
            return None
        header = urwid.AttrMap(urwid.Pile([]), "top")
        self.widget = urwid.Frame(body, header)
        loop.widget = self.widget
        return self.widget

    def render(self, loop):
        """Render the view and, if it displayed, push it on the view stack."""
        shown = self._do_render(loop, self._render_standard)
        if shown is not None:
            DatabaseData().views.append(self)

    def rerender(self, loop):
        """Re-display the view, rebuilding it when the subclass supports it."""
        if not hasattr(self, "_rerender_standard"):
            loop.widget = self.widget
            return None
        return self._do_render(loop, self._rerender_standard)
|
|
|
|
class AboutView(StandardView):
    """Screen listing the key bindings and database/format information."""

    @staticmethod
    def _display_option(name, *values):
        """Format one option for display.

        A boolean first value gives "+name" or "-name"; anything else gives
        "+name" followed by each value in parentheses.
        """
        first = values[0]
        if isinstance(first, bool):
            sign = "+" if first else "-"
            return sign + name
        return "".join(["+", name] + ["(" + str(val) + ")" for val in values])

    def _render_standard(self, loop):
        """Build and return the body widget for the about screen."""
        self.loop = loop
        commands = ["quit", "previous", "sync-database", "add", "search",
                    "next", "next-secondary", "next-tertiary", "menu",
                    "about", "browse"]
        shortcuts = tuple(DatabaseData().keys.shortcut(cmd)
                          for cmd in commands)
        try:
            recver = DatabaseData().records[0].version()
        except IndexError:
            # No records yet, so there is no record version to report.
            recver = '?'
        exe_ok = self._display_option(
            "exe_ok", DatabaseData().schema().execution_allowed())
        homedir = self._display_option("homedir", DatabaseData().homedir)
        serialization = self._display_option(
            "serialization",
            DatabaseData().version.serialization(),
            DatabaseData().version.serialization_version())
        rec_ver = self._display_option(
            "rec_ver",
            DatabaseData().version.record_version(),
            recver,
            filemanip.Record([]).version())
        msgs = [
            "newfol " + __version__,
            "Keys: %s to quit, %s: previous view, %s: write to disk," %
            shortcuts[0:3],
            "%s: add record, %s: search, %s: next, %s: secondary next," %
            shortcuts[3:7],
            "%s: tertiary next, %s: menu, %s: this screen," %
            shortcuts[7:10],
            "%s: browse by table" %
            shortcuts[10:],
            "",
            " ".join([exe_ok, homedir]),
            " ".join([serialization, rec_ver]),
        ]
        rows = []
        for msg in msgs:
            centered = urwid.Text(("text", msg), align="center")
            rows.append(urwid.AttrMap(centered, "bg"))
        return urwid.AttrMap(urwid.Filler(urwid.Pile(rows)), "bg")
|
|
|
|
class RecordView(StandardView):
    """Base view showing one record as a grid of titled edit cells.

    Cell 0 holds the table name; cell i (i >= 1) holds field i-1.
    """

    def __init__(self, button, record=None):
        """Remember the OK-button label and the record to show.

        Without a record, a new one with every field set to None is made.
        """
        dbd = DatabaseData()
        self.rec = (filemanip.Record([None] * dbd.schema().nfields())
                    if record is None else record)
        self.button_name = button

    def _compute_titles_from_table(self, table):
        """Return the cell titles for *table*, one per cell.

        Field titles come from the schema mapping for the table; fields
        without a description, or tables without a mapping, get "".
        """
        dbd = DatabaseData()
        titles = []
        for index in range(dbd.schema().nfields() + 1):
            if index == 0:
                titles.append(" sub.dtb name")
                continue
            mapping = dbd.schema().mapping()
            if table in mapping:
                titles.append(mapping[table].desc[index - 1] or "")
            else:
                titles.append("")
        return titles

    def _set_titles_for_grid(self, table):
        """Replace every cell's heading with the title for *table*."""
        for index, title in enumerate(self._compute_titles_from_table(table)):
            heading = urwid.AttrMap(urwid.Text(title), "description")
            self.cells[index].widget_list[0] = heading

    def _map_cells(self):
        """Arrange the cells per the configured layout.

        A None entry in the layout becomes an empty placeholder pile.
        """
        return [urwid.Pile([]) if slot is None else self.cells[slot]
                for slot in DatabaseData().layout]

    def _create_body(self, rec, i):
        """Return the edit widget for cell *i* of *rec*.

        Missing or unreadable values are shown as an empty string.
        """
        try:
            text = (rec.table if i == 0 else rec.fields[i - 1]) or ""
        except:
            text = ""
        return RecordEditBox(edit_text=text, multiline=True)

    def _render_record(self, rec):
        """Build the cells for *rec* and return them laid out in a grid."""
        dbd = DatabaseData()
        self.cells = []
        for index in range(dbd.schema().nfields() + 1):
            heading = urwid.AttrMap(urwid.Text(""), "description")
            editor = self._create_body(rec, index)
            self.cells.append(urwid.Pile([heading, editor]))
        arranged = self._map_cells()
        screen_cols = self.loop.screen.get_cols_rows()[0]
        cell_width = int((screen_cols - 2) / dbd.columns)
        self.grid = urwid.GridFlow(arranged, cell_width, 1, 1, "left")
        # Each editor needs the grid so it can tab to its neighbours.
        for cell in self.cells:
            cell.widget_list[1].set_grid(self.grid)
        self._set_titles_for_grid(rec.table)
        return self.grid

    def callback(self, type_, obj):
        """Go back to the previous view on "cancel"; ignore everything else."""
        if type_ == "cancel":
            render_previous_view(self.loop)

    def _setup_button(self, button):
        """Wire *button* so a click calls this view's callback.

        The button labelled "Cancel" reports "cancel"; any other reports
        "select".
        """
        button.cb_obj = self

        def callback_glue(button):
            if button.label == "Cancel":
                kind = "cancel"
            else:
                kind = "select"
            button.cb_obj.callback(kind, button)

        urwid.connect_signal(button, 'click', callback_glue)

    def _render_buttons(self):
        """Return a row with the OK button and a Cancel button."""
        buttons = [urwid.Button(self.button_name), urwid.Button("Cancel")]
        for button in buttons:
            self._setup_button(button)
        return urwid.Columns(buttons)

    def _render_standard(self, loop):
        """Return the record grid wrapped so it can serve as a frame body."""
        self.loop = loop
        # We need a box widget for the frame, but GridFlow is a flow widget.
        grid = self._render_record(self.rec)
        return urwid.Filler(urwid.Pile([grid]), valign="top")
|
|
|
|
class DisplayRecordView(RecordView):
    """View for editing an existing record or adding a new one."""

    def __init__(self, record=None):
        """Show *record*, or a blank record when none is given."""
        super().__init__("Commit", record)

    def _build_record(self):
        """Return a new record built from the current cell contents."""
        texts = [cell.widget_list[1].get_edit_text() for cell in self.cells]
        built = filemanip.Record(texts[1:])
        built.table = texts[0]
        return built

    def _autofill(self, table):
        """Fill fields from the schema's default expressions for *table*.

        Returns False when execution is not allowed or the table has no
        mapping.  Expressions that raise are reported in a message box and
        blanked in the mapping so they are not tried again.

        NOTE: the expressions come from the schema and are run with eval(),
        which is why this is gated on execution_allowed().  The local names
        here are visible to those expressions, so do not rename them.
        """
        dbd = DatabaseData()
        if not dbd.schema().execution_allowed():
            return False
        if table not in dbd.schema().mapping():
            return False
        mapping = dbd.schema().mapping()[table].get_mapping_onto("internal",
                                                                 "default")
        for i in range(0, len(mapping)):
            if mapping[i] == "" or mapping[i] is None:
                continue
            try:
                val = eval(mapping[i])
                if val is None:
                    continue
            except Exception:
                mbox = MessageBox(("{0} is not a valid " +
                                   "expression.").format(mapping[i]))
                mbox.render(self.loop)
                mapping[i] = None
                continue
            self.cells[i+1].widget_list[1].set_edit_text(str(val))

    def callback(self, type_, obj):
        """Commit on "select"/"next", retitle on "change", else defer.

        Committing updates the shown record in place when it is already in
        the database, appends the edited copy otherwise, and then returns
        to the previous view.
        """
        if type_ in ("select", "next"):
            dbd = DatabaseData()
            edited = self._build_record()
            updated = list(dbd.records)
            found = False
            for existing in updated:
                if existing is self.rec:
                    existing.fields[:] = edited.fields[:]
                    existing.table = edited.table
                    found = True
            if not found:
                updated.append(edited)
            dbd.records[:] = updated
            render_previous_view(self.loop)
        elif type_ == "change":
            self._set_titles_for_grid(obj[1])
            # Only autofill a record that was completely blank to begin with.
            if not (any(self.rec.fields) or self.rec.table):
                self._autofill(obj[1])
        else:
            return super().callback(type_, obj)

    def _setup_db_field(self):
        """Report edits of the table-name cell as "change" callbacks."""
        def change(edit, new_text):
            self.callback("change", (edit, new_text))

        table_box = self.cells[0].widget_list[1]
        urwid.connect_signal(table_box, "change", change)

    def _render_standard(self, loop):
        """Render the record grid and hook up the table-name cell."""
        body = super()._render_standard(loop)
        self._setup_db_field()
        return body
|
|
|
|
class SearchRecordView(RecordView):
    """View where each cell holds a regular expression to search with."""

    def __init__(self, record=None):
        """Show *record* (or a blank one) as the search form."""
        super().__init__("Select Display Template", record)

    def _get_fields(self):
        """Return the text of every cell, table-name cell first."""
        return [i.widget_list[1].get_edit_text() for i in self.cells]

    @staticmethod
    def _record_matches(rec, fieldres):
        """Tell whether *rec* satisfies every compiled pattern.

        ``fieldres[0]`` applies to the table name and the rest to the
        fields in order; None entries are not checked.  Unset (None) values
        are searched as empty strings, the way the editing views display
        them, instead of raising TypeError.
        """
        table_re = fieldres[0]
        if table_re is not None and table_re.search(rec.table or "") is None:
            return False
        for field, reobj in zip(rec.fields, fieldres[1:]):
            if reobj is not None and reobj.search(field or "") is None:
                return False
        return True

    def callback(self, type_, obj):
        """Run the search on "select"/"next", retitle on "change", else defer.

        Each non-empty cell is compiled as a case-insensitive regular
        expression.  An invalid expression is reported in a message box and
        the search is abandoned.  Otherwise the matching records are handed
        to the display-template view.
        """
        dbd = DatabaseData()
        if type_ in ("select", "next"):
            fields = self._get_fields()
            fieldres = []
            for f in fields:
                if len(f) == 0:
                    fieldres.append(None)
                    continue
                try:
                    fieldres.append(re.compile(f, re.IGNORECASE))
                except Exception:
                    # re.error for a malformed pattern; anything else the
                    # compiler rejects is reported the same way.
                    mb = MessageBox("'{0}' is not a valid regular expression".format(f))
                    mb.render(self.loop)
                    return
            recs = [rec for rec in dbd.records
                    if self._record_matches(rec, fieldres)]
            view = DisplayTemplateRecordView(fields[0], recs)
            view.render(self.loop)
        elif type_ == "change":
            self._set_titles_for_grid(obj[1])
        else:
            return super().callback(type_, obj)

    def _setup_db_field(self):
        """Report edits of the table-name cell as "change" callbacks."""
        def change(edit, new_text):
            self.callback("change", (edit, new_text))
        urwid.connect_signal(self.cells[0].widget_list[1], "change",
                             change)

    def _render_standard(self, loop):
        """Render the search form and hook up the table-name cell."""
        ret = super()._render_standard(loop)
        self._setup_db_field()
        return ret
|
|
|
|
class DisplayTemplateRecordView(RecordView):
    """View with one check box per field to choose what a result shows."""

    def __init__(self, table, records):
        """Remember the searched table text and the matching records."""
        super().__init__("Search")
        self.records = records
        self.table = table

    def _create_body(self, rec, i):
        """Use an edit box for the table cell and check boxes elsewhere."""
        if i != 0:
            return FancyCheckBox("")
        return super()._create_body(rec, i)

    def _get_fields(self):
        """Return the state of every cell, table-name cell first."""
        return [cell.widget_list[1].get_state() for cell in self.cells]

    def callback(self, type_, obj):
        """Dispatch on the event kind.

        Kinds starting with "select" or "next" move on to the sort-order
        view; "invert-selection" toggles every field check box; "change"
        retitles the grid; anything else goes to the base class.
        """
        if type_.startswith(("select", "next")):
            dbd = DatabaseData()
            chosen = self._get_fields()
            SortingTemplateRecordView(self.table, self.records,
                                      chosen).render(self.loop)
        elif type_ == "invert-selection":
            for cell in self.cells[1:]:
                cell.widget_list[1].toggle_state()
        elif type_ == "change":
            self._set_titles_for_grid(obj[1])
        else:
            return super().callback(type_, obj)

    def _setup_db_field(self):
        """Report edits of the table-name cell as "change" callbacks."""
        def change(edit, new_text):
            self.callback("change", (edit, new_text))

        table_box = self.cells[0].widget_list[1]
        urwid.connect_signal(table_box, "change", change)

    def _setup_button(self, button):
        """Wire *button*: "Cancel" reports "cancel", others "select"."""
        button.cb_obj = self

        def callback_glue(button):
            kind = "cancel" if button.label == "Cancel" else "select"
            button.cb_obj.callback(kind, button)

        urwid.connect_signal(button, 'click', callback_glue)

    def _render_buttons(self):
        """Return a row with "Select Sort Order" and "Cancel" buttons."""
        buttons = [urwid.Button("Select Sort Order"), urwid.Button("Cancel")]
        for button in buttons:
            self._setup_button(button)
        return urwid.Columns(buttons)

    def _render_standard(self, loop):
        """Render the grid and preload the table cell with the table text."""
        body = super()._render_standard(loop)
        self._setup_db_field()
        self.cells[0].widget_list[1].set_edit_text(self.table)
        return body
|
|
|
|
class SortingTemplateRecordView(RecordView):
    """View with one radio button per field to choose the sort key."""

    def __init__(self, table, records, selected):
        """Remember the table text, the records and the display selection."""
        super().__init__("Search")
        self.records = records
        self.table = table
        self.selected = selected
        # Shared group list that ties the radio buttons together.
        self._group = []

    def _create_body(self, rec, i):
        """Use an edit box for the table cell and radio buttons elsewhere."""
        if i != 0:
            return FancyRadioButton(self._group, "", False)
        return super()._create_body(rec, i)

    def _get_fields(self):
        """Return the cell states with slot 0 meaning "sort by table".

        Slot 0 is True exactly when no field radio button is selected.
        """
        states = [cell.widget_list[1].get_state() for cell in self.cells]
        # Same equality-based search for True that list.index performs.
        states[0] = True not in states[1:]
        return states

    def callback(self, type_, obj):
        """Sort and list the records, retitle on "change", else defer.

        For kinds starting with "select" or "next" the records are sorted
        in place, case-insensitively, by the chosen column and shown in a
        SearchListView rendered according to the event kind.
        """
        if type_.startswith(("select", "next")):
            dbd = DatabaseData()
            fieldno = self._get_fields().index(True)

            def lookup(x):
                value = x.table if fieldno == 0 else x.fields[fieldno - 1]
                return str.lower(value or "")

            self.records.sort(key=lookup)
            listing = SearchListView(self.records, self.selected)
            listing.set_render_type(type_)
            listing.render(self.loop)
        elif type_ == "change":
            self._set_titles_for_grid(obj[1])
        else:
            return super().callback(type_, obj)

    def _setup_db_field(self):
        """Report edits of the table-name cell as "change" callbacks."""
        def change(edit, new_text):
            self.callback("change", (edit, new_text))

        table_box = self.cells[0].widget_list[1]
        urwid.connect_signal(table_box, "change", change)

    def _setup_button(self, button):
        """Wire *button* by label.

        "Cancel" reports "cancel", "Search" reports "select" and any other
        label reports "next-secondary".
        """
        button.cb_obj = self

        def callback_glue(button):
            kinds = {"Cancel": "cancel", "Search": "select"}
            kind = kinds.get(button.label, "next-secondary")
            button.cb_obj.callback(kind, button)

        urwid.connect_signal(button, 'click', callback_glue)

    def _render_buttons(self):
        """Return a row with the two search buttons and "Cancel"."""
        buttons = [urwid.Button("Search"),
                   urwid.Button("Search and View in Editor"),
                   urwid.Button("Cancel")]
        for button in buttons:
            self._setup_button(button)
        return urwid.Columns(buttons)

    def _render_standard(self, loop):
        """Render the grid and preload the table cell with the table text."""
        body = super()._render_standard(loop)
        self._setup_db_field()
        self.cells[0].widget_list[1].set_edit_text(self.table)
        return body
|
|
|
|
class ListView(StandardView):
    """Base for views that show a titled, selectable list of items.

    Subclasses are expected to provide ``_rerender_listview()``.
    """

    def __init__(self):
        self._full_rerender = False

    def rerender(self, loop):
        """Refresh the list, then re-display through the standard path."""
        self._rerender_listview()
        return super().rerender(loop)

    def _get_selected_item_position(self):
        """Return the position the list walker reports as focused."""
        _widget, position = self.listwalker.get_focus()
        return position

    def _render_listview(self, title, items, listwalker):
        """Build and return a frame holding the list under a count header."""
        self.items = items
        self.listwalker = listwalker
        self.title = title
        self.listbox = SelectableListBox(listwalker, self)
        caption = urwid.Text(title + " (%d items)" % len(items))
        return urwid.Frame(self.listbox, urwid.AttrMap(caption, "header"))
|
|
|
|
class RecordListView(ListView):
    """List of records, shown in urwid or handed to an external editor.

    The render type is "urwid" (interactive list), "editor-ro" (selected
    fields in a read-only file) or "editor-rw" (records in an editable
    file whose contents are read back).
    """

    def __init__(self):
        self._render_type = "urwid"
        super().__init__()

    def set_render_type(self, t):
        """Choose how to render from a render type or an event kind.

        Unrecognized values select the interactive urwid list.
        """
        if t in ("next-secondary", "secondary", "editor-ro"):
            self._render_type = "editor-ro"
        elif t in ("next-tertiary", "tertiary", "editor-rw"):
            self._render_type = "editor-rw"
        else:
            self._render_type = "urwid"

    def _toggle_selected_record(self):
        """Flip the deleted flag of the focused record and redraw."""
        selected = self._get_selected_item_position()
        if selected is None:
            return
        rec = self.items[selected]
        rec.deleted = not rec.deleted
        self.rerender(self.loop)
        self.listwalker.set_focus(selected)

    def _rerender_listview(self):
        """This is the re-rendering counterpart to _render_records."""
        selected = self._get_selected_item_position()
        retval = self._render_records(self.loop, self.title, self.items,
                                      self.fields)
        if selected is not None:
            # Rendering built a new walker; put the focus back where it was.
            self.listwalker.set_focus(selected)
        return retval

    def _rerender_standard(self, loop):
        """Body provider used by StandardView.rerender."""
        return self._rerender_listview()

    def _render_records(self, loop, title, recs, fields):
        """Render *recs* according to the current render type.

        Returns the list frame for the urwid type and None for the editor
        types, which display nothing inside the interface.
        """
        self.loop = loop
        formatter = RecordFormatter(recs, fields)
        rows = []
        for text, rec in zip(formatter.format(), recs):
            if rec.deleted:
                attrs = ("deleted", "focused-deleted")
            else:
                attrs = ("item", "focused-item")
            rows.append(urwid.AttrMap(urwid.Text(text), *attrs))
        content = urwid.SimpleListWalker(rows)
        self.fields = fields
        if self._render_type == "urwid":
            return self._render_listview(title, recs, content)
        elif self._render_type == "editor-ro":
            return self._render_in_editor_ro(title, recs, content, fields)
        else:
            return self._render_in_editor_rw(title, recs, content)

    def _render_in_editor_ro(self, title, recs, content, selected):
        """Show the *selected* fields of *recs* in a read-only editor file.

        When the "blank-lines" editor option is set, an empty record is
        written after every record.
        """
        dbd = DatabaseData()
        items = []
        for rec in recs:
            recvals = [rec.table if field == "table" else rec.fields[field]
                       for field in selected]
            items.append(filemanip.Record(recvals))
            if dbd.edoptions["blank-lines"]:
                items.append(filemanip.Record([]))
        self._display_in_editor(items, True)

    def _render_in_editor_rw(self, title, recs, content):
        """Let the user edit *recs* in an editor and copy the fields back.

        If the number of records read back differs from the number written,
        the edits are discarded and a message box says so.
        """
        items = self._display_in_editor(recs, False)
        if len(items) != len(recs):
            # One list entry per displayed line.
            MessageBox(["There was a mismatch in the number of lines.",
                        "Your edits have been discarded."]).render(self.loop)
        else:
            for i, r in zip(items, recs):
                r.fields[:] = i.fields

    def _display_in_editor(self, recs, ro):
        """Write *recs* as CSV to a temporary file and open it in the editor.

        The file is read-only (0o400) when *ro* is true and 0o600 otherwise.
        Returns what the storage loads back from the file after the editor
        exits.  The temporary directory is removed before returning.
        """
        with tempfile.TemporaryDirectory() as tempdir:
            tempname = tempdir + "/view"
            vault = filemanip.FileStorage('csv', tempname)
            vault.store(recs)
            os.chmod(tempname, 0o400 if ro else 0o600)
            subprocess.call(["sensible-editor", tempname])
            # The editor took over the terminal; force a full redraw.
            self.loop.screen.clear()
            return vault.load()
|
|
|
|
class TableContentsListView(RecordListView):
    """List of the records belonging to one table."""

    def __init__(self, table):
        """Remember which table's records to list."""
        self.table = table
        super().__init__()

    def callback(self, type_, obj):
        """Toggle deletion on "remove"; open the record on select/next.

        Nothing is opened when the list has no focused item (for example
        when it is empty).
        """
        if type_ == "remove":
            self._toggle_selected_record()
            self._full_rerender = True
        if type_ != "select" and not type_.startswith("next"):
            return
        pos = self._get_selected_item_position()
        if pos is None:
            # Same guard as SearchListView: no focused row to open.
            return
        recview = DisplayRecordView(self.items[pos])
        recview.render(self.loop)

    def _render_standard(self, loop):
        """Render this table's records by the schema's key fields."""
        dbd = DatabaseData()
        recs = [rec for rec in dbd.records if rec.table == self.table]
        title = "List of records by key field for " + self.table
        return self._render_records(loop, title, recs,
                                    dbd.schema().key_fields())
|
|
|
|
class TableListView(ListView):
    """List of the distinct table names found in the database."""

    def callback(self, type_, obj):
        """Open the contents of the chosen table on select/next.

        The table is taken from *obj* when a widget is given, otherwise
        from the focused row.  Nothing happens when there is no focused row
        (for example when the list is empty).
        """
        if type_ != "select" and not type_.startswith("next"):
            return
        if obj is None:
            pos = self._get_selected_item_position()
            if pos is None:
                # No focused row, so there is no table to open.
                return
            text = self.items[pos]
        else:
            text = obj.original_widget.get_text()[0]
        dbcview = TableContentsListView(text)
        dbcview.set_render_type(type_)
        dbcview.render(self.loop)

    def _render_standard(self, loop):
        """Render the table names in order of first appearance."""
        self.loop = loop
        dbs = []
        for rec in DatabaseData().records:
            table = rec.table
            if table not in dbs:
                dbs.append(table)
        content = urwid.SimpleListWalker([
            urwid.AttrMap(urwid.Text(w), "item", "focused-item") for w in dbs])
        return self._render_listview("List of available sub-databases", dbs,
                                     content)

    def _rerender_listview(self):
        # Punt on this one.  The list walker can only increase in size and
        # therefore we'll mess up which item is focused in the list.
        pass
|
|
|
|
class CompleteContentsListView(RecordListView):
    """List of every record in the database, ordered by table name."""

    def callback(self, type_, obj):
        """Toggle deletion on "remove"; open the record on select/next.

        Nothing is opened when the list has no focused item (for example
        when it is empty).
        """
        if type_ == "remove":
            self._toggle_selected_record()
            self._full_rerender = True
        if type_ != "select" and not type_.startswith("next"):
            return
        pos = self._get_selected_item_position()
        if pos is None:
            # Same guard as SearchListView: no focused row to open.
            return
        recview = DisplayRecordView(self.items[pos])
        recview.render(self.loop)

    @staticmethod
    def _key(rec):
        """Sort key: the record's table name."""
        return rec.table

    def _render_standard(self, loop):
        """Render all records, sorted by table, by the schema's key fields."""
        self.loop = loop
        sch = DatabaseData().schema()
        items = sorted(DatabaseData().records, key=self._key)
        title = "List of all records by key field"
        return self._render_records(loop, title, items, sch.key_fields())
|
|
|
|
class SearchListView(RecordListView):
    """List of the records that matched a search."""

    def __init__(self, records, selected):
        """Remember the matches and the per-cell display selection.

        ``selected[0]`` refers to the table name and ``selected[i]`` to
        field i-1.
        """
        self.records = records
        self.selected = selected
        super().__init__()

    def set_render_type(self, t):
        """Delegate to RecordListView.set_render_type."""
        super().set_render_type(t)

    def callback(self, type_, obj):
        """Toggle deletion on "remove"; open the record on select/next."""
        if type_ == "remove":
            self._toggle_selected_record()
            self._full_rerender = True
        wants_open = type_ == "select" or type_.startswith("next")
        if not wants_open:
            return
        position = self._get_selected_item_position()
        if position is None:
            return
        DisplayRecordView(self.items[position]).render(self.loop)

    def _render_standard(self, loop):
        """Render the matches, showing only the selected columns."""
        self.loop = loop
        columns = ["table"] if self.selected[0] else []
        # Slot i (i >= 1) of the selection stands for field index i-1.
        columns.extend(index
                       for index, flag in enumerate(self.selected[1:])
                       if flag)
        return self._render_records(self.loop, "List of matching records",
                                    self.records, columns)
|
|
|
|
class MessageBox(View):
    """Overlay showing one or more lines of text above an OK button."""

    def __init__(self, message):
        """Accept a single string or a sequence of lines."""
        self.message = [message] if type(message) is str else message

    def callback(self, type_, obj):
        """Return to the previous view once the box is acknowledged."""
        if type_ == "select":
            render_previous_view(self.loop)

    def _render_buttons(self):
        """Return the OK button, wired to report "select"."""
        def callback_glue(button):
            self.callback("select", button)

        ok = urwid.Button("OK")
        urwid.connect_signal(ok, 'click', callback_glue)
        return ok

    def render(self, loop):
        """Display the box and push it on the view stack."""
        self.loop = loop
        button = self._render_buttons()
        rows = [urwid.AttrMap(urwid.Text(line), "text", "text")
                for line in self.message]
        rows.append(button)
        boxed = urwid.LineBox(urwid.Pile(rows))
        backdrop = urwid.Pile([])
        self.widget = urwid.Overlay(boxed, backdrop, "center",
                                    ("relative", 50), "middle", None)
        loop.widget = self.widget
        DatabaseData().views.append(self)

    def rerender(self, loop):
        """Show the widget built by render() again."""
        loop.widget = self.widget
|
|
|
|
class MenuView(ListView):
    """Main menu; each entry triggers one of the interface commands."""

    # Menu caption -> command name, in display order.
    OPTIONS = collections.OrderedDict([
        ("Add Record", "add"),
        ("Search Records", "search"),
        ("Browse All Records", "browse-all"),
        ("Browse Tables", "browse"),
        ("Write to Disk", "sync-database"),
        ("About newfol", "about")
    ])

    def callback(self, type_, obj):
        """Run the focused entry's command on select/next.

        "browse-all" is handled here so the event kind can pick the render
        type; every other command goes through the loop's unhandled-input
        handler.
        """
        if not (type_ == "select" or type_.startswith("next")):
            return
        focused = self.listbox.get_focus()[0]
        command = self.OPTIONS[focused.original_widget.get_text()[0]]
        if command == "browse-all":
            ccview = CompleteContentsListView()
            ccview.set_render_type(type_)
            ccview.render(self.loop)
            return
        self.loop.unhandled_input(command)

    def _rerender_listview(self):
        """The menu never changes, so there is nothing to refresh."""
        pass

    def _render_standard(self, loop):
        """Render the menu captions as a list."""
        self.loop = loop
        entries = [urwid.AttrMap(urwid.Text(caption), "item", "focused-item")
                   for caption in self.OPTIONS.keys()]
        content = urwid.SimpleListWalker(entries)
        return self._render_listview("Main Menu", self.OPTIONS.keys(), content)
|
|
|
|
def display_cheat_sheet(name, loop):
    """Open the cheat sheet file *name* in the editor.

    A message box is shown instead when no file is registered (*name* is
    None) or the file does not exist.  The file is made read-only (0o400)
    before the editor is started.
    """
    problem = None
    if name is None:
        problem = "No cheat sheet was registered in the schema file."
    elif not os.access(name, os.F_OK):
        problem = "The cheat sheet file doesn't exist."
    if problem is not None:
        MessageBox(problem).render(loop)
        return
    os.chmod(name, 0o400)
    subprocess.call(["sensible-editor", name])
|
|
|
|
def render_previous_view(loop):
    """Drop the current view, unless it is the last one, and redraw the top.

    The bottom view of the stack is never popped; it is simply redrawn.
    """
    views = DatabaseData().views
    if len(views) > 1:
        views.pop()
    views[-1].rerender(loop)
|
|
|
|
def start_curses():
    """Run the interactive urwid interface until the user quits."""
    palette = [
        ('bg', 'black', 'yellow', '', 'black', '#ffa'),
        ('text', 'black', 'yellow', '', '#860', '#ffa'),
        ('focused-deleted', 'dark green', 'light red', '', '#006', '#f00'),
        ('deleted', 'light red', 'dark green', '', '#f00', '#006'),
        ('focused-item', 'yellow', 'black', '', '#ffa', '#860'),
        ('item', 'text'),
        ('description', 'text'),
        ('editbox', 'focused-item'),
        ('header', 'black', 'yellow', '', 'black', '#fd0'),
        ('top', 'black', 'light blue', '', 'black', '#0df')
    ]
    DatabaseData().keys = KeyboardShortcuts()

    def exit_on_q(inp):
        """Handle input no widget consumed, as a key or a command name."""
        dbd = DatabaseData()
        try:
            command = DatabaseData().keys[inp]
        except:
            # Not a bound key; it may already be a command name.
            command = inp
        if command == "quit":
            raise urwid.ExitMainLoop()
        elif command == "previous":
            render_previous_view(loop)
        elif command == "menu":
            MenuView().render(loop)
        elif command == "add":
            DisplayRecordView().render(loop)
        elif command == "search":
            SearchRecordView().render(loop)
        elif command in ("next", "next-secondary", "next-tertiary",
                         "invert-selection"):
            # These are meaningful only to whichever view is on top.
            dbd.views[-1].callback(command, None)
        elif command == "browse-all":
            CompleteContentsListView().render(loop)
        elif command == "browse":
            TableListView().render(loop)
        elif command == "about":
            AboutView().render(loop)
        elif command == "cheat-sheet":
            display_cheat_sheet(DatabaseData().cheatsheet, loop)
        elif command == "sync-database":
            dbd._database.store()

    intro = IntroView()
    loop = urwid.MainLoop(None, palette, unhandled_input=exit_on_q)
    loop.screen.set_terminal_properties(colors=256)
    intro.render(loop)
    loop.run()
|
|
|
|
class Schemata:
    """Plain holder for schema settings."""

    def __init__(self, nfields, mapping, keyfields=None, exe_ok=False):
        """Store the settings.

        *keyfields* is copied into a new list; leaving it out (or passing
        None) gives an empty list instead of raising TypeError.
        """
        self.nfields = nfields
        self.mapping = mapping
        self.keyflds = [] if keyfields is None else list(keyfields)
        self.exe_ok = exe_ok

    def get_fields(self):
        """Return the list of key fields."""
        return self.keyflds
|
|
|
|
def do_import(l):
    """Import each dotted module name in *l* into this module's globals.

    The top-level package is bound as a global under its own name, and each
    submodule along the dotted path is imported and attached to its parent
    if the parent does not already have that attribute.
    """
    for dotted in l:
        parts = dotted.split('.')
        root = importlib.import_module(parts[0])
        globals()[parts[0]] = root
        node = root
        for depth, attr in enumerate(parts[1:], start=1):
            if not hasattr(node, attr):
                submodule = importlib.import_module(".".join(parts[:depth + 1]))
                setattr(node, attr, submodule)
            node = getattr(node, attr)
|
|
|
|
def import_into(db, dtbname, dbtype, minfields=0, strict=False, identity=False):
    """Replace table *dtbname* in *db* with records read from standard input.

    Args:
        db: database object providing ``schema()`` and ``records()``.
        dtbname: name of the table to import into.
        dbtype: storage format passed to ``filemanip.FileStorage``.
        minfields: records with fewer fields than this are skipped.
        strict: with a positive *minfields*, require exactly that many
            fields in every record that is not skipped.
        identity: ignore any schema mapping and map field i to field i.

    Raises:
        newfol.exception.NewfolError: a record violates the strict field
            count, or mapping a record's fields raised TypeError.

    Empty records and records whose first field starts with "#" are
    skipped.  All existing records of the table are removed first.
    """
    vault = filemanip.FileStorage(dbtype, sys.stdin)
    if dtbname in db.schema().mapping() and not identity:
        # Use the mapping the schema specifies for this table.
        mapobj = db.schema().mapping()[dtbname]
    else:
        # Identity mapping: either requested explicitly or because the
        # schema specifies no mapping for this table.
        nfields = DatabaseData().schema().nfields()
        mapobj = newfol.database.Mapping(dtbname, nfields)
        for i in range(nfields):
            mapobj.set_field_mapping(i, i, i, "")
    db.records()[:] = [rec for rec in db.records() if rec.table != dtbname]
    # Record numbers count every record read, skipped ones included.
    for recno, rec in enumerate(vault.load(), start=1):
        if len(rec.fields) == 0:
            continue
        if rec.fields[0].startswith("#"):
            continue
        if len(rec.fields) < minfields:
            continue
        if minfields > 0 and strict and len(rec.fields) != minfields:
            raise newfol.exception.NewfolError("Record {0} has {1} fields, not {2}".format(
                recno, len(rec.fields), minfields))
        try:
            newrec = filemanip.Record(mapobj.map_fields_forward(rec.fields))
            newrec.table = dtbname
            db.records().append(newrec)
        except TypeError as exc:
            raise newfol.exception.NewfolError("Data has too many fields or corrupt mapping") from exc
|
|
|
|
def export_from(db, dtbname, dbtype):
    """Write the records of table *dtbname* in *db* to standard output.

    Each matching record is re-wrapped in a fresh ``filemanip.Record``
    built from a copy of its field list, and the resulting lazy sequence
    is handed to a ``filemanip.FileStorage`` of type *dbtype* bound to
    ``sys.stdout``.
    """
    vault = filemanip.FileStorage(dbtype, sys.stdout)
    selected = (
        filemanip.Record(rec.fields[:])
        for rec in db.records()
        if rec.table == dtbname
    )
    vault.store(selected)
|
|
|
|
def parse_args(args):
    """Parse the command-line argument list *args*.

    Returns the ``argparse`` namespace.  The mutually overriding command
    flags all store a constant into ``cmd`` (the last one given wins);
    the remaining options fill ``dbname``, ``table``, ``strict``,
    ``identity``, ``homedir``, ``minfields``, ``txntype`` and ``dbtype``.
    """
    parser = argparse.ArgumentParser(description="store and manipulate fol")
    add = parser.add_argument

    def command(name, text, *aliases):
        # A flag that selects the command to run by storing its name.
        add("--" + name, *aliases, dest="cmd", action="store_const",
            const=name, help=text)

    def option(name, text, **extra):
        # A flag that takes a value and stores it under its own name.
        add("--" + name, dest=name, action="store", help=text, **extra)

    def switch(name, text):
        # A boolean flag that is false unless given.
        add("--" + name, dest=name, action="store_true", help=text)

    # The registration order below is the order shown in --help output.
    command("import", "import a database")
    command("test", "test this program")
    command("version", "print newfol and database versions")
    command("validate", "validate the database")
    command("repair", "repair the database automatically")
    command("export", "export a database", "--extract")
    option("dbname", "database name (obsolescent)")
    option("table", "database name")
    command("upgrade", "upgrade data files")
    switch("strict", "enable strict checking")
    switch("identity", "use identity mapping when importing")
    command("curses", "start the curses interface")
    option("homedir", "specify the location of database files",
           default=os.path.expanduser("~/.newfol"))
    option("minfields", "minimum number of fields per record",
           type=int, default=0)
    option("txntype", "type of transactional helper", default=None)
    option("dbtype", "type of database for import/export", default="csv")

    return parser.parse_args(args)
|
|
|
|
def ensure_db_dir(path):
    """Create the database directory *path* if it does not exist yet.

    A newly created directory gets mode 0o700 and a ``version`` file
    holding the integer value of ``DatabaseVersion.preferred()`` followed
    by a newline.  An existing path is left untouched.
    """
    if os.path.exists(path):
        return
    os.mkdir(path, 0o700)
    # NOTE(review): indentation was lost in this copy of the file; the
    # version file is assumed to be written only when the directory has
    # just been created -- confirm against the upstream source.
    with open(path + "/version", "w") as fp:
        print(int(DatabaseVersion.preferred()), file=fp)
|
|
|
|
def print_version(db):
    """Print the newfol version and, if available, database details.

    When *db* is ``None`` only the program version and a "No database
    loaded." notice are printed.  Otherwise the database location, its
    version information and the available transaction types are listed,
    with "sha256" always shown first among the transaction types.
    """
    print("newfol " + __version__)
    if db is None:
        print("No database loaded.")
        return

    # Build a new list instead of inserting into the one returned by the
    # schema, so that printing the version never alters the schema's data.
    txntypes = ["sha256"] + list(db.schema().transaction_types())
    print("Location: " + db.location())
    version = db.version()
    print("Database Version: " + str(version))
    print("Record Version: " + str(version.record_version()))
    print("Serialization Version: " + str(version.serialization_version()))
    print("Serialization Format: " + str(version.serialization()))
    print("Transaction Types: " + ", ".join(txntypes))
|
|
|
|
def upgrade_database(db, txntype):
    """Call ``db.upgrade`` with *txntype* and a stderr-printing callback.

    The callback takes one text argument and prints it to standard error.
    """
    db.upgrade(txntype, lambda text: print(text, file=sys.stderr))
|
|
|
|
def main(args):
    """Run newfol with the command-line arguments *args*.

    The arguments are parsed, the database directory is created if
    needed, and the database is loaded and locked.  The selected command
    is then executed; with no arguments at all, with ``--curses``, or
    with no recognised command the curses interface is started.  Every
    command except ``upgrade`` and ``version`` is followed by storing and
    unlocking the database.

    Any exception raised while loading the database is re-raised unless
    the command is ``version``, in which case the version is printed
    without a database.
    """
    argobj = parse_args(args)

    # --dbname is the old spelling of --table: warn about it, and use its
    # value only when --table was not given.
    if argobj.dbname is not None:
        print("--dbname argument is obsolete; use --table instead",
              file=sys.stderr)
        if argobj.table is None:
            argobj.table = argobj.dbname

    command = argobj.cmd
    use_curses = len(args) == 0 or command == "curses"

    ensure_db_dir(argobj.homedir)
    db = None
    try:
        db = newfol.database.Database.load(argobj.homedir, argobj.txntype)
        db.lock()
        DatabaseData()._database = db
        do_import(db.schema().imports())
        # Check to see if the database needs upgrading.
        if command != "upgrade":
            db.records()
    except Exception as e:
        if command != "version":
            raise e
        db = None

    if use_curses:
        start_curses()
    elif command == "upgrade":
        # Returns without the store/unlock step below.
        upgrade_database(db, argobj.txntype)
        return
    elif command == "version":
        # Returns without the store/unlock step below; db may be None.
        print_version(db)
        return
    elif command == "import":
        import_into(db, argobj.table, argobj.dbtype, argobj.minfields,
                    argobj.strict, argobj.identity)
    elif command == "export":
        export_from(db, argobj.table, argobj.dbtype)
    elif command == "validate":
        db.validate(True)
    elif command == "repair":
        db.repair()
        db.validate(True)
    elif command != "test":
        # No recognised command: fall back to the interactive interface.
        # ("test" itself does nothing beyond loading the database.)
        start_curses()

    db.store()
    db.unlock()
|
|
|
|
# Script entry point: set the locale from the environment, run main() on
# the command-line arguments, and turn newfol's own exceptions into a
# one-line "E: ..." message on stderr with exit status 2.
try:
    locale.setlocale(locale.LC_ALL, '')
    main(sys.argv[1:])
except newfol.exception.UpgradeNeededError as e:
    # Handled before the more general handlers below.
    # NOTE(review): the exception hierarchy is not visible here; if
    # UpgradeNeededError derives from NewfolError, listing it after
    # NewfolError made this hint unreachable -- confirm.
    print("E: {0}; try upgrading".format(str(e)), file=sys.stderr)
    sys.exit(2)
except (newfol.exception.NewfolError,
        newfol.exception.FilemanipError) as e:
    print("E: {0}".format(str(e)), file=sys.stderr)
    sys.exit(2)
|
|
|
|
# vim: set fdm=indent:
|