From 10ae6ffecf90402a1ee1e7c04af4e1fd7190eb73 Mon Sep 17 00:00:00 2001 From: Ivan Yonchovski Date: Sun, 13 Nov 2022 20:37:54 +0200 Subject: [PATCH] Add some synchronization when sending/receiving notifications Originally created by @606u --- src/json.c | 123 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 25 deletions(-) diff --git a/src/json.c b/src/json.c index c87f156432c..102927a770c 100644 --- a/src/json.c +++ b/src/json.c @@ -20,6 +20,7 @@ along with GNU Emacs. If not, see . */ #include #include +#include #include #include #include @@ -1007,6 +1008,7 @@ usage: (json-parse-string STRING &rest ARGS) */) struct json_rpc_state { + pthread_mutex_t handle_mx; struct SSP_Handle* handle; json_t* message; json_error_t error; @@ -1015,12 +1017,48 @@ struct json_rpc_state int error_buffer_read; }; +/* Usage: + * if (can_use_handle (state)) + * { + * ... use state->handle + * end_using_handle (state); + * } + */ +inline static void +end_using_handle (struct json_rpc_state *state) +{ + assert (state->handle); + pthread_mutex_unlock (&state->handle_mx); +} +inline static int +can_use_handle (struct json_rpc_state *state) +{ + if (pthread_mutex_lock (&state->handle_mx) == 0) + { + if (state->handle) + return 1; /* handle is good */ + /* else handle is already gone */ + end_using_handle (state); + } + return 0; +} + inline static void CHECK_RPC_CONNECTION (Lisp_Object obj) { CHECK_TYPE (USER_PTRP (obj), Quser_ptrp, obj); } +static void +json_rpc_state_free (void *ptr) +{ + struct json_rpc_state *state = ptr; + assert (state->handle == NULL); /* Loop must be exited */ + pthread_mutex_destroy (&state->handle_mx); + free (state); +} + + DEFUN ("json-rpc-connection", Fjson_rpc_connection, Sjson_rpc_connection, 1, MANY, NULL, doc: /* Create JSONRPC connection. */) @@ -1050,15 +1088,18 @@ DEFUN ("json-rpc-connection", Fjson_rpc_connection, Sjson_rpc_connection, 1, MAN else { struct json_rpc_state *state = malloc (sizeof (struct json_rpc_state)); + /* TODO: state might be NULL */ + pthread_mutex_init (&state->handle_mx, NULL); + /* TODO: mutex_init could fail */ state->handle = handle; SAFE_FREE (); - return make_user_ptr (json_free, state); + return make_user_ptr (json_rpc_state_free, state); } } struct json_rpc_send_params { - struct SSP_Handle* handle; + struct json_rpc_state *state; json_t* message; }; @@ -1066,21 +1107,28 @@ static void json_rpc_send_callback (void * arg) { struct json_rpc_send_params *param = arg; + struct json_rpc_state *state = param->state; + json_t *message = param->message; struct thread_state *self = current_thread; - release_global_lock (); - sys_thread_yield (); + if (can_use_handle (state)) + { + release_global_lock (); + sys_thread_yield (); - struct SSP_Handle* process = param->handle; - - char *string = json_dumps (param->message, JSON_COMPACT | JSON_ENCODE_ANY); - size_t size = strlen(string); - char *msg = malloc(size + 100); - sprintf(msg, "Content-Length: %zu\r\n\r\n%s", size, string); - process->send(process, msg, strlen(msg)); - free(string); - free(msg); - acquire_global_lock (self); + char *string = json_dumps (message, JSON_COMPACT | JSON_ENCODE_ANY); + /* TODO: no point in copying whole message */ + size_t size = strlen (string); + char *msg = malloc (size + 100); + /* TODO: missing test if msg != NULL */ + sprintf (msg, "Content-Length: %zu\r\n\r\n%s", size, string); + /* TODO: send could do a partial send */ + state->handle->send (state->handle, msg, strlen(msg)); + end_using_handle (state); + free (msg); + free (string); + acquire_global_lock (self); + } } static struct json_rpc_state * json_rpc_state(Lisp_Object connection) { @@ -1094,7 +1142,6 @@ DEFUN ("json-rpc-send", Fjson_rpc_send, Sjson_rpc_send, 1, MANY, { Lisp_Object connection = args[0]; CHECK_RPC_CONNECTION(connection); - struct SSP_Handle* handle = json_rpc_state(connection)->handle; struct json_configuration conf = {json_object_hashtable, json_array_array, QCnull, QCfalse}; @@ -1102,7 +1149,11 @@ DEFUN ("json-rpc-send", Fjson_rpc_send, Sjson_rpc_send, 1, MANY, json_t *message = lisp_to_json (args[1], &conf); - struct json_rpc_send_params params = {.message = message, .handle = handle}; + /* TODO: params is on the stack; is this an issue? */ + struct json_rpc_send_params params = { + .state = json_rpc_state(connection), + .message = message + }; flush_stack_call_func (json_rpc_send_callback, ¶ms); return Qnil; } @@ -1111,9 +1162,13 @@ DEFUN ("json-rpc-shutdown", Fjson_rpc_shutdown, Sjson_rpc_shutdown, 1, 1, 0, doc: /* Shutdowns json rpc connection */) (Lisp_Object connection) { - CHECK_RPC_CONNECTION(connection); - struct SSP_Handle* handle = json_rpc_state(connection)->handle; - handle->cancel_recv(handle); + CHECK_RPC_CONNECTION (connection); + struct json_rpc_state *state = json_rpc_state (connection); + if (can_use_handle (state)) + { + state->handle->cancel_recv (state->handle); + end_using_handle (state); + } return Qnil; } @@ -1121,9 +1176,15 @@ DEFUN ("json-rpc-pid", Fjson_rpc_pid, Sjson_rpc_pid, 1, 1, 0, doc: /* Shutdowns json rpc connection */) (Lisp_Object connection) { - CHECK_RPC_CONNECTION(connection); - struct SSP_Handle* handle = json_rpc_state(connection)->handle; - return make_int(handle->pid); + int res = 0; /* or -1? */ + CHECK_RPC_CONNECTION (connection); + struct json_rpc_state *state = json_rpc_state (connection); + if (can_use_handle (state)) + { + res = state->handle->pid; + end_using_handle (state); + } + return make_int (res); } DEFUN ("json-rpc-stderr", Fjson_rpc_stderr, Sjson_rpc_stderr, 1, 1, 0, @@ -1139,9 +1200,15 @@ DEFUN ("json-rpc-alive-p", Fjson_rpc_alive_p, Sjson_rpc_alive_p, 1, 1, 0, doc: /* Returns if json rpc connection is alive */) (Lisp_Object connection) { + int res = 0; /* is not, by default */ CHECK_RPC_CONNECTION(connection); - struct SSP_Handle* handle = json_rpc_state(connection)->handle; - return (handle->isalive(handle))? Qt : Qnil; + struct json_rpc_state *state = json_rpc_state (connection); + if (can_use_handle (state)) + { + res = state->handle->isalive (state->handle); + end_using_handle (state); + } + return res ? Qt : Qnil; } static size_t read_stdout (struct json_rpc_state *param, char *buffer, @@ -1317,7 +1384,13 @@ DEFUN ("json-rpc", Fjson_rpc, Sjson_rpc, 1, MANY, } } CALLN (Ffuncall, callback, Qnil, Qnil, Qt); - param->handle->close(param->handle); + if (pthread_mutex_lock (¶m->handle_mx) == 0) + { + param->handle->close (param->handle); + param->handle = NULL; + pthread_mutex_unlock (¶m->handle_mx); + } + /* TODO: what if mutex_lock fails? */ return Qnil; }