qpid-broker-j git commit: QPID-7817: [WebSocket] Fix defect in buffer handling

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

qpid-broker-j git commit: QPID-7817: [WebSocket] Fix defect in buffer handling

kwall
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master e4598dcd6 -> 9713e49a0


 QPID-7817: [WebSocket] Fix defect in buffer handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/9713e49a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/9713e49a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/9713e49a

Branch: refs/heads/master
Commit: 9713e49a0b12fa4a544b2ff92222446fd74416e2
Parents: e4598dc
Author: Keith Wall <[hidden email]>
Authored: Fri Jun 16 07:38:33 2017 +0100
Committer: Keith Wall <[hidden email]>
Committed: Fri Jun 16 07:39:08 2017 +0100

----------------------------------------------------------------------
 .../transport/websocket/WebSocketProvider.java  | 48 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9713e49a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 82ee3eb..76b3fbd 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -363,6 +363,7 @@ class WebSocketProvider implements AcceptingTransport
         private volatile QpidByteBuffer _netInputBuffer;
         private volatile MultiVersionProtocolEngine _protocolEngine;
         private volatile ConnectionWrapper _connectionWrapper;
+        private volatile boolean _unexpectedByteBufferSizeReported;
 
         AmqpWebSocket()
         {
@@ -422,7 +423,7 @@ class WebSocketProvider implements AcceptingTransport
                         _netInputBuffer.flip();
                         _protocolEngine.received(_netInputBuffer);
                         _connectionWrapper.doWrite();
-                        _netInputBuffer.compact();
+                        restoreApplicationBufferForWrite();
                     }
                     while(remaining > 0);
 
@@ -439,6 +440,51 @@ class WebSocketProvider implements AcceptingTransport
             _idleTimeoutChecker.wakeup();
         }
 
+        private void restoreApplicationBufferForWrite()
+        {
+            QpidByteBuffer oldNetInputBuffer = _netInputBuffer;
+            int unprocessedDataLength = _netInputBuffer.remaining();
+
+            _netInputBuffer.limit(_netInputBuffer.capacity());
+            _netInputBuffer = oldNetInputBuffer.slice();
+            _netInputBuffer.limit(unprocessedDataLength);
+            oldNetInputBuffer.dispose();
+            if (_netInputBuffer.limit() != _netInputBuffer.capacity())
+            {
+                _netInputBuffer.position(_netInputBuffer.limit());
+                _netInputBuffer.limit(_netInputBuffer.capacity());
+            }
+            else
+            {
+                QpidByteBuffer currentBuffer = _netInputBuffer;
+                int newBufSize;
+
+                if (currentBuffer.capacity() < _broker.getNetworkBufferSize())
+                {
+                    newBufSize = _broker.getNetworkBufferSize();
+                }
+                else
+                {
+                    newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize();
+                    reportUnexpectedByteBufferSizeUsage();
+                }
+
+                _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+                _netInputBuffer.put(currentBuffer);
+                currentBuffer.dispose();
+            }
+        }
+
+        private void reportUnexpectedByteBufferSizeUsage()
+        {
+            if (!_unexpectedByteBufferSizeReported)
+            {
+                LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
+                            _broker.getNetworkBufferSize(), this.toString());
+                _unexpectedByteBufferSizeReported = true;
+            }
+        }
+        
         /** AMQP frames MUST be sent as binary data payloads of WebSocket messages.*/
         @OnWebSocketMessage @SuppressWarnings("unused")
         public void onWebSocketText(Session sess, String text)


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...