[GitHub] qpid-dispatch pull request #194: DISPATCH-825: Fix interlocking between mess...

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] qpid-dispatch pull request #194: DISPATCH-825: Fix interlocking between mess...

jdanekrh
GitHub user ChugR opened a pull request:

    https://github.com/apache/qpid-dispatch/pull/194

    DISPATCH-825: Fix interlocking between message send and receive

    Don't allow a buffer on the message buffer chain that might be removed.
    Use a content-based pending buffer instead.
    Don't release message lock between adding final buffer and setting
    receive_complete flag.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChugR/qpid-dispatch DISPATCH-825

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/qpid-dispatch/pull/194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #194
   
----
commit b6e10668a59fc3953698ce1fcc27f45b307276c3
Author: Chuck Rolke <[hidden email]>
Date:   2017-09-13T15:49:30Z

    DISPATCH-825: Fix interlocking between message send and receive
   
    Don't allow a buffer on the message buffer chain that might be removed.
    Use a content-based pending buffer instead.
    Don't release message lock between adding final buffer and setting
    receive_complete flag.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

[GitHub] qpid-dispatch pull request #194: DISPATCH-825: Fix interlocking between mess...

jdanekrh
Github user ChugR commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/194#discussion_r138710888
 
    --- Diff: src/message.c ---
    @@ -1109,95 +1146,97 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
     
         //
    -    // The discard flag indicates if we should continue receiving the message.
    -    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
    -    // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
    -    //
    -    bool discard = qd_message_is_discard((qd_message_t*)msg);
    -
    -    //
    -    // Get a reference to the tail buffer on the message.  This is the buffer into which
    -    // we will store incoming message data.  If there is no buffer in the message, this is the
    -    // first time we are here and we need to allocate an empty one and add it to the message.
    +    // The discard flag indicates we should keep reading the input stream
    +    // but not process the message for delivery.
         //
    -    if (!discard) {
    -        buf = DEQ_TAIL(msg->content->buffers);
    -        if (!buf) {
    -            buf = qd_buffer();
    -            DEQ_INSERT_TAIL(msg->content->buffers, buf);
    -        }
    +    if (qd_message_is_discard((qd_message_t*)msg)) {
    +        return discard_receive(delivery, link, (qd_message_t *)msg);
         }
     
    +
    +    // Loop until msg is complete, error seen, or incoming bytes are consumed
    +    bool recv_error = false;
         while (1) {
    -        if (discard) {
    -            char dummy[BUFFER_SIZE];
    -            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
    -        }
    -        else {
    -            //
    -            // Try to receive enough data to fill the remaining space in the tail buffer.
    -            //
    +        //
    +        // handle EOS and clean up after pn receive errors
    +        //
    +        bool at_eos = (pn_delivery_partial(delivery) == false) &&
    +                      (pn_delivery_pending(delivery) == 0);
    +
    +        if (at_eos || recv_error) {
    +            // Message is complete
    +            sys_mutex_lock(msg->content->lock);
    +            {
    +                // Append last buffer if any with data
    +                if (msg->content->pending) {
    +                    if (qd_buffer_size(msg->content->pending) > 0) {
    +                        // pending buffer has bytes that are port of message
    +                        DEQ_INSERT_TAIL(msg->content->buffers,
    +                                        msg->content->pending);
    +                    } else {
    +                        // pending buffer is empty
    +                        qd_buffer_free(msg->content->pending);
    +                    }
    +                    msg->content->pending = 0;
    +                } else {
    +                    // pending buffer is absent
    +                }
    +
    +                msg->content->receive_complete = true;
     
    -            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +                // unlink message and delivery
    +                pn_record_set(record, PN_DELIVERY_CTX, 0);
    +            }
    +            sys_mutex_unlock(msg->content->lock);
    +            return (qd_message_t*) msg;
             }
     
             //
    -        // If we receive PN_EOS, we have come to the end of the message.
    +        // Handle a missing or full pending buffer
             //
    -        if (rc == PN_EOS) {
    -            //
    -            // We have received the entire message since rc == PN_EOS, set the receive_complete flag to true
    -            //
    -            msg->content->receive_complete = true;
    -
    -            //
    -            // Clear the value in the record with key PN_DELIVERY_CTX
    -            //
    -            pn_record_set(record, PN_DELIVERY_CTX, 0);
    -
    -            //
    -            // If the last buffer in the list is empty, remove it and free it.  This
    -            // will only happen if the size of the message content is an exact multiple
    -            // of the buffer size.
    -            //
    -            if (buf && qd_buffer_size(buf) == 0) {
    +        if (!msg->content->pending) {
    +            // Pending buffer is absent: get a new one
    +            msg->content->pending = qd_buffer();
    +        } else {
    +            // Pending buffer exists
    +            if (qd_buffer_capacity(msg->content->pending) == 0) {
    +                // Pending buffer is full
                     sys_mutex_lock(msg->content->lock);
    -                DEQ_REMOVE_TAIL(msg->content->buffers);
    +                DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
                     sys_mutex_unlock(msg->content->lock);
    -                qd_buffer_free(buf);
    +                msg->content->pending = qd_buffer();
    +            } else {
    +                // Pending buffer still has capacity
                 }
    -
    -            return (qd_message_t*) msg;
             }
     
    -        if (rc > 0) {
    -            if (discard)
    -                continue;
    +        //
    +        // Try to fill the remaining space in the pending buffer.
    +        //
    +        rc = pn_link_recv(link,
    +                          (char*) qd_buffer_cursor(msg->content->pending),
    +                          qd_buffer_capacity(msg->content->pending));
    +
    +        assert (rc != PN_EOS); // Just checked for this moments ago
    --- End diff --
   
    This assert is a mistake. Between sensing it a moment ago and now it may have become true.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]