newfol/test/testfilemanip.py
brian m. carlson ce8bed3006
Add tests for pluggable hooks.
Signed-off-by: brian m. carlson <sandals@crustytoothpaste.net>
2015-01-22 15:15:32 +00:00

290 lines
9.6 KiB
Python
Executable file

#!/usr/bin/python3
from newfol.filemanip import Record, FileStorage, HashHook
import hashlib
import newfol.exception
import tempfile
import unittest
class TestRoundTripping(unittest.TestCase):
def setUp(self):
self.data = [
Record(["0", "1", "2", "3"]),
Record(["4", "5", "6", "7"]),
Record(["a", "b", "c", "d"], "foo"),
Record(["e", "f", "c", "d"], "bar"),
]
self.tempdir = tempfile.TemporaryDirectory()
def tearDown(self):
self.tempdir.cleanup()
def compare(self, l, m, lossy):
for a, b in zip(l, m):
for c, d in zip(a.fields, b.fields):
self.assertEqual(c, d)
if lossy:
self.assertEqual(a.table or "", b.table or "")
else:
self.assertEqual(a.uuid, b.uuid)
self.assertEqual(a.table, b.table)
def check_serialized_data(self, fs, filename, recs):
with open(filename, "rb") as fp:
data = fp.read()
fs.store(recs)
with open(filename, "rb") as fp:
data2 = fp.read()
self.assertEqual(data, data2)
def run_test(self, fmt, lossy=False):
options = {"table-is-field-0": True}
filename = self.tempdir.name + "/" + fmt
fs = FileStorage(fmt, filename, options=options)
fs.store(self.data)
m = fs.load()
self.compare(self.data, m, lossy)
self.check_serialized_data(fs, filename, m)
def test_csv(self):
self.run_test("csv", True)
def test_json(self):
self.run_test("json")
def test_yaml(self):
options = {"table-is-field-0": True}
fmt = "yaml"
filename = self.tempdir.name + "/" + fmt
fs = FileStorage(fmt, filename, options=options)
fs.store(self.data)
self.check_serialized_data(fs, filename, self.data)
def test_pickle(self):
self.run_test("pickle")
class TestCompression(unittest.TestCase):
def setUp(self):
self.data = [
Record(["0", "1", "2", "3"]),
Record(["4", "5", "6", "7"]),
Record(["a", "b", "c", "d"], "foo"),
Record(["e", "f", "c", "d"], "bar"),
]
self.magic = {
"xz": b"\xfd7zXZ\x00",
"lzma": b"\x5d\x00\x00",
}
def compare(self, l, m):
for a, b in zip(l, m):
for c, d in zip(a.fields, b.fields):
self.assertEqual(c, d)
self.assertEqual(a.uuid, b.uuid)
self.assertEqual(a.table, b.table)
def compression_test(self, format):
tempdir = tempfile.TemporaryDirectory()
options = {"compression": format}
magic = self.magic[format]
filename = tempdir.name + "/dtb"
fs = FileStorage("json", filename, options=options)
fs.store(self.data)
m = fs.load()
self.compare(self.data, m)
with open(filename, "rb") as fp:
header = fp.read(len(magic))
self.assertEqual(header, magic)
tempdir.cleanup()
def test_enabled_compression(self):
self.compression_test("xz")
def test_enabled_compression_lzma(self):
self.compression_test("lzma")
def test_default(self):
tempdir = tempfile.TemporaryDirectory()
filename = tempdir.name + "/dtb"
fs = FileStorage("json", filename)
fs.store(self.data)
m = fs.load()
self.compare(self.data, m)
with open(filename, "rb") as fp:
header = fp.read(20)
for k, v in self.magic.items():
self.assertNotEqual(header[0:len(v)], v)
self.assertEqual(header[0:1], b"[")
tempdir.cleanup()
class SHA256TransactionTest(unittest.TestCase):
# Not only not the correct answer, but also the SHA-256 value of "WRONG".
WRONG = "02b60b3b9ed309b730abc7006e62bf07ad5b911bc8ae57c4cf001bad35819e7a"
def setUp(self):
self.testcases = ["", "abc", "©", "\ufeff"]
def generate_len_and_hash(self, func, data):
s = data.encode('utf-8')
h = func()
h.update(s)
return "%s:%s" % (len(s), h.hexdigest())
def test_hashing(self):
tempdir = tempfile.TemporaryDirectory()
temp = tempdir.name + "/test"
for k in self.testcases:
v = self.generate_len_and_hash(hashlib.sha256, k)
with open(temp, "w") as wfp:
wfp.write(k)
self.assertEqual(HashHook._hash_file("sha256", temp),
"sha256:" + v)
tempdir.cleanup()
def create_small_database(self, tempdir):
temp = tempdir.name + "/test"
with open(temp, "w") as fp:
fp.write(":\n")
return temp
def create_filestorage(self, *args):
return self.create_filestorage_hash("sha256", *args)
def create_filestorage_hash(self, hash, *args):
class Simple:
pass
x = Simple()
x.tempdir = tempfile.TemporaryDirectory()
x.tempfile = self.create_small_database(x.tempdir)
x.fs = FileStorage("csv", x.tempfile, hash, *args)
return x
def test_missing_not_forgiving(self):
x = self.create_filestorage({"forgiving": False})
with self.assertRaisesRegex(newfol.exception.UpgradeNeededError,
'missing checksum'):
x.fs.load()
x.tempdir.cleanup()
def test_missing_forgiving(self):
x = self.create_filestorage({"forgiving": True})
x.fs.load()
x.tempdir.cleanup()
def test_missing(self):
x = self.create_filestorage()
x.fs.load()
x.tempdir.cleanup()
def test_incorrect(self):
x = self.create_filestorage()
x.fs.store([Record(["", ""])])
with open(x.tempfile + ".checksum", "w") as fp:
# Something other than the correct value.
fp.write("sha256:2:%s\n" % self.WRONG)
with self.assertRaisesRegex(newfol.exception.FilemanipError,
'corrupt'):
x.fs.load()
x.tempdir.cleanup()
def test_correct(self):
x = self.create_filestorage()
for k in self.testcases:
v = self.generate_len_and_hash(hashlib.sha256, k)
with open(x.tempfile, "w") as f:
# These sequences of bytes are valid CSV files. They consist
# of a single record with a single field containing the
# characters in question.
f.write(k)
with open(x.tempfile + ".checksum", "w") as fp:
fp.write("sha256:%s\n" % v)
x.fs.load()
x.tempdir.cleanup()
def test_autoselection(self):
x = self.create_filestorage_hash("hash")
x.fs.store([Record(["", ""])])
with open(x.tempfile + ".checksum", "r") as fp:
data = fp.read()
bitness = HashHook._bitness()
self.assertIn(bitness, [32, 64])
self.assertTrue(data.startswith("sha512" if bitness == 64 else
"sha256"))
x.tempdir.cleanup()
def test_autodetection(self):
for i in ["sha256", "sha384", "sha512"]:
x = self.create_filestorage_hash(i)
x.fs.store([Record(["", ""])])
fs = FileStorage("csv", x.tempfile, "hash")
recs = fs.load()
self.assertEqual(len(recs), 1)
self.assertIsInstance(recs[0], Record)
self.assertEqual(recs[0].fields[:], ["", ""])
with open(x.tempfile + ".checksum", "w") as fp:
# Something other than the correct value.
fp.write("%s:2:%s\n" % (i, self.WRONG))
with self.assertRaisesRegex(newfol.exception.FilemanipError,
'corrupt'):
x.fs.load()
x.tempdir.cleanup()
def test_canonicalization(self):
for i in ("sha256", "sha384", "sha512"):
result = FileStorage._canonicalize_transaction_types(["hash", i])
self.assertEqual(result, [i])
self.assertEqual(FileStorage._canonicalize_transaction_types(None),
None)
class ExampleFile(newfol.filemanip.FileFormat):
def __init__(self, filename, txn, options):
self.inited = True
def store(self, records):
self.stored = True
pass
def load(self):
return [Record(['a', 'b'])]
class ExampleHook(newfol.filemanip.Hook):
pass
class PluggableBackendsTest(unittest.TestCase):
def test_adding_backend(self):
FileStorage.register_backend('example', ExampleFile)
self.assertEqual(FileStorage.BACKENDS['example'], ExampleFile)
fs = FileStorage('example', 'filename')
recs = fs.load()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0].fields, ['a', 'b'])
def test_removing_backend(self):
FileStorage.register_backend('example', ExampleFile)
self.assertEqual(FileStorage.BACKENDS['example'], ExampleFile)
FileStorage.unregister_backend('example')
with self.assertRaises(KeyError):
FileStorage.BACKENDS['example']
class PluggableHooksTest(unittest.TestCase):
def test_adding_hook(self):
FileStorage.register_hook('example', ExampleHook)
self.assertEqual(FileStorage.HOOKS['example'], ExampleHook)
def test_removing_hook(self):
FileStorage.register_hook('example', ExampleFile)
self.assertEqual(FileStorage.HOOKS['example'], ExampleFile)
FileStorage.unregister_hook('example')
with self.assertRaises(KeyError):
FileStorage.HOOKS['example']
if __name__ == '__main__':
unittest.main()