[1/2] qpid-dispatch git commit: DISPATCH-825: Fix interlocking between message send and receive

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/2] qpid-dispatch git commit: DISPATCH-825: Fix interlocking between message send and receive

chug
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 484fc63b0 -> fa2a4632d


DISPATCH-825: Fix interlocking between message send and receive

Don't allow a buffer on the message buffer chain that might be removed.
Use a content-based pending buffer instead.
Don't release message lock between adding final buffer and setting
receive_complete flag.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b6e10668
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b6e10668
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b6e10668

Branch: refs/heads/master
Commit: b6e10668a59fc3953698ce1fcc27f45b307276c3
Parents: 484fc63
Author: Chuck Rolke <[hidden email]>
Authored: Wed Sep 13 11:49:30 2017 -0400
Committer: Chuck Rolke <[hidden email]>
Committed: Wed Sep 13 11:49:30 2017 -0400

----------------------------------------------------------------------
 src/message.c         | 182 +++++++++++++++++++++++++++------------------
 src/message_private.h |   1 +
 2 files changed, 112 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6e10668/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 0788194..97cb382 100644
--- a/src/message.c
+++ b/src/message.c
@@ -894,6 +894,9 @@ void qd_message_free(qd_message_t *in_msg)
             buf = DEQ_HEAD(content->buffers);
         }
 
+        if (content->pending)
+            qd_buffer_free(content->pending);
+
         sys_mutex_free(content->lock);
         free_qd_message_content_t(content);
     }
@@ -1085,19 +1088,53 @@ qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
     return msg->cursor;
 }
 
+
+/**
+ * Receive and discard large messages for which there is no destination.
+ * Don't waste resources by putting the message into internal buffers.
+ * Don't fiddle with locking as no sender is competing with reception.
+ */
+qd_message_t *discard_receive(pn_delivery_t *delivery,
+                              pn_link_t     *link,
+                              qd_message_t  *msg_in)
+{
+    qd_message_pvt_t *msg  = (qd_message_pvt_t*)msg_in;
+
+    while (1) {
+        char dummy[BUFFER_SIZE];
+        ssize_t rc = pn_link_recv(link, dummy, BUFFER_SIZE);
+
+        if (rc == 0) {
+            // have read all available pn_link incoming bytes
+            break;
+        } else if (rc == PN_EOS || rc < 0) {
+            // end of message or error. Call the message complete
+            msg->content->receive_complete = true;
+
+            pn_record_t *record = pn_delivery_attachments(delivery);
+            pn_record_set(record, PN_DELIVERY_CTX, 0);
+            break;
+        } else {
+            // rc was > 0. bytes were read and discarded.
+        }
+    }
+
+    return msg_in;
+}
+
+
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
     pn_link_t        *link = pn_delivery_link(delivery);
     ssize_t           rc;
-    qd_buffer_t      *buf = 0;
 
     pn_record_t *record    = pn_delivery_attachments(delivery);
     qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
 
     //
-    // If there is no message associated with the delivery, this is the first time
-    // we've received anything on this delivery.  Allocate a message descriptor and
-    // link it and the delivery together.
+    // If there is no message associated with the delivery then this is the
+    // first time we've received anything on this delivery.
+    // Allocate a message descriptor and link it and the delivery together.
     //
     if (!msg) {
         msg = (qd_message_pvt_t*) qd_message();
@@ -1109,88 +1146,89 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     }
 
     //
-    // The discard flag indicates if we should continue receiving the message.
-    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
-    // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
-    //
-    bool discard = qd_message_is_discard((qd_message_t*)msg);
-
-    //
-    // Get a reference to the tail buffer on the message.  This is the buffer into which
-    // we will store incoming message data.  If there is no buffer in the message, this is the
-    // first time we are here and we need to allocate an empty one and add it to the message.
+    // The discard flag indicates we should keep reading the input stream
+    // but not process the message for delivery.
     //
-    if (!discard) {
-        buf = DEQ_TAIL(msg->content->buffers);
-        if (!buf) {
-            buf = qd_buffer();
-            DEQ_INSERT_TAIL(msg->content->buffers, buf);
-        }
+    if (qd_message_is_discard((qd_message_t*)msg)) {
+        return discard_receive(delivery, link, (qd_message_t *)msg);
     }
 
+
+    // Loop until msg is complete, error seen, or incoming bytes are consumed
+    bool recv_error = false;
     while (1) {
-        if (discard) {
-            char dummy[BUFFER_SIZE];
-            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
-        }
-        else {
-            //
-            // Try to receive enough data to fill the remaining space in the tail buffer.
-            //
+        //
+        // handle EOS and clean up after pn receive errors
+        //
+        bool at_eos = (pn_delivery_partial(delivery) == false) &&
+                      (pn_delivery_pending(delivery) == 0);
+
+        if (at_eos || recv_error) {
+            // Message is complete
+            sys_mutex_lock(msg->content->lock);
+            {
+                // Append last buffer if any with data
+                if (msg->content->pending) {
+                    if (qd_buffer_size(msg->content->pending) > 0) {
+                        // pending buffer has bytes that are port of message
+                        DEQ_INSERT_TAIL(msg->content->buffers,
+                                        msg->content->pending);
+                    } else {
+                        // pending buffer is empty
+                        qd_buffer_free(msg->content->pending);
+                    }
+                    msg->content->pending = 0;
+                } else {
+                    // pending buffer is absent
+                }
+
+                msg->content->receive_complete = true;
 
-            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
+                // unlink message and delivery
+                pn_record_set(record, PN_DELIVERY_CTX, 0);
+            }
+            sys_mutex_unlock(msg->content->lock);
+            return (qd_message_t*) msg;
         }
 
         //
-        // If we receive PN_EOS, we have come to the end of the message.
+        // Handle a missing or full pending buffer
         //
-        if (rc == PN_EOS) {
-            //
-            // We have received the entire message since rc == PN_EOS, set the receive_complete flag to true
-            //
-            msg->content->receive_complete = true;
-
-            //
-            // Clear the value in the record with key PN_DELIVERY_CTX
-            //
-            pn_record_set(record, PN_DELIVERY_CTX, 0);
-
-            //
-            // If the last buffer in the list is empty, remove it and free it.  This
-            // will only happen if the size of the message content is an exact multiple
-            // of the buffer size.
-            //
-            if (buf && qd_buffer_size(buf) == 0) {
+        if (!msg->content->pending) {
+            // Pending buffer is absent: get a new one
+            msg->content->pending = qd_buffer();
+        } else {
+            // Pending buffer exists
+            if (qd_buffer_capacity(msg->content->pending) == 0) {
+                // Pending buffer is full
                 sys_mutex_lock(msg->content->lock);
-                DEQ_REMOVE_TAIL(msg->content->buffers);
+                DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
                 sys_mutex_unlock(msg->content->lock);
-                qd_buffer_free(buf);
+                msg->content->pending = qd_buffer();
+            } else {
+                // Pending buffer still has capacity
             }
-
-            return (qd_message_t*) msg;
         }
 
-        if (rc > 0) {
-            if (discard)
-                continue;
+        //
+        // Try to fill the remaining space in the pending buffer.
+        //
+        rc = pn_link_recv(link,
+                          (char*) qd_buffer_cursor(msg->content->pending),
+                          qd_buffer_capacity(msg->content->pending));
+
+        assert (rc != PN_EOS); // Just checked for this moments ago
+
+        if (rc < 0) {
+            // error seen. next pass breaks out of loop
+            recv_error = true;
+        } else if (rc > 0) {
             //
             // We have received a positive number of bytes for the message.  Advance
             // the cursor in the buffer.
             //
-            qd_buffer_insert(buf, rc);
-
-            //
-            // If the buffer is full, allocate a new empty buffer and append it to the
-            // tail of the message's list.
-            //
-            sys_mutex_lock(msg->content->lock);
-            if (qd_buffer_capacity(buf) == 0) {
-                buf = qd_buffer();
-                DEQ_INSERT_TAIL(msg->content->buffers, buf);
-            }
-            sys_mutex_unlock(msg->content->lock);
-
-        } else
+            qd_buffer_insert(msg->content->pending, rc);
+        } else {
             //
             // We received zero bytes, and no PN_EOS.  This means that we've received
             // all of the data available up to this point, but it does not constitute
@@ -1198,6 +1236,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
             // Return the message so that the caller can start sending out whatever we have received so far
             //
             return (qd_message_t*) msg;
+        }
     }
 
     return 0;
@@ -1440,16 +1479,17 @@ void qd_message_send(qd_message_t *in_msg,
 
     buf = msg->cursor.buffer;
 
-    if (!buf)
-        return;
+    assert (buf);
 
     while (buf) {
         size_t buf_size = qd_buffer_size(buf);
 
         // This will send the remaining data in the buffer if any.
         int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf));
-        if (num_bytes_to_send > 0)
+        if (num_bytes_to_send > 0) {
             pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send);
+            // TODO: DISPATCH-819 check pn_link_send return value
+        }
 
         // If the entire message has already been received,  taking out this lock is not that expensive
         // because there is no contention for this lock.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6e10668/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 073ac7f..eab4ae0 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -68,6 +68,7 @@ typedef struct {
     sys_mutex_t         *lock;
     sys_atomic_t         ref_count;                       // The number of messages referencing this
     qd_buffer_list_t     buffers;                         // The buffer chain containing the message
+    qd_buffer_t         *pending;                         // Buffer owned by and filled by qd_message_receive
     qd_field_location_t  section_message_header;          // The message header list
     qd_field_location_t  section_delivery_annotation;     // The delivery annotation map
     qd_field_location_t  section_message_annotation;      // The message annotation map


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

[2/2] qpid-dispatch git commit: DISPATCH-825: get rid of poorly conceived EOS assert

chug
DISPATCH-825: get rid of poorly conceived EOS assert


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fa2a4632
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fa2a4632
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fa2a4632

Branch: refs/heads/master
Commit: fa2a4632db9804f574af4e4763677f11ae50c1af
Parents: b6e1066
Author: Chuck Rolke <[hidden email]>
Authored: Wed Sep 13 16:11:16 2017 -0400
Committer: Chuck Rolke <[hidden email]>
Committed: Wed Sep 13 16:11:16 2017 -0400

----------------------------------------------------------------------
 src/message.c | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa2a4632/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 97cb382..6074ebb 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1155,7 +1155,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 
 
     // Loop until msg is complete, error seen, or incoming bytes are consumed
-    bool recv_error = false;
+    bool recv_error_or_eos = false;
     while (1) {
         //
         // handle EOS and clean up after pn receive errors
@@ -1163,7 +1163,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         bool at_eos = (pn_delivery_partial(delivery) == false) &&
                       (pn_delivery_pending(delivery) == 0);
 
-        if (at_eos || recv_error) {
+        if (at_eos || recv_error_or_eos) {
             // Message is complete
             sys_mutex_lock(msg->content->lock);
             {
@@ -1217,11 +1217,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                           (char*) qd_buffer_cursor(msg->content->pending),
                           qd_buffer_capacity(msg->content->pending));
 
-        assert (rc != PN_EOS); // Just checked for this moments ago
-
         if (rc < 0) {
-            // error seen. next pass breaks out of loop
-            recv_error = true;
+            // error or eos seen. next pass breaks out of loop
+            recv_error_or_eos = true;
         } else if (rc > 0) {
             //
             // We have received a positive number of bytes for the message.  Advance


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]