qpid-broker-j git commit: QPID-7635: [Java Broker] Refactor resolution of routing address for AMQP 1.0

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

qpid-broker-j git commit: QPID-7635: [Java Broker] Refactor resolution of routing address for AMQP 1.0

lquack
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 1a1aed8af -> d79e5b6ac


QPID-7635: [Java Broker] Refactor resolution of routing address for AMQP 1.0


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d79e5b6a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d79e5b6a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d79e5b6a

Branch: refs/heads/master
Commit: d79e5b6ac5d771dee5ea63e0fe7cb3eac8123112
Parents: 1a1aed8
Author: Lorenz Quack <[hidden email]>
Authored: Fri Jun 16 10:35:23 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Fri Jun 16 10:39:13 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/message/ServerMessage.java      |  2 -
 .../message/internal/InternalMessage.java       | 15 +---
 .../message/AbstractServerMessageTest.java      |  8 +-
 .../server/store/TestMessageMetaDataType.java   |  8 +-
 .../qpid/server/txn/MockServerMessage.java      |  6 --
 .../protocol/v0_10/MessageTransferMessage.java  | 14 +---
 .../qpid/server/protocol/v0_8/AMQMessage.java   | 13 +---
 .../v1_0/AnonymousRelayDestination.java         |  2 +-
 .../protocol/v1_0/ExchangeDestination.java      | 26 ++++++-
 .../qpid/server/protocol/v1_0/Message_1_0.java  | 77 +++++---------------
 .../protocol/v1_0/NodeReceivingDestination.java |  2 +-
 .../protocol/v1_0/ReceivingDestination.java     | 11 ++-
 12 files changed, 64 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 693afd6..d648b63 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -55,6 +55,4 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
     Object getConnectionReference();
 
     boolean isResourceAcceptable(TransactionLogResource resource);
-
-    String getRoutingAddress(String destinationAddress, String initialDestinationRoutingAddress);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 5de6127..78e930d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -48,7 +48,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
     private static final String NON_AMQP_MESSAGE = "Non-AMQP Message";
     private final Object _messageBody;
     private InternalMessageHeader _header;
-    private String _initialRoutingAddress;
+    private String _initialRoutingAddress = "";
 
 
     InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
@@ -118,17 +118,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
         return true;
     }
 
-    @Override
-    public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-    {
-        String initialRoutingAddress = getInitialRoutingAddress();
-        if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
-        {
-            initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
-        }
-        return initialRoutingAddress;
-    }
-
     public Object getMessageBody()
     {
         return _messageBody;
@@ -292,6 +281,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
 
     public void setInitialRoutingAddress(final String initialRoutingAddress)
     {
-        _initialRoutingAddress = initialRoutingAddress;
+        _initialRoutingAddress = initialRoutingAddress == null ? "" : initialRoutingAddress;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
index c3dd9aa..1b2cb8f 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
@@ -44,7 +44,7 @@ public class AbstractServerMessageTest extends QpidTestCase
         @Override
         public String getInitialRoutingAddress()
         {
-            return null;
+            return "";
         }
 
         @Override
@@ -76,12 +76,6 @@ public class AbstractServerMessageTest extends QpidTestCase
         {
             return true;
         }
-
-        @Override
-        public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-        {
-            return null;
-        }
     }
 
     private TransactionLogResource createQueue(String name)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index cd7da55..24fa263 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -124,12 +124,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         }
 
         @Override
-        public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-        {
-            return null;
-        }
-
-        @Override
         public long getExpiration()
         {
             return 0;
@@ -156,7 +150,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         @Override
         public String getInitialRoutingAddress()
         {
-            return null;
+            return "";
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 4b14faf..fcb7c92 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -139,12 +139,6 @@ class MockServerMessage implements ServerMessage
     }
 
     @Override
-    public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-    {
-        return null;
-    }
-
-    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 8af13d8..07860f5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -50,7 +50,8 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
 
     public String getInitialRoutingAddress()
     {
-        return getMetaData().getRoutingKey();
+        final String routingKey = getMetaData().getRoutingKey();
+        return routingKey == null ? "" : routingKey;
     }
 
     public AMQMessageHeader getMessageHeader()
@@ -86,17 +87,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
         return true;
     }
 
-    @Override
-    public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-    {
-        String initialRoutingAddress = getInitialRoutingAddress();
-        if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
-        {
-            initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
-        }
-        return initialRoutingAddress;
-    }
-
     public Header getHeader()
     {
         return getMetaData().getHeader();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index d05d312..aaf22a2 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -69,7 +69,7 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
                 return routingKey.toString();
             }
         }
-        return null;
+        return "";
     }
 
     public AMQMessageHeader getMessageHeader()
@@ -93,17 +93,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
         return true;
     }
 
-    @Override
-    public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-    {
-        String initialRoutingAddress = getInitialRoutingAddress();
-        if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
-        {
-            initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
-        }
-        return initialRoutingAddress;
-    }
-
     public boolean isImmediate()
     {
         return getMessagePublishInfo().isImmediate();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index bf114df..9d8c819 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -76,7 +76,7 @@ public class AnonymousRelayDestination implements ReceivingDestination
     public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
     {
         final ReceivingDestination destination;
-        final String routingAddress = message.getRoutingAddress(null, null);
+        final String routingAddress = message.getInitialRoutingAddress();
         if (!routingAddress.startsWith("/") && routingAddress.contains("/"))
         {
             String[] parts = routingAddress.split("/", 2);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 29a1679..575dafd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -97,7 +97,7 @@ public class ExchangeDestination extends QueueDestination
 
     public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
     {
-        final String routingAddress = message.getRoutingAddress(_exchange.getName(), _initialRoutingAddress);
+        final String routingAddress = getRoutingAddress(message);
         _exchange.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =
@@ -150,6 +150,30 @@ public class ExchangeDestination extends QueueDestination
         return _exchange;
     }
 
+    private String getRoutingAddress(final ServerMessage<?> message)
+    {
+        String routingAddress;
+        if (_initialRoutingAddress == null)
+        {
+            return ReceivingDestination.getRoutingAddress(message, _exchange.getName());
+        }
+        else
+        {
+            String initialRoutingAddress = message.getInitialRoutingAddress();
+            if (initialRoutingAddress.startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/"))
+            {
+                routingAddress = initialRoutingAddress.substring(2
+                                                                 + _exchange.getName().length()
+                                                                 + _initialRoutingAddress.length());
+            }
+            else
+            {
+                routingAddress = _initialRoutingAddress;
+            }
+        }
+        return routingAddress;
+    }
+
     TerminusDurability getDurability()
     {
         return _durability;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index f308e78..c29fa09 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -72,15 +72,31 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
 
     public String getInitialRoutingAddress()
     {
-        Object routingKey = getMessageHeader().getHeader("routing-key");
-        if(routingKey != null)
+        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
+        String routingAddress = null;
+        final String to = messageHeader.getTo();
+        if (to != null)
+        {
+            routingAddress = to;
+        }
+        else if (messageHeader.getHeader("routing-key") instanceof String)
+        {
+            routingAddress = (String) messageHeader.getHeader("routing-key");
+        }
+        else if (messageHeader.getHeader("routing_key") instanceof String)
         {
-            return routingKey.toString();
+            routingAddress = (String) messageHeader.getHeader("routing_key");
+        }
+        else if (messageHeader.getSubject() != null)
+        {
+            routingAddress = messageHeader.getSubject();
         }
         else
         {
-            return getMessageHeader().getTo();
+            routingAddress = "";
         }
+
+        return routingAddress;
     }
 
     private MessageMetaData_1_0 getMessageMetaData()
@@ -118,59 +134,6 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
         return getMessageHeader().getNotValidBefore() == 0L || resourceSupportsDeliveryDelay(resource);
     }
 
-    @Override
-    public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress)
-    {
-        String routingAddress;
-        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader();
-        if (initialDestinationRoutingAddress == null)
-        {
-            final String to = messageHeader.getTo();
-            if (to != null && (destinationAddress == null || destinationAddress.trim().equals("")))
-            {
-                routingAddress = to;
-            }
-            else if (to != null && to.startsWith(destinationAddress + "/"))
-            {
-                routingAddress = to.substring(1 + destinationAddress.length());
-            }
-            else if (to != null && !to.equals(destinationAddress))
-            {
-                routingAddress = to;
-            }
-            else if (messageHeader.getHeader("routing-key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing-key");
-            }
-            else if (messageHeader.getHeader("routing_key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing_key");
-            }
-            else if (messageHeader.getSubject() != null)
-            {
-                routingAddress = messageHeader.getSubject();
-            }
-            else
-            {
-                routingAddress = "";
-            }
-        }
-        else
-        {
-            if (messageHeader.getTo() != null
-                && messageHeader.getTo().startsWith(destinationAddress + "/" + initialDestinationRoutingAddress + "/"))
-            {
-                final int prefixLength = 2 + destinationAddress.length() + initialDestinationRoutingAddress.length();
-                routingAddress = messageHeader.getTo().substring(prefixLength);
-            }
-            else
-            {
-                routingAddress = initialDestinationRoutingAddress;
-            }
-        }
-        return routingAddress;
-    }
-
     private boolean resourceSupportsDeliveryDelay(final TransactionLogResource resource)
     {
         return resource instanceof Queue && ((Queue<?>)resource).isHoldOnPublishEnabled();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 88908c5..ac598d6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -80,7 +80,7 @@ public class NodeReceivingDestination implements ReceivingDestination
 
     public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
     {
-        final String routingAddress = message.getRoutingAddress(_destination.getName(), null);
+        final String routingAddress = ReceivingDestination.getRoutingAddress(message, _address);
         _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index 502cc47..debf559 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -44,4 +43,14 @@ public interface ReceivingDestination extends Destination
     String getAddress();
 
     MessageDestination getMessageDestination();
+
+    static String getRoutingAddress(final ServerMessage<?> message, final String destinationName)
+    {
+        String initialRoutingAddress = message.getInitialRoutingAddress();
+        if (destinationName != null && initialRoutingAddress.startsWith(destinationName + "/"))
+        {
+            initialRoutingAddress = initialRoutingAddress.substring(destinationName.length() + 1);
+        }
+        return initialRoutingAddress;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...