Add some synchronization when sending/receiving notifications

Originally created by @606u
This commit is contained in:
Ivan Yonchovski 2022-11-13 20:37:54 +02:00 committed by Benson Chu
parent f540ccd92b
commit 10ae6ffecf

View file

@ -20,6 +20,7 @@ along with GNU Emacs. If not, see <https://www.gnu.org/licenses/>. */
#include <config.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
@ -1007,6 +1008,7 @@ usage: (json-parse-string STRING &rest ARGS) */)
struct json_rpc_state
{
pthread_mutex_t handle_mx;
struct SSP_Handle* handle;
json_t* message;
json_error_t error;
@ -1015,12 +1017,48 @@ struct json_rpc_state
int error_buffer_read;
};
/* Usage:
* if (can_use_handle (state))
* {
* ... use state->handle
* end_using_handle (state);
* }
*/
inline static void
end_using_handle (struct json_rpc_state *state)
{
assert (state->handle);
pthread_mutex_unlock (&state->handle_mx);
}
inline static int
can_use_handle (struct json_rpc_state *state)
{
if (pthread_mutex_lock (&state->handle_mx) == 0)
{
if (state->handle)
return 1; /* handle is good */
/* else handle is already gone */
end_using_handle (state);
}
return 0;
}
inline static void
CHECK_RPC_CONNECTION (Lisp_Object obj)
{
CHECK_TYPE (USER_PTRP (obj), Quser_ptrp, obj);
}
static void
json_rpc_state_free (void *ptr)
{
struct json_rpc_state *state = ptr;
assert (state->handle == NULL); /* Loop must be exited */
pthread_mutex_destroy (&state->handle_mx);
free (state);
}
DEFUN ("json-rpc-connection", Fjson_rpc_connection, Sjson_rpc_connection, 1, MANY,
NULL,
doc: /* Create JSONRPC connection. */)
@ -1050,15 +1088,18 @@ DEFUN ("json-rpc-connection", Fjson_rpc_connection, Sjson_rpc_connection, 1, MAN
else
{
struct json_rpc_state *state = malloc (sizeof (struct json_rpc_state));
/* TODO: state might be NULL */
pthread_mutex_init (&state->handle_mx, NULL);
/* TODO: mutex_init could fail */
state->handle = handle;
SAFE_FREE ();
return make_user_ptr (json_free, state);
return make_user_ptr (json_rpc_state_free, state);
}
}
struct json_rpc_send_params
{
struct SSP_Handle* handle;
struct json_rpc_state *state;
json_t* message;
};
@ -1066,21 +1107,28 @@ static void
json_rpc_send_callback (void * arg)
{
struct json_rpc_send_params *param = arg;
struct json_rpc_state *state = param->state;
json_t *message = param->message;
struct thread_state *self = current_thread;
release_global_lock ();
sys_thread_yield ();
if (can_use_handle (state))
{
release_global_lock ();
sys_thread_yield ();
struct SSP_Handle* process = param->handle;
char *string = json_dumps (param->message, JSON_COMPACT | JSON_ENCODE_ANY);
size_t size = strlen(string);
char *msg = malloc(size + 100);
sprintf(msg, "Content-Length: %zu\r\n\r\n%s", size, string);
process->send(process, msg, strlen(msg));
free(string);
free(msg);
acquire_global_lock (self);
char *string = json_dumps (message, JSON_COMPACT | JSON_ENCODE_ANY);
/* TODO: no point in copying whole message */
size_t size = strlen (string);
char *msg = malloc (size + 100);
/* TODO: missing test if msg != NULL */
sprintf (msg, "Content-Length: %zu\r\n\r\n%s", size, string);
/* TODO: send could do a partial send */
state->handle->send (state->handle, msg, strlen(msg));
end_using_handle (state);
free (msg);
free (string);
acquire_global_lock (self);
}
}
static struct json_rpc_state * json_rpc_state(Lisp_Object connection) {
@ -1094,7 +1142,6 @@ DEFUN ("json-rpc-send", Fjson_rpc_send, Sjson_rpc_send, 1, MANY,
{
Lisp_Object connection = args[0];
CHECK_RPC_CONNECTION(connection);
struct SSP_Handle* handle = json_rpc_state(connection)->handle;
struct json_configuration conf =
{json_object_hashtable, json_array_array, QCnull, QCfalse};
@ -1102,7 +1149,11 @@ DEFUN ("json-rpc-send", Fjson_rpc_send, Sjson_rpc_send, 1, MANY,
json_t *message = lisp_to_json (args[1], &conf);
struct json_rpc_send_params params = {.message = message, .handle = handle};
/* TODO: params is on the stack; is this an issue? */
struct json_rpc_send_params params = {
.state = json_rpc_state(connection),
.message = message
};
flush_stack_call_func (json_rpc_send_callback, &params);
return Qnil;
}
@ -1111,9 +1162,13 @@ DEFUN ("json-rpc-shutdown", Fjson_rpc_shutdown, Sjson_rpc_shutdown, 1, 1, 0,
doc: /* Shutdowns json rpc connection */)
(Lisp_Object connection)
{
CHECK_RPC_CONNECTION(connection);
struct SSP_Handle* handle = json_rpc_state(connection)->handle;
handle->cancel_recv(handle);
CHECK_RPC_CONNECTION (connection);
struct json_rpc_state *state = json_rpc_state (connection);
if (can_use_handle (state))
{
state->handle->cancel_recv (state->handle);
end_using_handle (state);
}
return Qnil;
}
@ -1121,9 +1176,15 @@ DEFUN ("json-rpc-pid", Fjson_rpc_pid, Sjson_rpc_pid, 1, 1, 0,
doc: /* Shutdowns json rpc connection */)
(Lisp_Object connection)
{
CHECK_RPC_CONNECTION(connection);
struct SSP_Handle* handle = json_rpc_state(connection)->handle;
return make_int(handle->pid);
int res = 0; /* or -1? */
CHECK_RPC_CONNECTION (connection);
struct json_rpc_state *state = json_rpc_state (connection);
if (can_use_handle (state))
{
res = state->handle->pid;
end_using_handle (state);
}
return make_int (res);
}
DEFUN ("json-rpc-stderr", Fjson_rpc_stderr, Sjson_rpc_stderr, 1, 1, 0,
@ -1139,9 +1200,15 @@ DEFUN ("json-rpc-alive-p", Fjson_rpc_alive_p, Sjson_rpc_alive_p, 1, 1, 0,
doc: /* Returns if json rpc connection is alive */)
(Lisp_Object connection)
{
int res = 0; /* is not, by default */
CHECK_RPC_CONNECTION(connection);
struct SSP_Handle* handle = json_rpc_state(connection)->handle;
return (handle->isalive(handle))? Qt : Qnil;
struct json_rpc_state *state = json_rpc_state (connection);
if (can_use_handle (state))
{
res = state->handle->isalive (state->handle);
end_using_handle (state);
}
return res ? Qt : Qnil;
}
static size_t read_stdout (struct json_rpc_state *param, char *buffer,
@ -1317,7 +1384,13 @@ DEFUN ("json-rpc", Fjson_rpc, Sjson_rpc, 1, MANY,
}
}
CALLN (Ffuncall, callback, Qnil, Qnil, Qt);
param->handle->close(param->handle);
if (pthread_mutex_lock (&param->handle_mx) == 0)
{
param->handle->close (param->handle);
param->handle = NULL;
pthread_mutex_unlock (&param->handle_mx);
}
/* TODO: what if mutex_lock fails? */
return Qnil;
}