Initial commit.

Signed-off-by: brian m. carlson <sandals@crustytoothpaste.net>
This commit is contained in:
brian m. carlson 2011-09-02 17:38:53 +00:00
commit efaefd32a5
5 changed files with 309 additions and 0 deletions

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
q201*.py
__pycache__
*.pyc
*.pyo

53
filemanip.py Normal file
View file

@ -0,0 +1,53 @@
#!/usr/bin/python3
import csv
import sys
class Record:
"""A record containing one or more fields."""
def __init__(self, fields):
self._fields = fields
@property
def fields(self):
return self._fields
class CSVFile:
"""A comma-separated values file."""
def __init__(self, filename):
self._filename = filename
def store(self, records):
"""Store the records to this file."""
writer = csv.writer(open(self._filename, "w"), delimiter=':')
for rec in records:
writer.writerow(rec.fields)
def load(self):
"""Read this file as a list of records."""
reader = csv.reader(open(self._filename, "r"), delimiter=':')
return [Record(row) for row in reader]
class FileStorage:
"""A file (or file-like object) in a certain format."""
def __init__(self, fmt, filename):
if fmt == "csv":
self._backend = CSVFile(filename)
else:
raise ValueError("{0}: not a supported backend".format(fmt))
def store(self, records):
"""Store the records."""
self._backend.store(records)
def load(self):
"""Load the records."""
return self._backend.load()
def main(filename):
fs = FileStorage("csv", filename)
l = [Record([0, 1, 2, 3]), Record([4, 5, 6, 7])]
fs.store(l)
m = fs.load()
for a, b in zip(l, m):
for c, d in zip(a.fields, b.fields):
if str(c) != str(d):
raise ValueError("{0} and {1} differ!".format(c, d))
if __name__ == '__main__':
main(sys.argv[1])

188
newfol Executable file
View file

@ -0,0 +1,188 @@
#! /usr/bin/python3
import argparse
import curses
import curses.textpad
import curses.panel
import filemanip
import locale
import os.path
import sys
import urwid
class NewfolError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
# The external field number is the field number of the legacy file and the
# internal field number is the one for newfol. In this class, internal field
# number 0 is the hidden database name field and in the schema file number 0 is
# the first actual field.
#
# The forward transformation is used for importing and the inverse
# transformation for extraction or exporting.
class Mapping:
NFIELDS = 18
def __init__(self, name):
self.name = name
# Maps the external field number to the internal field number.
self.fwd = [None] * self.NFIELDS
# Maps the internal field number to the external field number.
self.inv = [None] * self.NFIELDS
# Maps the internal field number to a description.
self.desc = [None] * self.NFIELDS
self.extfields = 0
def set_field_mapping(self, external, internal, description):
if external is not None:
self.fwd[external] = internal
self.extfields += 1
self.inv[internal] = external
self.desc[internal] = description
def map_fields_forward(self, fields):
result = [None] * self.NFIELDS
result[0] = self.name
for fieldno in range(len(fields)):
#print(fieldno, self.fwd[fieldno])
result[self.fwd[fieldno]] = fields[fieldno]
return result
def convert_color(x):
return int(x * 1000 / 255)
locale.setlocale(locale.LC_ALL, '')
def start_curses():
try:
mainwin = curses.initscr()
curses.start_color()
attr = 0
if curses.has_colors() and curses.can_change_color():
curses.start_color()
curses.use_default_colors()
curses.init_color(22, convert_color(0xee), convert_color(0xd6),
convert_color(0x80))
curses.init_color(23, convert_color(0x66), convert_color(0x38),
convert_color(0x22))
curses.init_pair(1, 23, 22)
attr = curses.color_pair(1)
mainpanel = curses.panel.new_panel(mainwin)
mainpanel.show()
curses.panel.update_panels()
mainwin.addstr("Hello, world!", attr)
curses.doupdate()
mainwin.getch()
curses.endwin()
except Exception as e:
curses.endwin()
raise e
def load_schemata():
vault = filemanip.FileStorage('csv', os.path.expanduser("~/.newfol/schema"))
recs = vault.load()
if recs[0].fields[0] != "fmt":
raise NewfolError("Schema file does not have format declaration")
if recs[0].fields[1] != "0":
raise NewfolError("Schema file has format other than 0")
mapping = {}
for i in recs:
rectype = i.fields[0]
dbname = i.fields[1]
if rectype == "fmt":
continue
if rectype == "def":
mapping[dbname] = Mapping(dbname)
elif rectype == "fld":
external = int(i.fields[2])
internal = int(i.fields[3]) + 1
mapping[dbname].set_field_mapping(external, internal, i.fields[4])
elif rectype == "dsc":
internal = int(i.fields[3])
mapping[dbname].set_field_mapping(None, internal, i.fields[4])
return mapping
# TODO: create a dictionary of Mappings indexed by dtbname and return it.
# Also, print whether we're using a mapping to stderr when importing so that
# the user can abort if no schema exists.
def import_into(recs, dtbname, minfields=0, strict=False):
vault = filemanip.FileStorage('csv', "/dev/stdin")
mapping = load_schemata()
msg = None
mapobj = None
if dtbname in mapping:
msg = "Using specified mapping for db {0}"
mapobj = mapping[dtbname]
else:
msg = "No mapping specified for db {0}; using identity mapping"
mapobj = Mapping(dtbname)
for i in range(1, Mapping.NFIELDS):
mapobj.set_field_mapping(i-1, i, "")
print(msg.format(dtbname), file=sys.stderr)
recs = list(filter(lambda x: x.fields[0] != dtbname, recs))
recno = 0
for rec in vault.load():
recno += 1
if len(rec.fields) == 0:
continue
if rec.fields[0].startswith("#"):
continue
if len(rec.fields) < minfields:
continue
if minfields > 0 and strict:
if len(rec.fields) != minfields:
raise NewfolError("Record {0} has {1} fields, not {2}".format(
recno, len(rec.fields), minfields))
try:
recs.append(filemanip.Record(mapobj.map_fields_forward(rec.fields)))
except TypeError:
raise NewfolError("Data has too many fields or corrupt mapping")
return recs
def export(recs, dtbname):
vault = filemanip.FileStorage('csv', "/dev/stdout")
recs = filter(lambda x: x.fields[0] == dtbname, recs)
recs = map(lambda x: filemanip.Record(x.fields[1:]), recs)
vault.store(recs)
def parse_args(args):
parser = argparse.ArgumentParser(description="store and manipulate fol")
parser.add_argument("--import", dest="cmd", action="store_const",
const="import", help="import a database")
parser.add_argument("--export", "--extract", dest="cmd",
action="store_const", const="export", help="export a database")
parser.add_argument("--dbname", dest="dbname", action="store",
help="database name")
parser.add_argument("--strict", dest="strict", action="store_true",
help="enable strict checking")
parser.add_argument("--minfields", dest="minfields", action="store",
type=int, default=0, help="minimum number of fields per record")
return parser.parse_args(args)
def main(args):
vault = filemanip.FileStorage('csv', os.path.expanduser("~/.newfol/dtb"))
recs = []
try:
recs = vault.load()
except:
pass
if len(args) == 1:
start_curses()
else:
argobj = parse_args(args)
if argobj.cmd == "import":
recs = import_into(recs, argobj.dbname, argobj.minfields,
argobj.strict)
elif argobj.cmd == "validate":
pass
elif argobj.cmd == "export":
export(recs, argobj.dbname)
else:
raise NewfolError("Je ne comprends pas!")
vault.store(recs)
try:
main(sys.argv[1:])
except NewfolError as e:
print("E: {0}".format(str(e)), file=sys.stderr)
sys.exit(2)

22
newfol-check-fields Executable file
View file

@ -0,0 +1,22 @@
#! /usr/bin/python3
# Run as ./util filename expected-number-of-fields
import filemanip
import sys
if len(sys.argv) < 3:
raise ValueError("need a filename and expected number of fields")
# Ternary operator
# C: condition ? expr-if-true : expr-if-false
# Python: expr-if-true if condition else expr-if-false
vault = filemanip.FileStorage("csv", sys.argv[1])
expected = int(sys.argv[2])
recs = vault.load()
lineno = 1
for rec in recs:
print("line {0}: {1} fields; line is {2}".format(lineno, len(rec.fields),
"faulty" if len(rec.fields) != expected else "fine"))
lineno += 1

42
newfol-cvt-dbmem Executable file
View file

@ -0,0 +1,42 @@
#! /usr/bin/python3
import filemanip
import re
import sys
def filter_dbmem(rec):
if len(rec.fields) < 3:
return None
if ";" not in rec.fields[1]:
return None
(date, purview) = re.split(";\s*", rec.fields[1], 1)
fields = [""] * 16
fields[6] = rec.fields[0]
fields[5] = rec.fields[2]
fields[11] = purview
fields[0] = date
return Record(fields)
def call_filter(dtbname, recs):
final = []
for rec in recs:
if rec.fields[0].startswith("#"):
continue
if dtbname == "dbmem":
return filter_dbmem(rec)
else:
raise ValueError("no such filter for database")
result = filter_dbmem(rec)
if result is not None:
final.append(result)
return final
def main(args):
recs = filemanip.FileStorage('csv', "/dev/stdin").load()
if len(args) == 0:
raise ValueError("need a type of database")
dtbname = args[0]
call_filter(dtbname, recs)
filemanip.FileStorage('csv', "/dev/stdout").store(recs)
main(sys.argv[1:])