diff options
author | Bram Moolenaar <Bram@vim.org> | 2016-10-09 17:28:01 +0200 |
---|---|---|
committer | Bram Moolenaar <Bram@vim.org> | 2016-10-09 17:28:01 +0200 |
commit | dc0ccaee68ca24d10050117fbec757ad33590a17 (patch) | |
tree | 8a98b130b1cd6b91b209c8163220da3f6c5ae440 | |
parent | 9b4579481892a62e7e002498b9eddaaf75bbda49 (diff) | |
download | vim-dc0ccaee68ca24d10050117fbec757ad33590a17.zip |
patch 8.0.0027
Problem: A channel is closed when reading on stderr or stdout fails, but
there may still be something to read on another part.
Solution: Turn ch_to_be_closed into a bitfield. (Ozaki Kiichi)
-rw-r--r-- | src/channel.c | 220 | ||||
-rw-r--r-- | src/eval.c | 4 | ||||
-rw-r--r-- | src/proto/channel.pro | 29 | ||||
-rw-r--r-- | src/structs.h | 24 | ||||
-rw-r--r-- | src/testdir/test_channel.vim | 17 | ||||
-rw-r--r-- | src/version.c | 2 |
6 files changed, 161 insertions, 135 deletions
diff --git a/src/channel.c b/src/channel.c index ba6e7ec95..2d68287a4 100644 --- a/src/channel.c +++ b/src/channel.c @@ -54,7 +54,7 @@ # define fd_close(sd) close(sd) #endif -static void channel_read(channel_T *channel, int part, char *func); +static void channel_read(channel_T *channel, ch_part_T part, char *func); /* Whether a redraw is needed for appending a line to a buffer. */ static int channel_need_redraw = FALSE; @@ -309,7 +309,7 @@ static int next_ch_id = 0; channel_T * add_channel(void) { - int part; + ch_part_T part; channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T)); if (channel == NULL) @@ -318,7 +318,7 @@ add_channel(void) channel->ch_id = next_ch_id++; ch_log(channel, "Created channel"); - for (part = PART_SOCK; part <= PART_IN; ++part) + for (part = PART_SOCK; part < PART_COUNT; ++part) { channel->ch_part[part].ch_fd = INVALID_FD; #ifdef FEAT_GUI_X11 @@ -421,9 +421,7 @@ channel_free(channel_T *channel) if (!in_free_unref_items) { if (safe_to_invoke_callback == 0) - { channel->ch_to_be_freed = TRUE; - } else { channel_free_contents(channel); @@ -511,7 +509,7 @@ free_unused_channels(int copyID, int mask) channel_read_fd(int fd) { channel_T *channel; - int part; + ch_part_T part; channel = channel_fd2channel(fd, &part); if (channel == NULL) @@ -557,7 +555,7 @@ messageFromServer(gpointer clientData, #endif static void -channel_gui_register_one(channel_T *channel, int part) +channel_gui_register_one(channel_T *channel, ch_part_T part) { if (!CH_HAS_GUI) return; @@ -627,7 +625,7 @@ channel_gui_register_all(void) } static void -channel_gui_unregister_one(channel_T *channel, int part) +channel_gui_unregister_one(channel_T *channel, ch_part_T part) { # ifdef FEAT_GUI_X11 if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL) @@ -653,7 +651,7 @@ channel_gui_unregister_one(channel_T *channel, int part) static void channel_gui_unregister(channel_T *channel) { - int part; + ch_part_T part; for (part = PART_SOCK; part < PART_IN; ++part) channel_gui_unregister_one(channel, part); @@ -928,6 +926,7 @@ channel_open( channel->ch_nb_close_cb = nb_close_cb; channel->ch_hostname = (char *)vim_strsave((char_u *)hostname); channel->ch_port = port_in; + channel->ch_to_be_closed |= (1 << PART_SOCK); #ifdef FEAT_GUI channel_gui_register_one(channel, PART_SOCK); @@ -998,12 +997,19 @@ theend: } static void -may_close_part(sock_T *fd) +ch_close_part(channel_T *channel, ch_part_T part) { + sock_T *fd = &channel->ch_part[part].ch_fd; + if (*fd != INVALID_FD) { - fd_close(*fd); + if (part == PART_SOCK) + sock_close(*fd); + else + fd_close(*fd); *fd = INVALID_FD; + + channel->ch_to_be_closed &= ~(1 << part); } } @@ -1012,7 +1018,7 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err) { if (in != INVALID_FD) { - may_close_part(&channel->CH_IN_FD); + ch_close_part(channel, PART_IN); channel->CH_IN_FD = in; } if (out != INVALID_FD) @@ -1020,8 +1026,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err) # if defined(FEAT_GUI) channel_gui_unregister_one(channel, PART_OUT); # endif - may_close_part(&channel->CH_OUT_FD); + ch_close_part(channel, PART_OUT); channel->CH_OUT_FD = out; + channel->ch_to_be_closed |= (1 << PART_OUT); # if defined(FEAT_GUI) channel_gui_register_one(channel, PART_OUT); # endif @@ -1031,8 +1038,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err) # if defined(FEAT_GUI) channel_gui_unregister_one(channel, PART_ERR); # endif - may_close_part(&channel->CH_ERR_FD); + ch_close_part(channel, PART_ERR); channel->CH_ERR_FD = err; + channel->ch_to_be_closed |= (1 << PART_ERR); # if defined(FEAT_GUI) channel_gui_register_one(channel, PART_ERR); # endif @@ -1151,10 +1159,10 @@ set_callback( void channel_set_options(channel_T *channel, jobopt_T *opt) { - int part; + ch_part_T part; if (opt->jo_set & JO_MODE) - for (part = PART_SOCK; part <= PART_IN; ++part) + for (part = PART_SOCK; part < PART_COUNT; ++part) channel->ch_part[part].ch_mode = opt->jo_mode; if (opt->jo_set & JO_IN_MODE) channel->ch_part[PART_IN].ch_mode = opt->jo_in_mode; @@ -1164,7 +1172,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt) channel->ch_part[PART_ERR].ch_mode = opt->jo_err_mode; if (opt->jo_set & JO_TIMEOUT) - for (part = PART_SOCK; part <= PART_IN; ++part) + for (part = PART_SOCK; part < PART_COUNT; ++part) channel->ch_part[part].ch_timeout = opt->jo_timeout; if (opt->jo_set & JO_OUT_TIMEOUT) channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout; @@ -1282,7 +1290,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt) void channel_set_req_callback( channel_T *channel, - int part, + ch_part_T part, char_u *callback, partial_T *partial, int id) @@ -1448,7 +1456,7 @@ channel_write_in(channel_T *channel) ch_log(channel, "Finished writing all lines to channel"); /* Close the pipe/socket, so that the other side gets EOF. */ - may_close_part(&channel->CH_IN_FD); + ch_close_part(channel, PART_IN); } else ch_logn(channel, "Still %d more lines to write", @@ -1462,10 +1470,10 @@ channel_write_in(channel_T *channel) channel_buffer_free(buf_T *buf) { channel_T *channel; - int part; + ch_part_T part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) - for (part = PART_SOCK; part <= PART_IN; ++part) + for (part = PART_SOCK; part < PART_COUNT; ++part) { chanpart_T *ch_part = &channel->ch_part[part]; @@ -1574,7 +1582,7 @@ invoke_callback(channel_T *channel, char_u *callback, partial_T *partial, * Returns NULL if there is nothing. */ readq_T * -channel_peek(channel_T *channel, int part) +channel_peek(channel_T *channel, ch_part_T part) { readq_T *head = &channel->ch_part[part].ch_head; @@ -1604,7 +1612,7 @@ channel_first_nl(readq_T *node) * Returns NULL if there is nothing. */ char_u * -channel_get(channel_T *channel, int part) +channel_get(channel_T *channel, ch_part_T part) { readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; @@ -1628,7 +1636,7 @@ channel_get(channel_T *channel, int part) * Replaces NUL bytes with NL. */ static char_u * -channel_get_all(channel_T *channel, int part) +channel_get_all(channel_T *channel, ch_part_T part) { readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; @@ -1677,7 +1685,7 @@ channel_get_all(channel_T *channel, int part) * Caller must check these bytes are available. */ void -channel_consume(channel_T *channel, int part, int len) +channel_consume(channel_T *channel, ch_part_T part, int len) { readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; @@ -1693,7 +1701,7 @@ channel_consume(channel_T *channel, int part, int len) * When "want_nl" is TRUE collapse more buffers until a NL is found. */ int -channel_collapse(channel_T *channel, int part, int want_nl) +channel_collapse(channel_T *channel, ch_part_T part, int want_nl) { readq_T *head = &channel->ch_part[part].ch_head; readq_T *node = head->rq_next; @@ -1753,7 +1761,7 @@ channel_collapse(channel_T *channel, int part, int want_nl) * Returns OK or FAIL. */ static int -channel_save(channel_T *channel, int part, char_u *buf, int len, +channel_save(channel_T *channel, ch_part_T part, char_u *buf, int len, int prepend, char *lead) { readq_T *node; @@ -1828,7 +1836,7 @@ channel_save(channel_T *channel, int part, char_u *buf, int len, channel_fill(js_read_T *reader) { channel_T *channel = (channel_T *)reader->js_cookie; - int part = reader->js_cookie_arg; + ch_part_T part = reader->js_cookie_arg; char_u *next = channel_get(channel, part); int unused; int len; @@ -1866,7 +1874,7 @@ channel_fill(js_read_T *reader) * Return TRUE if there is more to read. */ static int -channel_parse_json(channel_T *channel, int part) +channel_parse_json(channel_T *channel, ch_part_T part) { js_read_T reader; typval_T listtv; @@ -2046,7 +2054,7 @@ remove_json_node(jsonq_T *head, jsonq_T *node) * Return FAIL otherwise. */ static int -channel_get_json(channel_T *channel, int part, int id, typval_T **rettv) +channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv) { jsonq_T *head = &channel->ch_part[part].ch_json_head; jsonq_T *item = head->jq_next; @@ -2080,7 +2088,7 @@ channel_get_json(channel_T *channel, int part, int id, typval_T **rettv) * "argv[1]" etc. have further arguments, type is VAR_UNKNOWN if missing. */ static void -channel_exe_cmd(channel_T *channel, int part, typval_T *argv) +channel_exe_cmd(channel_T *channel, ch_part_T part, typval_T *argv) { char_u *cmd = argv[0].vval.v_string; char_u *arg; @@ -2237,7 +2245,7 @@ invoke_one_time_callback( } static void -append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, int part) +append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, ch_part_T part) { buf_T *save_curbuf = curbuf; linenr_T lnum = buffer->b_ml.ml_line_count; @@ -2332,7 +2340,7 @@ append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, int part) } static void -drop_messages(channel_T *channel, int part) +drop_messages(channel_T *channel, ch_part_T part) { char_u *msg; @@ -2349,7 +2357,7 @@ drop_messages(channel_T *channel, int part) * Return TRUE when a message was handled, there might be another one. */ static int -may_invoke_callback(channel_T *channel, int part) +may_invoke_callback(channel_T *channel, ch_part_T part) { char_u *msg = NULL; typval_T *listtv = NULL; @@ -2596,7 +2604,7 @@ channel_is_open(channel_T *channel) * Return TRUE if "channel" has JSON or other typeahead. */ static int -channel_has_readahead(channel_T *channel, int part) +channel_has_readahead(channel_T *channel, ch_part_T part) { ch_mode_T ch_mode = channel->ch_part[part].ch_mode; @@ -2617,7 +2625,7 @@ channel_has_readahead(channel_T *channel, int part) char * channel_status(channel_T *channel, int req_part) { - int part; + ch_part_T part; int has_readahead = FALSE; if (channel == NULL) @@ -2640,7 +2648,7 @@ channel_status(channel_T *channel, int req_part) { if (channel_is_open(channel)) return "open"; - for (part = PART_SOCK; part <= PART_ERR; ++part) + for (part = PART_SOCK; part < PART_IN; ++part) if (channel_has_readahead(channel, part)) { has_readahead = TRUE; @@ -2654,7 +2662,7 @@ channel_status(channel_T *channel, int req_part) } static void -channel_part_info(channel_T *channel, dict_T *dict, char *name, int part) +channel_part_info(channel_T *channel, dict_T *dict, char *name, ch_part_T part) { chanpart_T *chanpart = &channel->ch_part[part]; char namebuf[20]; /* longest is "sock_timeout" */ @@ -2736,28 +2744,24 @@ channel_close(channel_T *channel, int invoke_close_cb) channel_gui_unregister(channel); #endif - if (channel->CH_SOCK_FD != INVALID_FD) - { - sock_close(channel->CH_SOCK_FD); - channel->CH_SOCK_FD = INVALID_FD; - } - may_close_part(&channel->CH_IN_FD); - may_close_part(&channel->CH_OUT_FD); - may_close_part(&channel->CH_ERR_FD); + ch_close_part(channel, PART_SOCK); + ch_close_part(channel, PART_IN); + ch_close_part(channel, PART_OUT); + ch_close_part(channel, PART_ERR); if (invoke_close_cb && channel->ch_close_cb != NULL) { typval_T argv[1]; typval_T rettv; int dummy; - int part; + ch_part_T part; /* Invoke callbacks before the close callback, since it's weird to * first invoke the close callback. Increment the refcount to avoid * the channel being freed halfway. */ ++channel->ch_refcount; ch_log(channel, "Invoking callbacks before closing"); - for (part = PART_SOCK; part <= PART_ERR; ++part) + for (part = PART_SOCK; part < PART_IN; ++part) while (may_invoke_callback(channel, part)) ; @@ -2789,7 +2793,7 @@ channel_close(channel_T *channel, int invoke_close_cb) } /* any remaining messages are useless now */ - for (part = PART_SOCK; part <= PART_ERR; ++part) + for (part = PART_SOCK; part < PART_IN; ++part) drop_messages(channel, part); } @@ -2802,14 +2806,14 @@ channel_close(channel_T *channel, int invoke_close_cb) void channel_close_in(channel_T *channel) { - may_close_part(&channel->CH_IN_FD); + ch_close_part(channel, PART_IN); } /* * Clear the read buffer on "channel"/"part". */ static void -channel_clear_one(channel_T *channel, int part) +channel_clear_one(channel_T *channel, ch_part_T part) { jsonq_T *json_head = &channel->ch_part[part].ch_json_head; cbq_T *cb_head = &channel->ch_part[part].ch_cb_head; @@ -3043,11 +3047,20 @@ channel_wait(channel_T *channel, sock_T fd, int timeout) } static void -channel_close_on_error(channel_T *channel, char *func) +ch_close_part_on_error( + channel_T *channel, ch_part_T part, int is_err, char *func) { - /* Do not call emsg(), most likely the other end just exited. */ - ch_errors(channel, "%s(): Cannot read from channel, will close it soon", - func); + char msgbuf[80]; + + vim_snprintf(msgbuf, sizeof(msgbuf), + "%%s(): Read %s from ch_part[%d], closing", + (is_err ? "error" : "EOF"), part); + + if (is_err) + /* Do not call emsg(), most likely the other end just exited. */ + ch_errors(channel, msgbuf, func); + else + ch_logs(channel, msgbuf, func); /* Queue a "DETACH" netbeans message in the command queue in order to * terminate the netbeans session later. Do not end the session here @@ -3064,21 +3077,20 @@ channel_close_on_error(channel_T *channel, char *func) channel_save(channel, PART_SOCK, (char_u *)DETACH_MSG_RAW, (int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT "); - /* When reading from stdout is not possible, assume the other side has - * died. Don't close the channel right away, it may be the wrong moment - * to invoke callbacks. */ - channel->ch_to_be_closed = TRUE; + /* When reading is not possible close this part of the channel. Don't + * close the channel yet, there may be something to read on another part. */ + ch_close_part(channel, part); #ifdef FEAT_GUI /* Stop listening to GUI events right away. */ - channel_gui_unregister(channel); + channel_gui_unregister_one(channel, part); #endif } static void channel_close_now(channel_T *channel) { - ch_log(channel, "Closing channel because of previous read error"); + ch_log(channel, "Closing channel because all readable fds are closed"); channel_close(channel, TRUE); if (channel->ch_nb_close_cb != NULL) (*channel->ch_nb_close_cb)(); @@ -3090,7 +3102,7 @@ channel_close_now(channel_T *channel) * The data is put in the read queue. No callbacks are invoked here. */ static void -channel_read(channel_T *channel, int part, char *func) +channel_read(channel_T *channel, ch_part_T part, char *func) { static char_u *buf = NULL; int len = 0; @@ -3098,14 +3110,11 @@ channel_read(channel_T *channel, int part, char *func) sock_T fd; int use_socket = FALSE; - /* If we detected a read error don't try reading again. */ - if (channel->ch_to_be_closed) - return; - fd = channel->ch_part[part].ch_fd; if (fd == INVALID_FD) { - ch_error(channel, "channel_read() called while socket is closed"); + ch_errors(channel, "channel_read() called while %s part is closed", + part_names[part]); return; } use_socket = fd == channel->CH_SOCK_FD; @@ -3141,7 +3150,7 @@ channel_read(channel_T *channel, int part, char *func) /* Reading a disconnection (readlen == 0), or an error. */ if (readlen <= 0) - channel_close_on_error(channel, func); + ch_close_part_on_error(channel, part, (len < 0), func); #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK) /* signal the main loop that there is something to read */ @@ -3157,7 +3166,7 @@ channel_read(channel_T *channel, int part, char *func) * Returns NULL in case of error or timeout. */ char_u * -channel_read_block(channel_T *channel, int part, int timeout) +channel_read_block(channel_T *channel, ch_part_T part, int timeout) { char_u *buf; char_u *msg; @@ -3237,7 +3246,7 @@ channel_read_block(channel_T *channel, int part, int timeout) int channel_read_json_block( channel_T *channel, - int part, + ch_part_T part, int timeout_arg, int id, typval_T **rettv) @@ -3323,7 +3332,7 @@ channel_read_json_block( common_channel_read(typval_T *argvars, typval_T *rettv, int raw) { channel_T *channel; - int part = -1; + ch_part_T part = PART_COUNT; jobopt_T opt; int mode; int timeout; @@ -3344,7 +3353,7 @@ common_channel_read(typval_T *argvars, typval_T *rettv, int raw) channel = get_channel_arg(&argvars[0], TRUE, TRUE, part); if (channel != NULL) { - if (part < 0) + if (part == PART_COUNT) part = channel_part_read(channel); mode = channel_get_mode(channel, part); timeout = channel_get_timeout(channel, part); @@ -3382,10 +3391,10 @@ theend: * Returns NULL when the socket isn't found. */ channel_T * -channel_fd2channel(sock_T fd, int *partp) +channel_fd2channel(sock_T fd, ch_part_T *partp) { channel_T *channel; - int part; + ch_part_T part; if (fd != INVALID_FD) for (channel = first_channel; channel != NULL; @@ -3411,17 +3420,13 @@ channel_fd2channel(sock_T fd, int *partp) channel_handle_events(void) { channel_T *channel; - int part; + ch_part_T part; sock_T fd; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { - /* If we detected a read error don't try reading again. */ - if (channel->ch_to_be_closed) - continue; - /* check the socket and pipes */ - for (part = PART_SOCK; part <= PART_ERR; ++part) + for (part = PART_SOCK; part < PART_IN; ++part) { fd = channel->ch_part[part].ch_fd; if (fd != INVALID_FD) @@ -3431,7 +3436,8 @@ channel_handle_events(void) if (r == CW_READY) channel_read(channel, part, "channel_handle_events"); else if (r == CW_ERROR) - channel_close_on_error(channel, "channel_handle_events()"); + ch_close_part_on_error(channel, part, TRUE, + "channel_handle_events"); } } } @@ -3444,7 +3450,7 @@ channel_handle_events(void) * Return FAIL or OK. */ int -channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun) +channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun) { int res; sock_T fd; @@ -3496,7 +3502,7 @@ channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun) * Sets "part_read" to the read fd. * Otherwise returns NULL. */ - channel_T * + static channel_T * send_common( typval_T *argvars, char_u *text, @@ -3504,10 +3510,10 @@ send_common( int eval, jobopt_T *opt, char *fun, - int *part_read) + ch_part_T *part_read) { channel_T *channel; - int part_send; + ch_part_T part_send; clear_job_options(opt); channel = get_channel_arg(&argvars[0], TRUE, FALSE, 0); @@ -3550,8 +3556,8 @@ ch_expr_common(typval_T *argvars, typval_T *rettv, int eval) channel_T *channel; int id; ch_mode_T ch_mode; - int part_send; - int part_read; + ch_part_T part_send; + ch_part_T part_read; jobopt_T opt; int timeout; @@ -3610,7 +3616,7 @@ ch_raw_common(typval_T *argvars, typval_T *rettv, int eval) char_u buf[NUMBUFLEN]; char_u *text; channel_T *channel; - int part_read; + ch_part_T part_read; jobopt_T opt; int timeout; @@ -3644,7 +3650,7 @@ channel_poll_setup(int nfd_in, void *fds_in) int nfd = nfd_in; channel_T *channel; struct pollfd *fds = fds_in; - int part; + ch_part_T part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { @@ -3678,7 +3684,7 @@ channel_poll_check(int ret_in, void *fds_in) int ret = ret_in; channel_T *channel; struct pollfd *fds = fds_in; - int part; + ch_part_T part; int idx; chanpart_T *in_part; @@ -3725,7 +3731,7 @@ channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in) channel_T *channel; fd_set *rfds = rfds_in; fd_set *wfds = wfds_in; - int part; + ch_part_T part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) { @@ -3757,7 +3763,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in) channel_T *channel; fd_set *rfds = rfds_in; fd_set *wfds = wfds_in; - int part; + ch_part_T part; chanpart_T *in_part; for (channel = first_channel; channel != NULL; channel = channel->ch_next) @@ -3803,7 +3809,7 @@ channel_parse_messages(void) channel_T *channel = first_channel; int ret = FALSE; int r; - int part = PART_SOCK; + ch_part_T part = PART_SOCK; ++safe_to_invoke_callback; @@ -3816,9 +3822,9 @@ channel_parse_messages(void) } while (channel != NULL) { - if (channel->ch_to_be_closed) + if (channel->ch_to_be_closed == 0) { - channel->ch_to_be_closed = FALSE; + channel->ch_to_be_closed = (1 << PART_COUNT); channel_close_now(channel); /* channel may have been freed, start over */ channel = first_channel; @@ -3840,7 +3846,7 @@ channel_parse_messages(void) continue; } if (channel->ch_part[part].ch_fd != INVALID_FD - || channel_has_readahead(channel, part)) + || channel_has_readahead(channel, part)) { /* Increase the refcount, in case the handler causes the channel * to be unreferenced or closed. */ @@ -3899,7 +3905,7 @@ set_ref_in_channel(int copyID) /* * Return the "part" to write to for "channel". */ - int + ch_part_T channel_part_send(channel_T *channel) { if (channel->CH_SOCK_FD == INVALID_FD) @@ -3910,7 +3916,7 @@ channel_part_send(channel_T *channel) /* * Return the default "part" to read from for "channel". */ - int + ch_part_T channel_part_read(channel_T *channel) { if (channel->CH_SOCK_FD == INVALID_FD) @@ -3923,7 +3929,7 @@ channel_part_read(channel_T *channel) * If "channel" is invalid returns MODE_JSON. */ ch_mode_T -channel_get_mode(channel_T *channel, int part) +channel_get_mode(channel_T *channel, ch_part_T part) { if (channel == NULL) return MODE_JSON; @@ -3934,7 +3940,7 @@ channel_get_mode(channel_T *channel, int part) * Return the timeout of "channel"/"part" */ int -channel_get_timeout(channel_T *channel, int part) +channel_get_timeout(channel_T *channel, ch_part_T part) { return channel->ch_part[part].ch_timeout; } @@ -3962,7 +3968,7 @@ handle_mode(typval_T *item, jobopt_T *opt, ch_mode_T *modep, int jo) } static int -handle_io(typval_T *item, int part, jobopt_T *opt) +handle_io(typval_T *item, ch_part_T part, jobopt_T *opt) { char_u *val = get_tv_string(item); @@ -4045,7 +4051,7 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported) dict_T *dict; int todo; hashitem_T *hi; - int part; + ch_part_T part; opt->jo_set = 0; if (tv->v_type == VAR_UNKNOWN) @@ -4343,10 +4349,10 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported) * Returns NULL if the handle is invalid. * When "check_open" is TRUE check that the channel can be used. * When "reading" is TRUE "check_open" considers typeahead useful. - * "part" is used to check typeahead, when -1 use the default part. + * "part" is used to check typeahead, when PART_COUNT use the default part. */ channel_T * -get_channel_arg(typval_T *tv, int check_open, int reading, int part) +get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part) { channel_T *channel = NULL; int has_readahead = FALSE; @@ -4367,7 +4373,7 @@ get_channel_arg(typval_T *tv, int check_open, int reading, int part) } if (channel != NULL && reading) has_readahead = channel_has_readahead(channel, - part >= 0 ? part : channel_part_read(channel)); + part != PART_COUNT ? part : channel_part_read(channel)); if (check_open && (channel == NULL || (!channel_is_open(channel) && !(reading && has_readahead)))) @@ -4659,7 +4665,7 @@ job_start(typval_T *argvars) garray_T ga; #endif jobopt_T opt; - int part; + ch_part_T part; job = job_alloc(); if (job == NULL) @@ -4679,7 +4685,7 @@ job_start(typval_T *argvars) goto theend; /* Check that when io is "file" that there is a file name. */ - for (part = PART_OUT; part <= PART_IN; ++part) + for (part = PART_OUT; part < PART_COUNT; ++part) if ((opt.jo_set & (JO_OUT_IO << (part - PART_OUT))) && opt.jo_io[part] == JIO_FILE && (!(opt.jo_set & (JO_OUT_NAME << (part - PART_OUT))) diff --git a/src/eval.c b/src/eval.c index 3b5abe9c3..18eb87936 100644 --- a/src/eval.c +++ b/src/eval.c @@ -5622,7 +5622,7 @@ set_ref_in_item( else if (tv->v_type == VAR_CHANNEL) { channel_T *ch =tv->vval.v_channel; - int part; + ch_part_T part; typval_T dtv; jsonq_T *jq; cbq_T *cq; @@ -5630,7 +5630,7 @@ set_ref_in_item( if (ch != NULL && ch->ch_copyID != copyID) { ch->ch_copyID = copyID; - for (part = PART_SOCK; part <= PART_IN; ++part) + for (part = PART_SOCK; part < PART_COUNT; ++part) { for (jq = ch->ch_part[part].ch_json_head.jq_next; jq != NULL; jq = jq->jq_next) diff --git a/src/proto/channel.pro b/src/proto/channel.pro index 53c5dab94..6a52d8b2b 100644 --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -14,15 +14,15 @@ channel_T *channel_open_func(typval_T *argvars); void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err); void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options); void channel_set_options(channel_T *channel, jobopt_T *opt); -void channel_set_req_callback(channel_T *channel, int part, char_u *callback, partial_T *partial, int id); +void channel_set_req_callback(channel_T *channel, ch_part_T part, char_u *callback, partial_T *partial, int id); void channel_buffer_free(buf_T *buf); void channel_write_any_lines(void); void channel_write_new_lines(buf_T *buf); -readq_T *channel_peek(channel_T *channel, int part); +readq_T *channel_peek(channel_T *channel, ch_part_T part); char_u *channel_first_nl(readq_T *node); -char_u *channel_get(channel_T *channel, int part); -void channel_consume(channel_T *channel, int part, int len); -int channel_collapse(channel_T *channel, int part, int want_nl); +char_u *channel_get(channel_T *channel, ch_part_T part); +void channel_consume(channel_T *channel, ch_part_T part, int len); +int channel_collapse(channel_T *channel, ch_part_T part, int want_nl); int channel_can_write_to(channel_T *channel); int channel_is_open(channel_T *channel); char *channel_status(channel_T *channel, int req_part); @@ -31,13 +31,12 @@ void channel_close(channel_T *channel, int invoke_close_cb); void channel_close_in(channel_T *channel); void channel_clear(channel_T *channel); void channel_free_all(void); -char_u *channel_read_block(channel_T *channel, int part, int timeout); -int channel_read_json_block(channel_T *channel, int part, int timeout_arg, int id, typval_T **rettv); +char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout); +int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv); void common_channel_read(typval_T *argvars, typval_T *rettv, int raw); -channel_T *channel_fd2channel(sock_T fd, int *partp); +channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp); void channel_handle_events(void); -int channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun); -channel_T *send_common(typval_T *argvars, char_u *text, int id, int eval, jobopt_T *opt, char *fun, int *part_read); +int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun); void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval); void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval); int channel_poll_setup(int nfd_in, void *fds_in); @@ -46,14 +45,14 @@ int channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in); int channel_select_check(int ret_in, void *rfds_in, void *wfds_in); int channel_parse_messages(void); int set_ref_in_channel(int copyID); -int channel_part_send(channel_T *channel); -int channel_part_read(channel_T *channel); -ch_mode_T channel_get_mode(channel_T *channel, int part); -int channel_get_timeout(channel_T *channel, int part); +ch_part_T channel_part_send(channel_T *channel); +ch_part_T channel_part_read(channel_T *channel); +ch_mode_T channel_get_mode(channel_T *channel, ch_part_T part); +int channel_get_timeout(channel_T *channel, ch_part_T part); void clear_job_options(jobopt_T *opt); void free_job_options(jobopt_T *opt); int get_job_options(typval_T *tv, jobopt_T *opt, int supported); -channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, int part); +channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part); void job_free_all(void); int set_ref_in_job(int copyID); void job_unref(job_T *job); diff --git a/src/structs.h b/src/structs.h index 2a4284ac5..7a4d7fbe4 100644 --- a/src/structs.h +++ b/src/structs.h @@ -1499,19 +1499,21 @@ typedef enum { /* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR * are polled. */ -#define PART_SOCK 0 +typedef enum { + PART_SOCK = 0, #define CH_SOCK_FD ch_part[PART_SOCK].ch_fd - #ifdef FEAT_JOB_CHANNEL -# define INVALID_FD (-1) - -# define PART_OUT 1 -# define PART_ERR 2 -# define PART_IN 3 + PART_OUT, # define CH_OUT_FD ch_part[PART_OUT].ch_fd + PART_ERR, # define CH_ERR_FD ch_part[PART_ERR].ch_fd + PART_IN, # define CH_IN_FD ch_part[PART_IN].ch_fd #endif + PART_COUNT +} ch_part_T; + +#define INVALID_FD (-1) /* The per-fd info for a channel. */ typedef struct { @@ -1566,14 +1568,14 @@ struct channel_S { int ch_id; /* ID of the channel */ int ch_last_msg_id; /* ID of the last message */ - chanpart_T ch_part[4]; /* info for socket, out, err and in */ + chanpart_T ch_part[PART_COUNT]; /* info for socket, out, err and in */ char *ch_hostname; /* only for socket, allocated */ int ch_port; /* only for socket */ - int ch_to_be_closed; /* When TRUE reading or writing failed and - * the channel must be closed when it's safe - * to invoke callbacks. */ + int ch_to_be_closed; /* bitset of readable fds to be closed. + * When all readable fds have been closed, + * set to (1 << PART_COUNT). */ int ch_to_be_freed; /* When TRUE channel must be freed when it's * safe to invoke callbacks. */ int ch_error; /* When TRUE an error was reported. Avoids diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim index 0756dd51c..fbcd496e9 100644 --- a/src/testdir/test_channel.vim +++ b/src/testdir/test_channel.vim @@ -1505,6 +1505,23 @@ func Test_read_nonl_line() call assert_equal(3, g:linecount) endfunc +func Test_read_from_terminated_job() + if !has('job') + return + endif + + let g:linecount = 0 + if has('win32') + " workaround: 'shellescape' does improper escaping double quotes + let arg = 'import os,sys;os.close(1);sys.stderr.write(\"test\n\")' + else + let arg = 'import os,sys;os.close(1);sys.stderr.write("test\n")' + endif + call job_start([s:python, '-c', arg], {'callback': 'MyLineCountCb'}) + call WaitFor('1 <= g:linecount') + call assert_equal(1, g:linecount) +endfunc + function Ch_test_close_lambda(port) let handle = ch_open('localhost:' . a:port, s:chopt) if ch_status(handle) == "fail" diff --git a/src/version.c b/src/version.c index da7263116..808afb1b1 100644 --- a/src/version.c +++ b/src/version.c @@ -765,6 +765,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 27, +/**/ 26, /**/ 25, |