qpid-broker-j git commit: QPID-7817: [WebSocket] Change implementation to allow an AMQP frame to cross a web-socket frame boundary

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

qpid-broker-j git commit: QPID-7817: [WebSocket] Change implementation to allow an AMQP frame to cross a web-socket frame boundary

kwall
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master ce077c25c -> 19d6b838e


QPID-7817: [WebSocket] Change implementation to allow an AMQP frame to cross a web-socket frame boundary

Corrected the supporting protocol test too and removed the exclusion.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/19d6b838
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/19d6b838
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/19d6b838

Branch: refs/heads/master
Commit: 19d6b838e5e96bcbdf6816eacbfd085697041a99
Parents: ce077c2
Author: Keith Wall <[hidden email]>
Authored: Mon Jun 12 07:34:23 2017 +0100
Committer: Keith Wall <[hidden email]>
Committed: Tue Jun 13 15:50:13 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/type/UnsignedShort.java       |  5 +-
 .../transport/websocket/WebSocketProvider.java  | 69 ++++++++++++++------
 .../websocket/WebSocketFrameTransport.java      | 47 +++++++------
 .../extensions/websocket/WebSocketTest.java     | 21 +++---
 4 files changed, 88 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
index c0b828f..439cd43 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
@@ -28,7 +28,6 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh
     private final short _underlying;
     private static final UnsignedShort[] cachedValues = new UnsignedShort[256];
     public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 0xffff);
-
     static
     {
         for(short i = 0; i < 256; i++)
@@ -37,6 +36,8 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh
         }
     }
 
+    public static final UnsignedShort ZERO = UnsignedShort.valueOf((short) 0);
+
     private UnsignedShort(short underlying)
     {
         _underlying = underlying;
@@ -56,7 +57,7 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh
     @Override
     public long longValue()
     {
-        return ((long) _underlying) & 0xFFFFl;
+        return ((long) _underlying) & 0xFFFFL;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 7784dac..ba66d1e 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -50,13 +50,15 @@ import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.util.annotation.Name;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ThreadPool;
 import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
 import org.eclipse.jetty.websocket.server.WebSocketHandler;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@@ -88,6 +90,7 @@ class WebSocketProvider implements AcceptingTransport
     private final Transport _transport;
     private final SSLContext _sslContext;
     private final AmqpPort<?> _port;
+    private final Broker<?> _broker;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedProtocolReply;
     private final MultiVersionProtocolEngineFactory _factory;
@@ -108,11 +111,12 @@ class WebSocketProvider implements AcceptingTransport
         _transport = transport;
         _sslContext = sslContext;
         _port = port;
+        _broker = ((Broker<?>) port.getParent());
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
 
         _factory = new MultiVersionProtocolEngineFactory(
-                        (Broker<?>) _port.getParent(),
+                        _broker,
                         _supported,
                         _defaultSupportedProtocolReply,
                         _port,
@@ -353,16 +357,21 @@ class WebSocketProvider implements AcceptingTransport
                 ((ServerConnector) server.getConnectors()[0]).getLocalPort();
     }
 
-    public class AmqpWebSocket extends WebSocketAdapter
+    @WebSocket
+    public class AmqpWebSocket
     {
+        private volatile QpidByteBuffer _netInputBuffer;
         private volatile MultiVersionProtocolEngine _protocolEngine;
         private volatile ConnectionWrapper _connectionWrapper;
 
-        @Override
-        public void onWebSocketConnect(final Session session)
+        AmqpWebSocket()
         {
-            super.onWebSocketConnect(session);
+            _netInputBuffer = QpidByteBuffer.allocateDirect(_broker.getNetworkBufferSize());
+        }
 
+        @OnWebSocketConnect @SuppressWarnings("unused")
+        public void onWebSocketConnect(final Session session)
+        {
             SocketAddress localAddress = session.getLocalAddress();
             SocketAddress remoteAddress = session.getRemoteAddress();
             _protocolEngine = _factory.newProtocolEngine(remoteAddress);
@@ -386,12 +395,11 @@ class WebSocketProvider implements AcceptingTransport
 
         }
 
-        @Override
-        public void onWebSocketBinary(final byte[] data, final int offset, final int length)
+        @OnWebSocketMessage @SuppressWarnings("unused")
+        public void onWebSocketBinary(Session sess, final byte[] payload, int offset, final int len)
         {
             synchronized (_connectionWrapper)
             {
-
                 _protocolEngine.clearWork();
                 try
                 {
@@ -402,13 +410,26 @@ class WebSocketProvider implements AcceptingTransport
                         iter.next().run();
                     }
 
-                    for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length))
+                    int lastRead;
+                    int remaining = len;
+                    do
                     {
-                        _protocolEngine.received(qpidByteBuffer);
-                        qpidByteBuffer.dispose();
+                        int chunkLen = Math.min(remaining, _netInputBuffer.remaining());
+                        _netInputBuffer.put(payload, offset, chunkLen);
+                        remaining -= chunkLen;
+                        offset += chunkLen;
+
+                        _netInputBuffer.flip();
+                        _protocolEngine.received(_netInputBuffer);
+                        _connectionWrapper.doWrite();
+                        _netInputBuffer.compact();
                     }
+                    while(remaining > 0);
 
-                    _connectionWrapper.doWrite();
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Read {} byte(s)", len);
+                    }
                 }
                 finally
                 {
@@ -416,17 +437,17 @@ class WebSocketProvider implements AcceptingTransport
                 }
             }
             _idleTimeoutChecker.wakeup();
-
         }
 
-        @Override
-        public void onWebSocketError(final Throwable cause)
+        /** AMQP frames MUST be sent as binary data payloads of WebSocket messages.*/
+        @OnWebSocketMessage @SuppressWarnings("unused")
+        public void onWebSocketText(Session sess, String text)
         {
-            super.onWebSocketError(cause);
-            LOGGER.error("onWebSocketError", cause);
+            LOGGER.info("Unexpected websocket text message received, closing connection");
+            sess.close();
         }
 
-        @Override
+        @OnWebSocketClose @SuppressWarnings("unused")
         public void onWebSocketClose(final int statusCode, final String reason)
         {
             if (_protocolEngine != null)
@@ -435,6 +456,7 @@ class WebSocketProvider implements AcceptingTransport
             }
             _activeConnections.remove(_connectionWrapper);
             _idleTimeoutChecker.wakeup();
+            _netInputBuffer.dispose();
         }
     }
 
@@ -486,7 +508,6 @@ class WebSocketProvider implements AcceptingTransport
         @Override
         public void start()
         {
-
         }
 
         @Override
@@ -605,6 +626,8 @@ class WebSocketProvider implements AcceptingTransport
             QpidByteBuffer buf;
             while((buf = _buffers.poll())!= null)
             {
+                // TODO: For efficiency perhaps only coalesce sequential small buffers and let large buffers
+                // go alone in a binary message.  This would likely avoid the memory copies of large transfer payloads
                 size += buf.remaining();
                 toBeWritten.add(buf);
             }
@@ -624,6 +647,10 @@ class WebSocketProvider implements AcceptingTransport
                 try
                 {
                     _connection.getRemote().sendBytes(ByteBuffer.wrap(data));
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Written {} byte(s)", data.length);
+                    }
                 }
                 catch (IOException e)
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
index b5ccd08..00150ba 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
@@ -53,27 +53,24 @@ public class WebSocketFrameTransport extends FrameTransport
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFrameTransport.class);
 
-    private WebSocketFramingOutputHandler _webSocketFramingOutputHandler;
-    private WebSocketDeframingInputHandler _webSocketDeframingInputHandler;
-    private WebSocketClientHandler _webSocketClientHandler;
+    private final WebSocketFramingOutputHandler _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler();
+    private final WebSocketDeframingInputHandler _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler();
+    private final WebSocketClientHandler _webSocketClientHandler;
 
     public WebSocketFrameTransport(final InetSocketAddress addr)
     {
         super(addr);
-    }
-
-    @Override
-    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
-    {
         URI uri = URI.create(String.format("tcp://%s:%d/",
                                            getBrokerAddress().getHostString(),
                                            getBrokerAddress().getPort()));
         _webSocketClientHandler = new WebSocketClientHandler(
                 WebSocketClientHandshakerFactory.newHandshaker(
                         uri, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()), uri);
-        _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler();
-        _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler();
+    }
 
+    @Override
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
         pipeline.addLast(new HttpClientCodec());
         pipeline.addLast(new HttpObjectAggregator(65536));
         pipeline.addLast(_webSocketClientHandler);
@@ -105,26 +102,36 @@ public class WebSocketFrameTransport extends FrameTransport
         {
             if (msg instanceof ByteBuf)
             {
-                final ByteBuf buf = ((ByteBuf) msg);
+                final ByteBuf buf = ((ByteBuf) msg).retain();
+
                 if (_splitFrames)
                 {
-                    buf.forEachByte(b ->
-                                    {
-                                        ByteBuf byteBuf = Unpooled.copiedBuffer(new byte[] {b});
-                                        BinaryWebSocketFrame frame = new BinaryWebSocketFrame(byteBuf);
-                                        ctx.write(frame, promise);
-                                        return false;
-                                    });
+                    while(buf.isReadable())
+                    {
+
+                        byte b = buf.readByte();
+                        BinaryWebSocketFrame frame = new BinaryWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {b}));
+                        if (buf.isReadable())
+                        {
+                            ctx.writeAndFlush(frame);
+                        }
+                        else
+                        {
+                            ctx.writeAndFlush(frame, promise);
+                        }
+                    }
+
+                    buf.release();
                 }
                 else
                 {
                     BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf) msg);
-                    ctx.write(frame, promise);
+                    ctx.writeAndFlush(frame, promise);
                 }
             }
             else
             {
-                ctx.write(msg, promise);
+                ctx.writeAndFlush(msg, promise);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/19d6b838/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index c0d1a52..f6d952f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -32,7 +32,6 @@ import static org.junit.Assert.assertArrayEquals;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -46,6 +45,9 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 
 public class WebSocketTest extends ProtocolTestBase
 {
+
+    public static final byte[] AMQP_HEADER = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+
     @Test
     @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection")
     public void protocolHeader() throws Exception
@@ -53,29 +55,26 @@ public class WebSocketTest extends ProtocolTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
         {
-            byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
-            transport.sendProtocolHeader(bytes);
+            transport.sendProtocolHeader(AMQP_HEADER);
             HeaderResponse response = transport.getNextResponse();
-            assertArrayEquals("Unexpected protocol header response", bytes, response.getBody());
+            assertArrayEquals("Unexpected protocol header response", AMQP_HEADER, response.getBody());
         }
     }
 
     @Test
     @SpecificationTest(section = "2.4", description = "[...] a single AMQP frame MAY be split over one or more consecutive WebSocket messages. ")
-    @Ignore("QPID-7817")
     public void amqpFramesSplitOverManyWebSocketFrames() throws Exception
     {
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
         {
-            byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
-            transport.sendProtocolHeader(bytes);
-            HeaderResponse response = transport.getNextResponse();
-            assertArrayEquals("Unexpected protocol header response", bytes, response.getBody());
+            transport.sendProtocolHeader(AMQP_HEADER);
+            HeaderResponse response = transport.getNextResponse(HeaderResponse.class);
+            assertArrayEquals("Unexpected protocol header response", AMQP_HEADER, response.getBody());
 
             Open open = new Open();
             open.setContainerId("testContainerId");
-            transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
+            transport.sendPerformative(open, UnsignedShort.ZERO);
             Open responseOpen = transport.getNextResponseBody(Open.class);
 
             assertThat(responseOpen.getContainerId(), is(notNullValue()));
@@ -99,7 +98,7 @@ public class WebSocketTest extends ProtocolTestBase
 
             Open open = new Open();
             open.setContainerId("testContainerId");
-            transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
+            transport.sendPerformative(open, UnsignedShort.ZERO);
             Open responseOpen = transport.getNextResponseBody(Open.class);
 
             assertThat(responseOpen.getContainerId(), is(notNullValue()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]