[qpid-dispatch] branch master updated: DISPATCH-1893: fix race in qd_message_next_stream_data()

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

[qpid-dispatch] branch master updated: DISPATCH-1893: fix race in qd_message_next_stream_data()

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

The following commit(s) were added to refs/heads/master by this push:
     new ef29646  DISPATCH-1893: fix race in qd_message_next_stream_data()
ef29646 is described below

commit ef2964653a844ec110214ec1c236f44d963fed01
Author: Kenneth Giusti <[hidden email]>
AuthorDate: Mon Jan 11 19:01:00 2021 -0500

    DISPATCH-1893: fix race in qd_message_next_stream_data()
    The race occurs when attempting to get the next stream_data entry from
    a message where no further stream_data is present but the message has
    not been completely received.
    The search for the next stream_data fails since the message body
    cursor is at the end of the content buffer list.  The lock is then
    dropped.  The "receive complete" flag is then checked to determine
    the proper return code.  Depending on the receive complete flag the
    return code will indicate either the body parsing is complete
    (NO_MORE) or wait for more data (DATA_INCOMPLETE).
    The race occurs when the input I/O thread takes control right after
    qd_message_next_stream_data drops the lock, then adds more body data
    and completes the receive before the thread calling
    qd_message_next_stream_data checks the receive complete flag.
    This causes qd_message_next_stream_data() to indicate NO_MORE while
    it has not completely consumed all stream data entries.  This results
    in a truncated message body.
    This closes #977
 src/message.c | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/src/message.c b/src/message.c
index 7679857..3e86f54 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2340,7 +2340,7 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
- * find_last_buffer
+ * find_last_buffer_LH
  * Given a field location, find the following:
@@ -2352,7 +2352,7 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
  * buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must
  * point at the octet following that octet, even if it points past the end of the buffer.
-static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
+static void find_last_buffer_LH(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
     qd_buffer_t *buf       = location->buffer;
     size_t       remaining = location->hdr_length + location->length;
@@ -2372,7 +2372,7 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
-void trim_stream_data_headers(qd_message_stream_data_t *stream_data, bool remove_vbin_header)
+void trim_stream_data_headers_LH(qd_message_stream_data_t *stream_data, bool remove_vbin_header)
     const qd_field_location_t *location = &stream_data->section;
     qd_buffer_t               *buffer   = location->buffer;
@@ -2641,10 +2641,12 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg
     qd_field_location_t location;
-    qd_buffer_t * const old_body_buffer  = msg->body_buffer;
-    bool is_footer                       = false;
+    qd_buffer_t * const old_body_buffer    = msg->body_buffer;
+    bool is_footer                         = false;
+    qd_message_stream_data_result_t result = QD_MESSAGE_STREAM_DATA_NO_MORE;
     section_status = message_section_check_LH(&msg->body_buffer, &msg->body_cursor,
                                               BODY_DATA_SHORT, 3, TAGS_BINARY,
@@ -2656,21 +2658,21 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg
                                                   FOOTER_SHORT, 3, TAGS_MAP,
                                                   &location, true, false);
-    UNLOCK(content->lock);
     switch (section_status) {
+        break;
         stream_data = new_qd_message_stream_data_t();
         stream_data->owning_message = msg;
         stream_data->section        = location;
-        find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
+        find_last_buffer_LH(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
         stream_data->last_buffer = msg->body_buffer;
-        trim_stream_data_headers(stream_data, !is_footer);
+        trim_stream_data_headers_LH(stream_data, !is_footer);
         DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
         *out_stream_data = stream_data;
@@ -2682,16 +2684,19 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg
             if (old_body_buffer == DEQ_PREV(stream_data->section.buffer))
                 stream_data->free_prev = true;
+        break;
         if (msg->content->receive_complete)
-            return QD_MESSAGE_STREAM_DATA_NO_MORE;
+            result = QD_MESSAGE_STREAM_DATA_NO_MORE;
+        break;
+    UNLOCK(content->lock);
+    return result;

To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]