Merge branch 'master' into peter
This commit is contained in:
commit
2f663da0ec
3 changed files with 70 additions and 4 deletions
|
|
@ -7,7 +7,9 @@ import json
|
|||
import os
|
||||
import os.path
|
||||
import pickle
|
||||
import pwd
|
||||
import re
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import uuid
|
||||
|
|
@ -113,10 +115,21 @@ class GitTransactionStore(TransactionStore):
|
|||
def _call_git(*args):
|
||||
with open(os.devnull, "r") as devnullr:
|
||||
with open(os.devnull, "w+") as devnullw:
|
||||
cmdline = ["git"]
|
||||
env = dict(os.environ)
|
||||
for i in ('NAME', 'EMAIL', 'DATE'):
|
||||
for j in ('AUTHOR', 'COMMITTER'):
|
||||
try:
|
||||
del env['GIT_%s_%s' % (j, i)]
|
||||
except KeyError:
|
||||
pass
|
||||
user = pwd.getpwuid(os.getuid())[0]
|
||||
fqdn = socket.getfqdn()
|
||||
cmdline = ["git", "-c", "user.name=newfol",
|
||||
"-c", "user.email=" + user + "@" + fqdn]
|
||||
cmdline.extend(args)
|
||||
return subprocess.call(cmdline, stdin=devnullr, stdout=devnullw,
|
||||
stderr=devnullw)
|
||||
p = subprocess.Popen(cmdline, stdin=devnullr, stdout=devnullw,
|
||||
stderr=devnullw, env=env)
|
||||
return p.wait()
|
||||
def prepare_open(self, filename, mode):
|
||||
(self._prefix, self._name) = os.path.split(filename)
|
||||
self._mode = mode
|
||||
|
|
|
|||
|
|
@ -2,6 +2,11 @@
|
|||
|
||||
from newfol.database import DatabaseVersion, Database, Schema
|
||||
from newfol.filemanip import Record
|
||||
import os
|
||||
import os.path
|
||||
import pwd
|
||||
import socket
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
class TestDatabaseVersion(unittest.TestCase):
|
||||
|
|
@ -41,5 +46,35 @@ class TestDatabaseAccessors(unittest.TestCase):
|
|||
self.assertEqual(obj.serialization(),
|
||||
DatabaseVersion.preferred().serialization())
|
||||
|
||||
class TestGitTransactions(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ddir = tempfile.TemporaryDirectory()
|
||||
fp = open(self.ddir.name + "/schema", "w")
|
||||
fp.write("fmt:0:newfol schema file:\ntxn:git\n")
|
||||
fp.close()
|
||||
self.db = Database.load(self.ddir.name)
|
||||
def test_has_git_dir(self):
|
||||
self.db.store()
|
||||
self.assertTrue(os.path.isdir(self.ddir.name + "/.git"))
|
||||
def get_format_data(self, fmt):
|
||||
cwd = os.getcwd()
|
||||
data = None
|
||||
try:
|
||||
os.chdir(self.ddir.name)
|
||||
fp = os.popen("git log --pretty=format:" + fmt + " -n1")
|
||||
data = fp.read()
|
||||
fp.close()
|
||||
finally:
|
||||
os.chdir(cwd)
|
||||
return data
|
||||
def test_correct_committer_name(self):
|
||||
self.db.store()
|
||||
self.assertEqual(self.get_format_data("%cn"), "newfol")
|
||||
def test_correct_committer_email(self):
|
||||
self.db.store()
|
||||
user = pwd.getpwuid(os.getuid())[0]
|
||||
fqdn = socket.getfqdn()
|
||||
self.assertEqual(self.get_format_data("%ce"), user + "@" + fqdn)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
|
|
@ -22,16 +22,34 @@ class TestRoundTripping(unittest.TestCase):
|
|||
else:
|
||||
self.assertEqual(a.uuid, b.uuid)
|
||||
self.assertEqual(a.table, b.table)
|
||||
def check_serialized_data(self, fs, filename, recs):
|
||||
fp = open(filename, "rb")
|
||||
data = fp.read()
|
||||
fp.close()
|
||||
fs.store(recs)
|
||||
fp = open(filename, "rb")
|
||||
data2 = fp.read()
|
||||
fp.close()
|
||||
self.assertEqual(data, data2)
|
||||
def run_test(self, fmt, lossy=False):
|
||||
options = {"table-is-field-0": True}
|
||||
fs = FileStorage(fmt, self.tempdir.name + "/" + fmt, options=options)
|
||||
filename = self.tempdir.name + "/" + fmt
|
||||
fs = FileStorage(fmt, filename, options=options)
|
||||
fs.store(self.data)
|
||||
m = fs.load()
|
||||
self.compare(self.data, m, lossy)
|
||||
self.check_serialized_data(fs, filename, m)
|
||||
def test_csv(self):
|
||||
self.run_test("csv", True)
|
||||
def test_json(self):
|
||||
self.run_test("json")
|
||||
def test_yaml(self):
|
||||
options = {"table-is-field-0": True}
|
||||
fmt = "yaml"
|
||||
filename = self.tempdir.name + "/" + fmt
|
||||
fs = FileStorage(fmt, filename, options=options)
|
||||
fs.store(self.data)
|
||||
self.check_serialized_data(fs, filename, self.data)
|
||||
def test_pickle(self):
|
||||
self.run_test("pickle")
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue