newfol/test/testfilemanip.py
brian m. carlson ee2987f3fc
Implement lzma compression.
Signed-off-by: brian m. carlson <sandals@crustytoothpaste.net>
2014-01-28 00:31:57 +00:00

214 lines
7.7 KiB
Python
Executable file

#!/usr/bin/python3
from newfol.filemanip import Record, FileStorage, HashTransactionStore
import hashlib
import newfol.exception
import tempfile
import unittest
class TestRoundTripping(unittest.TestCase):
    """Store records in each file format and check they read back intact."""

    def setUp(self):
        """Create the sample records and a per-test scratch directory."""
        self.data = [
            Record(["0", "1", "2", "3"]),
            Record(["4", "5", "6", "7"]),
            Record(["a", "b", "c", "d"], "foo"),
            Record(["e", "f", "c", "d"], "bar"),
        ]
        self.tempdir = tempfile.TemporaryDirectory()
        # Registered here so the directory is removed even when a test fails.
        self.addCleanup(self.tempdir.cleanup)

    @staticmethod
    def _read_bytes(filename):
        """Return the full contents of *filename* as bytes."""
        with open(filename, "rb") as fp:
            return fp.read()

    def _make_storage(self, fmt):
        """Return ``(storage, filename)`` for *fmt* inside the scratch dir.

        The file is named after the format and the storage is created with
        the "table-is-field-0" option enabled.
        """
        options = {"table-is-field-0": True}
        filename = self.tempdir.name + "/" + fmt
        return FileStorage(fmt, filename, options=options), filename

    def compare(self, l, m, lossy):
        """Assert that the records in *l* and *m* match pairwise.

        Fields are always compared.  When *lossy* is true only the tables are
        compared, with a falsy table treated as the empty string; otherwise
        both the uuid and the table must be equal.

        Records and fields are paired with zip(), so surplus trailing items
        on either side are not examined.
        """
        for a, b in zip(l, m):
            for c, d in zip(a.fields, b.fields):
                self.assertEqual(c, d)
            if lossy:
                self.assertEqual(a.table or "", b.table or "")
            else:
                self.assertEqual(a.uuid, b.uuid)
                self.assertEqual(a.table, b.table)

    def check_serialized_data(self, fs, filename, recs):
        """Assert that storing *recs* through *fs* leaves *filename* unchanged.

        The file is read, *recs* is stored again, and the new bytes must be
        identical to the bytes read beforehand.
        """
        before = self._read_bytes(filename)
        fs.store(recs)
        after = self._read_bytes(filename)
        self.assertEqual(before, after)

    def run_test(self, fmt, lossy=False):
        """Store the sample data as *fmt*, load it back and verify it.

        The loaded records are compared with the originals (see compare()
        for the meaning of *lossy*) and then stored again to confirm the
        serialized bytes do not change.
        """
        fs, filename = self._make_storage(fmt)
        fs.store(self.data)
        m = fs.load()
        self.compare(self.data, m, lossy)
        self.check_serialized_data(fs, filename, m)

    def test_csv(self):
        self.run_test("csv", True)

    def test_json(self):
        self.run_test("json")

    def test_yaml(self):
        # Unlike the other formats, this test never calls load(); it only
        # checks that storing the same records twice yields identical bytes.
        fs, filename = self._make_storage("yaml")
        fs.store(self.data)
        self.check_serialized_data(fs, filename, self.data)

    def test_pickle(self):
        self.run_test("pickle")
class TestCompression(unittest.TestCase):
    """Check how the "compression" option affects the bytes written to disk."""

    def setUp(self):
        """Create the sample records and the table of expected file headers."""
        self.data = [
            Record(["0", "1", "2", "3"]),
            Record(["4", "5", "6", "7"]),
            Record(["a", "b", "c", "d"], "foo"),
            Record(["e", "f", "c", "d"], "bar"),
        ]
        # Leading bytes a stored file must start with, keyed by the value
        # passed as the "compression" option.
        self.magic = {
            "xz": b"\xfd7zXZ\x00",
            "lzma": b"\x5d\x00\x00",
        }

    @staticmethod
    def _read_header(filename, size):
        """Return up to *size* leading bytes of *filename*."""
        with open(filename, "rb") as fp:
            return fp.read(size)

    def compare(self, l, m):
        """Assert that the records in *l* and *m* match pairwise.

        Fields, uuid and table are compared.  Records and fields are paired
        with zip(), so surplus trailing items on either side are not
        examined.
        """
        for a, b in zip(l, m):
            for c, d in zip(a.fields, b.fields):
                self.assertEqual(c, d)
            self.assertEqual(a.uuid, b.uuid)
            self.assertEqual(a.table, b.table)

    def compression_test(self, format):
        """Round-trip the sample data as JSON compressed with *format*.

        *format* must be a key of ``self.magic``.  The loaded records must
        match the originals and the stored file must begin with the magic
        bytes registered for *format*.
        """
        magic = self.magic[format]
        # The context manager removes the directory even if a check fails.
        with tempfile.TemporaryDirectory() as tempdir:
            filename = tempdir + "/dtb"
            options = {"compression": format}
            fs = FileStorage("json", filename, options=options)
            fs.store(self.data)
            m = fs.load()
            self.compare(self.data, m)
            header = self._read_header(filename, len(magic))
            self.assertEqual(header, magic)

    def test_enabled_compression(self):
        self.compression_test("xz")

    def test_enabled_compression_lzma(self):
        self.compression_test("lzma")

    def test_default(self):
        """Without options the file carries no known magic and starts with "["."""
        with tempfile.TemporaryDirectory() as tempdir:
            filename = tempdir + "/dtb"
            fs = FileStorage("json", filename)
            fs.store(self.data)
            m = fs.load()
            self.compare(self.data, m)
            header = self._read_header(filename, 20)
            for v in self.magic.values():
                self.assertNotEqual(header[0:len(v)], v)
            self.assertEqual(header[0:1], b"[")
class SHA256TransactionTest(unittest.TestCase):
    """Exercise checksum creation and verification for hashed FileStorage."""

    # Length-and-digest text used to plant a checksum that does not match
    # the stored data.
    _WRONG_CHECKSUM = (
        "2:0a79b0ca699d37228d3affd3b6d38b7f2731702f12fd1edd6cb9ac42a686e3a3\n"
    )

    def setUp(self):
        self.testcases = ["", "abc", "©", "\ufeff"]

    @staticmethod
    def _write_text(path, text):
        """Write *text* to *path* as UTF-8, replacing any existing content.

        UTF-8 is forced because generate_len_and_hash() hashes the result of
        str.encode(); relying on the locale's default encoding would make
        the non-ASCII test cases depend on the environment.
        """
        with open(path, "w", encoding="utf-8") as fp:
            fp.write(text)

    def generate_len_and_hash(self, func, data):
        """Return ``"<byte length>:<hex digest>"`` for the string *data*.

        *func* is a hash constructor such as ``hashlib.sha256``; the digest
        is computed over ``data.encode()``.
        """
        s = data.encode()
        h = func()
        h.update(s)
        return "%s:%s" % (len(s), h.hexdigest())

    def test_hashing(self):
        """_hash_file() must agree with an independently computed hash."""
        # The context manager removes the directory even if a check fails.
        with tempfile.TemporaryDirectory() as tempdir:
            temp = tempdir + "/test"
            for k in self.testcases:
                v = self.generate_len_and_hash(hashlib.sha256, k)
                self._write_text(temp, k)
                self.assertEqual(
                    HashTransactionStore._hash_file("sha256", temp),
                    "sha256:" + v)

    def create_small_database(self, tempdir):
        """Write a minimal data file into *tempdir* and return its path.

        *tempdir* is a TemporaryDirectory object; the file is named "test"
        and contains the single line ":".
        """
        temp = tempdir.name + "/test"
        self._write_text(temp, ":\n")
        return temp

    def create_filestorage(self, *args):
        """Like create_filestorage_hash(), with the hash fixed to "sha256"."""
        return self.create_filestorage_hash("sha256", *args)

    def create_filestorage_hash(self, hash, *args):
        """Build a CSV FileStorage over a fresh small database.

        *hash* and any extra *args* are passed positionally to FileStorage
        after the format and file name.  The returned object has three
        attributes: ``tempdir`` (the TemporaryDirectory), ``tempfile`` (path
        of the data file) and ``fs`` (the FileStorage).

        The directory is scheduled for removal when the test finishes, so
        callers do not have to clean it up themselves.
        """
        class Simple:
            pass

        x = Simple()
        x.tempdir = tempfile.TemporaryDirectory()
        # Guarantees removal even when the calling test fails part-way.
        self.addCleanup(x.tempdir.cleanup)
        x.tempfile = self.create_small_database(x.tempdir)
        x.fs = FileStorage("csv", x.tempfile, hash, *args)
        return x

    def test_missing_not_forgiving(self):
        """With "forgiving" off, loading without a checksum file must fail."""
        x = self.create_filestorage({"forgiving": False})
        with self.assertRaisesRegex(newfol.exception.UpgradeNeededError,
                                    'missing checksum'):
            x.fs.load()

    def test_missing_forgiving(self):
        """With "forgiving" on, loading without a checksum file succeeds."""
        x = self.create_filestorage({"forgiving": True})
        x.fs.load()

    def test_missing(self):
        """With no options, loading without a checksum file succeeds."""
        x = self.create_filestorage()
        x.fs.load()

    def test_incorrect(self):
        """A checksum that does not match the data is reported as corrupt."""
        x = self.create_filestorage()
        x.fs.store([Record(["", ""])])
        # Something other than the correct value.
        self._write_text(x.tempfile + ".checksum",
                         "sha256:" + self._WRONG_CHECKSUM)
        with self.assertRaisesRegex(newfol.exception.FilemanipError,
                                    'corrupt'):
            x.fs.load()

    def test_correct(self):
        """Loading succeeds when the checksum file matches the data file."""
        x = self.create_filestorage()
        for k in self.testcases:
            v = self.generate_len_and_hash(hashlib.sha256, k)
            # These sequences of bytes are valid CSV files.  They consist of
            # a single record with a single field containing the characters
            # in question.
            self._write_text(x.tempfile, k)
            self._write_text(x.tempfile + ".checksum", "sha256:%s\n" % v)
            x.fs.load()

    def test_autoselection(self):
        """The generic "hash" setting picks sha512 on 64-bit, else sha256."""
        x = self.create_filestorage_hash("hash")
        x.fs.store([Record(["", ""])])
        with open(x.tempfile + ".checksum", "r") as fp:
            data = fp.read()
        bitness = HashTransactionStore._bitness()
        self.assertIn(bitness, [32, 64])
        self.assertTrue(data.startswith("sha512" if bitness == 64 else
                                        "sha256"))

    def test_autodetection(self):
        """A "hash" storage reads, and verifies, files written with any hash."""
        for i in ["sha256", "sha384", "sha512"]:
            x = self.create_filestorage_hash(i)
            x.fs.store([Record(["", ""])])
            fs = FileStorage("csv", x.tempfile, "hash")
            recs = fs.load()
            self.assertEqual(len(recs), 1)
            self.assertIsInstance(recs[0], Record)
            self.assertEqual(recs[0].fields[:], ["", ""])
            # Something other than the correct value.
            self._write_text(x.tempfile + ".checksum",
                             i + ":" + self._WRONG_CHECKSUM)
            with self.assertRaisesRegex(newfol.exception.FilemanipError,
                                        'corrupt'):
                x.fs.load()
if __name__ == "__main__":
    # Executed as a script: discover and run every TestCase in this module.
    unittest.main()