#!/usr/bin/python3
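"""Tests for newfol.filemanip: round-tripping Record objects through
FileStorage in several formats, optional compression, and hash-based
transaction checksums."""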

from newfol.filemanip import Record, FileStorage, HashTransactionStore
import hashlib
import newfol.exception
import tempfile
import unittest


class TestRoundTripping(unittest.TestCase):
    def setUp(self):
        self.data = [
            Record(["0", "1", "2", "3"]),
            Record(["4", "5", "6", "7"]),
            Record(["a", "b", "c", "d"], "foo"),
            Record(["e", "f", "c", "d"], "bar"),
        ]
        self.tempdir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.tempdir.cleanup()

    def compare(self, l, m, lossy):
        for a, b in zip(l, m):
            for c, d in zip(a.fields, b.fields):
                self.assertEqual(c, d)
            if lossy:
                self.assertEqual(a.table or "", b.table or "")
            else:
                self.assertEqual(a.uuid, b.uuid)
                self.assertEqual(a.table, b.table)
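
    # check_serialized_data: after loading, storing the same records again
    # must reproduce the file byte-for-byte, i.e. a store/load/store cycle
    # is stable for the format under test.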

    def check_serialized_data(self, fs, filename, recs):
        with open(filename, "rb") as fp:
            data = fp.read()
        fs.store(recs)
        with open(filename, "rb") as fp:
            data2 = fp.read()
        self.assertEqual(data, data2)

    def run_test(self, fmt, lossy=False):
        options = {"table-is-field-0": True}
        filename = self.tempdir.name + "/" + fmt
        fs = FileStorage(fmt, filename, options=options)
        fs.store(self.data)
        m = fs.load()
        self.compare(self.data, m, lossy)
        self.check_serialized_data(fs, filename, m)

    def test_csv(self):
        self.run_test("csv", True)

    def test_json(self):
        self.run_test("json")

    def test_yaml(self):
        options = {"table-is-field-0": True}
        fmt = "yaml"
        filename = self.tempdir.name + "/" + fmt
        fs = FileStorage(fmt, filename, options=options)
        fs.store(self.data)
        self.check_serialized_data(fs, filename, self.data)

    def test_pickle(self):
        self.run_test("pickle")


class TestCompression(unittest.TestCase):
    def setUp(self):
        self.data = [
            Record(["0", "1", "2", "3"]),
            Record(["4", "5", "6", "7"]),
            Record(["a", "b", "c", "d"], "foo"),
            Record(["e", "f", "c", "d"], "bar"),
        ]
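        # Expected leading bytes of the compressed output: fd 37 7a 58 5a 00
        # is the xz container magic; the legacy lzma-alone format has no real
        # magic number, so 5d 00 00 is just its usual first bytes (default
        # properties byte followed by the start of the dictionary-size field).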
        self.magic = {
            "xz": b"\xfd7zXZ\x00",
            "lzma": b"\x5d\x00\x00",
        }

    def compare(self, l, m):
        for a, b in zip(l, m):
            for c, d in zip(a.fields, b.fields):
                self.assertEqual(c, d)
            self.assertEqual(a.uuid, b.uuid)
            self.assertEqual(a.table, b.table)

    def compression_test(self, format):
        tempdir = tempfile.TemporaryDirectory()
        options = {"compression": format}
        magic = self.magic[format]
        filename = tempdir.name + "/dtb"
        fs = FileStorage("json", filename, options=options)
        fs.store(self.data)
        m = fs.load()
        self.compare(self.data, m)
        with open(filename, "rb") as fp:
            header = fp.read(len(magic))
        self.assertEqual(header, magic)
        tempdir.cleanup()

    def test_enabled_compression(self):
        self.compression_test("xz")

    def test_enabled_compression_lzma(self):
        self.compression_test("lzma")

    def test_default(self):
        tempdir = tempfile.TemporaryDirectory()
        filename = tempdir.name + "/dtb"
        fs = FileStorage("json", filename)
        fs.store(self.data)
        m = fs.load()
        self.compare(self.data, m)
        with open(filename, "rb") as fp:
            header = fp.read(20)
        for k, v in self.magic.items():
            self.assertNotEqual(header[0:len(v)], v)
        self.assertEqual(header[0:1], b"[")
        tempdir.cleanup()


class SHA256TransactionTest(unittest.TestCase):
    # Not only not the correct answer, but also the SHA-256 value of "WRONG".
    WRONG = "02b60b3b9ed309b730abc7006e62bf07ad5b911bc8ae57c4cf001bad35819e7a"
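    # Used as a deliberately incorrect checksum so that load() reports the
    # file as corrupt (see test_incorrect and test_autodetection).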

    def setUp(self):
        self.testcases = ["", "abc", "©", "\ufeff"]

    def generate_len_and_hash(self, func, data):
        s = data.encode('utf-8')
        h = func()
        h.update(s)
        return "%s:%s" % (len(s), h.hexdigest())
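
    # Example, using the standard SHA-256 test vector for b"abc":
    #   self.generate_len_and_hash(hashlib.sha256, "abc")
    #   == "3:ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"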

    def test_hashing(self):
        tempdir = tempfile.TemporaryDirectory()
        temp = tempdir.name + "/test"
        for k in self.testcases:
            v = self.generate_len_and_hash(hashlib.sha256, k)
            with open(temp, "w") as wfp:
                wfp.write(k)
            self.assertEqual(HashTransactionStore._hash_file("sha256", temp),
                             "sha256:" + v)
        tempdir.cleanup()

    def create_small_database(self, tempdir):
        temp = tempdir.name + "/test"
        with open(temp, "w") as fp:
            fp.write(":\n")
        return temp

    def create_filestorage(self, *args):
        return self.create_filestorage_hash("sha256", *args)
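
    # create_filestorage_hash returns a plain container object bundling the
    # TemporaryDirectory, the seed database file and the FileStorage under
    # test, so each test can finish with x.tempdir.cleanup().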

    def create_filestorage_hash(self, hash, *args):

        class Simple:
            pass
        x = Simple()
        x.tempdir = tempfile.TemporaryDirectory()
        x.tempfile = self.create_small_database(x.tempdir)
        x.fs = FileStorage("csv", x.tempfile, hash, *args)
        return x

    def test_missing_not_forgiving(self):
        x = self.create_filestorage({"forgiving": False})
        with self.assertRaisesRegex(newfol.exception.UpgradeNeededError,
                                    'missing checksum'):
            x.fs.load()
        x.tempdir.cleanup()

    def test_missing_forgiving(self):
        x = self.create_filestorage({"forgiving": True})
        x.fs.load()
        x.tempdir.cleanup()

    def test_missing(self):
        x = self.create_filestorage()
        x.fs.load()
        x.tempdir.cleanup()

    def test_incorrect(self):
        x = self.create_filestorage()
        x.fs.store([Record(["", ""])])
        with open(x.tempfile + ".checksum", "w") as fp:
            # Something other than the correct value.
            fp.write("sha256:2:%s\n" % self.WRONG)
        with self.assertRaisesRegex(newfol.exception.FilemanipError,
                                    'corrupt'):
            x.fs.load()
        x.tempdir.cleanup()

    def test_correct(self):
        x = self.create_filestorage()
        for k in self.testcases:
            v = self.generate_len_and_hash(hashlib.sha256, k)
            with open(x.tempfile, "w") as f:
                # These sequences of bytes are valid CSV files. They consist
                # of a single record with a single field containing the
                # characters in question.
                f.write(k)
            with open(x.tempfile + ".checksum", "w") as fp:
                fp.write("sha256:%s\n" % v)
            x.fs.load()
        x.tempdir.cleanup()

    def test_autoselection(self):
        x = self.create_filestorage_hash("hash")
        x.fs.store([Record(["", ""])])
        with open(x.tempfile + ".checksum", "r") as fp:
            data = fp.read()
        bitness = HashTransactionStore._bitness()
        self.assertIn(bitness, [32, 64])
        self.assertTrue(data.startswith("sha512" if bitness == 64 else
                                        "sha256"))
        x.tempdir.cleanup()

    def test_autodetection(self):
        for i in ["sha256", "sha384", "sha512"]:
            x = self.create_filestorage_hash(i)
            x.fs.store([Record(["", ""])])
            fs = FileStorage("csv", x.tempfile, "hash")
            recs = fs.load()
            self.assertEqual(len(recs), 1)
            self.assertIsInstance(recs[0], Record)
            self.assertEqual(recs[0].fields[:], ["", ""])
            with open(x.tempfile + ".checksum", "w") as fp:
                # Something other than the correct value.
                fp.write("%s:2:%s\n" % (i, self.WRONG))
            with self.assertRaisesRegex(newfol.exception.FilemanipError,
                                        'corrupt'):
                x.fs.load()
            x.tempdir.cleanup()

    def test_canonicalization(self):
        for i in ("sha256", "sha384", "sha512"):
            result = FileStorage._canonicalize_transaction_types(["hash", i])
            self.assertEqual(result, [i])
        self.assertEqual(FileStorage._canonicalize_transaction_types(None),
                         None)


if __name__ == '__main__':
    unittest.main()