mirror of
git://git.sv.gnu.org/emacs.git
synced 2026-06-14 04:21:24 +00:00
Most of these tests are for the scontrol/"anxious continuation" mechanism. The new ERT tests use Python subprocesses via stdin/stdout pipe as JSONRPC endpoints. A shared framing library lives in jsonrpc-resources/common.py. * test/lisp/jsonrpc-tests.el (jsonrpc--test-dir): New constant. (jsonrpc--with-python-fixture): New macro. (scontrol-remote-during-sync): New test. (scontrol-anxious-nested): New test. (scontrol-remote-error): New test. (shutdown-clean-after-notification): New test. * test/lisp/jsonrpc-resources/common.py: New file. * test/lisp/jsonrpc-resources/server-remote-during-sync.py: New file. * test/lisp/jsonrpc-resources/server-anxious-nested.py: New file. * test/lisp/jsonrpc-resources/server-remote-error.py: New file. * test/lisp/jsonrpc-resources/server-harakiri.py: New file.
37 lines
1 KiB
Python
37 lines
1 KiB
Python
"""Common JSONRPC framing helpers for jsonrpc.el test servers."""
|
|
|
|
import json
|
|
import sys
|
|
|
|
|
|
def read_msg():
    """Read one Content-Length-framed JSON-RPC message from stdin.

    A frame is a block of ``Name: value`` header lines terminated by an
    empty line, followed by exactly ``Content-Length`` bytes of UTF-8
    encoded JSON.

    Returns:
        The decoded JSON value, or ``None`` when no complete message is
        available: stdin reached EOF (while reading the headers or
        part-way through the body), or the ``Content-Length`` header is
        missing or zero.

    Raises:
        ValueError: if ``Content-Length`` is not an integer, or if the
            body is not valid UTF-8 / JSON (``UnicodeDecodeError`` and
            ``json.JSONDecodeError`` are both ``ValueError`` subclasses).
    """
    stream = sys.stdin.buffer
    headers = {}
    while True:
        line = stream.readline()
        if not line:
            # EOF before the header block was terminated.
            return None
        text = line.decode('utf-8').rstrip('\r\n')
        if not text:
            # The empty line separates the headers from the body.
            break
        name, sep, value = text.partition(':')
        if sep:
            # Header names are compared case-insensitively, so they are
            # stored lower-cased; lines without a colon are ignored.
            headers[name.strip().lower()] = value.strip()
    n = int(headers.get('content-length', 0))
    if not n:
        return None
    body = stream.read(n)
    if len(body) < n:
        # EOF in the middle of the body: report end-of-stream the same
        # way as EOF in the headers instead of failing on partial JSON.
        return None
    return json.loads(body.decode('utf-8'))
|
|
|
|
|
|
def write_msg(msg):
    """Send *msg* on stdout as one Content-Length-framed JSON-RPC message.

    *msg* is serialized to JSON (non-ASCII characters are kept as-is
    rather than escaped) and encoded as UTF-8.  The frame consists of a
    ``Content-Length`` header giving the byte length of that payload, a
    blank line, and then the payload itself.  Header and payload go out
    in a single write, after which stdout is flushed.
    """
    payload = json.dumps(msg, ensure_ascii=False).encode('utf-8')
    header = f'Content-Length: {len(payload)}\r\n\r\n'.encode('utf-8')
    out = sys.stdout.buffer
    out.write(header + payload)
    out.flush()
|
|
|
|
|
|
def log(text):
    """Emit *text* on stderr as one ``[test-server]``-prefixed line.

    The stream is flushed after each line.
    """
    line = f'[test-server] {text}'
    print(line, file=sys.stderr, flush=True)
|