qpid-broker-j git commit: QPID-7648: [Java Broker] Reject AMQP 1.0 durable messages if no persistent store is configured.

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

qpid-broker-j git commit: QPID-7648: [Java Broker] Reject AMQP 1.0 durable messages if no persistent store is configured.

lquack
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master b36c7180f -> 82ef9deaa


QPID-7648: [Java Broker] Reject AMQP 1.0 durable messages if no persistent store is configured.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/82ef9dea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/82ef9dea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/82ef9dea

Branch: refs/heads/master
Commit: 82ef9deaa97508998fc0db3e56969242778de4a3
Parents: b36c718
Author: Lorenz Quack <[hidden email]>
Authored: Mon Jun 12 12:10:43 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Mon Jun 12 12:10:43 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/store/MemoryMessageStore.java   |   2 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |  25 +++-
 pom.xml                                         |   7 +-
 .../tests/protocol/v1_0/MessageEncoder.java     |  20 +--
 .../protocol/v1_0/messaging/TransferTest.java   | 142 +++++++++++++++++++
 5 files changed, 184 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 0da7db7..d9992ec 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -321,7 +321,7 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public boolean isPersistent()
     {
-        return false;
+        return Boolean.parseBoolean(System.getProperty("qpid.tests.mms.messagestore.persistence", "false"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 331db41..e33bbb6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -237,9 +238,29 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     session.getAMQPConnection()
                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
 
-                    Outcome outcome = getReceivingDestination().send(serverMessage, transaction,
-                                                                     session.getSecurityToken());
+                    Outcome outcome;
                     Source source = getSource();
+                    if (serverMessage.isPersistent() && !getAddressSpace().getMessageStore().isPersistent())
+                    {
+                        final Error preconditionFailedError = new Error(AmqpError.PRECONDITION_FAILED,
+                                                                        "Non-durable message store cannot accept durable message.");
+                        if (source.getOutcomes() != null && Arrays.asList(source.getOutcomes())
+                                                                  .contains(Rejected.REJECTED_SYMBOL))
+                        {
+                            final Rejected rejected = new Rejected();
+                            rejected.setError(preconditionFailedError);
+                            outcome = rejected;
+                        }
+                        else
+                        {
+                            return preconditionFailedError;
+                        }
+                    }
+                    else
+                    {
+                        outcome = getReceivingDestination().send(serverMessage, transaction,
+                                                                 session.getSecurityToken());
+                    }
 
                     DeliveryState resultantState;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 45f6fa1..612ba6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
     <profile.test_receive_timeout>1000</profile.test_receive_timeout>
     <profile.java.naming.factory.initial>org.apache.qpid.jndi.PropertiesFileInitialContextFactory</profile.java.naming.factory.initial>
     <profile.java.naming.provider.url>test-profiles${file.separator}test-provider.properties</profile.java.naming.provider.url>
+    <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
 
     <dollar.sign>$</dollar.sign>
     <at.sign>@</at.sign>
@@ -531,6 +532,7 @@
             <test.output.dir>${test.output.dir}</test.output.dir>
             <broker.clean.between.tests>true</broker.clean.between.tests>
             <qpid.test_receive_timeout>${profile.test_receive_timeout}</qpid.test_receive_timeout>
+            <qpid.tests.mms.messagestore.persistence>${profile.qpid.tests.mms.messagestore.persistence}</qpid.tests.mms.messagestore.persistence>
             <java.naming.factory.initial>${profile.java.naming.factory.initial}</java.naming.factory.initial>
             <java.naming.provider.url>${profile.java.naming.provider.url}</java.naming.provider.url>
           </systemPropertyVariables>
@@ -732,6 +734,7 @@
         <profile.broker.persistent>false</profile.broker.persistent>
         <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
         <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+        <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
       </properties>
     </profile>
 
@@ -751,6 +754,7 @@
         <profile.broker.persistent>false</profile.broker.persistent>
         <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
         <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+        <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
       </properties>
     </profile>
 
@@ -770,6 +774,7 @@
         <profile.broker.persistent>false</profile.broker.persistent>
         <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
         <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+        <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
       </properties>
     </profile>
 
@@ -965,7 +970,7 @@
         <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
         <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
         <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
-
+        <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
       </properties>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 78ffe9d..0c77ab1 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -25,29 +25,33 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 
 public class MessageEncoder
 {
-    private static final AMQPDescribedTypeRegistry
-            AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                    .registerTransportLayer()
-                                                                    .registerMessagingLayer();
+    private Header _header;
     private List<String> _data = new LinkedList<>();
-    private SectionEncoder _encoder = new SectionEncoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY);
 
     public void addData(final String data)
     {
         _data.add(data);
     }
 
+    public void setHeader(Header header)
+    {
+        _header = header;
+    }
+
     public List<QpidByteBuffer> getPayload()
     {
         List<QpidByteBuffer> payload = new ArrayList<>();
+        if (_header != null)
+        {
+            payload.addAll(_header.createEncodingRetainingSection().getEncodedForm());
+        }
+
         if (_data.isEmpty())
         {
             throw new IllegalStateException("Message should have at least one data section");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index c099e20..8d93002 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -32,15 +32,19 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -66,14 +70,31 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 public class TransferTest extends ProtocolTestBase
 {
     private InetSocketAddress _brokerAddress;
+    private String _originalMmsMessageStorePersistence;
 
     @Before
     public void setUp()
     {
+        _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence");
+        System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
         _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
     }
 
+    @After
+    public void tearDown()
+    {
+        if (_originalMmsMessageStorePersistence != null)
+        {
+            System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+        }
+        else
+        {
+            System.clearProperty("qpid.tests.mms.messagestore.persistence");
+        }
+    }
+
     @Test
     @SpecificationTest(section = "1.3.4",
             description = "Transfer without mandatory fields should result in a decoding error.")
@@ -278,6 +299,8 @@ public class TransferTest extends ProtocolTestBase
             MessageEncoder messageEncoder = new MessageEncoder();
             messageEncoder.addData("foo");
             Transfer transfer = new Transfer();
+            transfer.setDeliveryId(UnsignedInteger.ONE);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
             transfer.setHandle(linkHandle);
             transfer.setPayload(messageEncoder.getPayload());
             transfer.setSettled(Boolean.TRUE);
@@ -324,4 +347,123 @@ public class TransferTest extends ProtocolTestBase
             assertThat(closeResponse.getFrameBody(), is(instanceOf(Close.class)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "3.2.1",
+            description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
+                          + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
+                          + "where the durable header is set to true: if the source allows the rejected outcome then the "
+                          + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+                          + "detached by the receiver with the same error.")
+    public void durableTransferWithRejectedOutcome() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress))
+        {
+            final Attach attach = new Attach();
+            attach.setName("testLink");
+            attach.setRole(Role.SENDER);
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            attach.setHandle(linkHandle);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            Target target = new Target();
+            target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+            attach.setTarget(target);
+            final Source source = new Source();
+            source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+            attach.setSource(source);
+            transport.doAttachSendingLink(attach);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            final Header header = new Header();
+            header.setDurable(true);
+            messageEncoder.setHeader(header);
+            messageEncoder.addData("test message data.");
+            Transfer transfer = new Transfer();
+            transfer.setDeliveryId(UnsignedInteger.ONE);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+            transfer.setHandle(linkHandle);
+            transfer.setPayload(messageEncoder.getPayload());
+
+            transport.sendPerformative(transfer);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            if (getBrokerAdmin().supportsRestart())
+            {
+                assertThat(response, is(notNullValue()));
+                assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+                final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+                assertThat(receivedDisposition.getSettled(), is(true));
+                assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+                assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL));
+            }
+            else
+            {
+                assertThat(response, is(notNullValue()));
+                assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+                final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+                assertThat(receivedDisposition.getSettled(), is(true));
+                assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+                assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Rejected.REJECTED_SYMBOL));
+            }
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "3.2.1",
+            description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
+                          + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
+                          + "where the durable header is set to true: if the source allows the rejected outcome then the "
+                          + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+                          + "detached by the receiver with the same error.")
+    public void durableTransferWithoutRejectedOutcome() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress))
+        {
+            final Attach attach = new Attach();
+            attach.setName("testLink");
+            attach.setRole(Role.SENDER);
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            attach.setHandle(linkHandle);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            Target target = new Target();
+            target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+            attach.setTarget(target);
+            final Source source = new Source();
+            source.setOutcomes(Accepted.ACCEPTED_SYMBOL);
+            attach.setSource(source);
+            transport.doAttachSendingLink(attach);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            final Header header = new Header();
+            header.setDurable(true);
+            messageEncoder.setHeader(header);
+            messageEncoder.addData("test message data.");
+            Transfer transfer = new Transfer();
+            transfer.setDeliveryId(UnsignedInteger.ONE);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+            transfer.setHandle(linkHandle);
+            transfer.setPayload(messageEncoder.getPayload());
+
+            transport.sendPerformative(transfer);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            if (getBrokerAdmin().supportsRestart())
+            {
+                assertThat(response, is(notNullValue()));
+                assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+                final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+                assertThat(receivedDisposition.getSettled(), is(true));
+                assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+                assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL));
+            }
+            else
+            {
+                assertThat(response, is(notNullValue()));
+                assertThat(response.getFrameBody(), is(instanceOf(Detach.class)));
+                final Detach receivedDetach = (Detach) response.getFrameBody();
+                assertThat(receivedDetach.getError(), is(notNullValue()));
+                assertThat(receivedDetach.getError().getCondition(), is(AmqpError.PRECONDITION_FAILED));
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]