qpid-broker-j git commit: QPID-7664: [Java Broker] [AMQP1.0] outcome related fixes

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

qpid-broker-j git commit: QPID-7664: [Java Broker] [AMQP1.0] outcome related fixes

lquack
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master d79e5b6ac -> 2f918544a


QPID-7664: [Java Broker] [AMQP1.0] outcome related fixes

* Supply outcomes on sending links
* Respect outcomes on receiving links
* Respect outcomes on transaction discharge


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2f918544
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2f918544
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2f918544

Branch: refs/heads/master
Commit: 2f918544a00f7f2458e81feab6182a8e2539e4b1
Parents: d79e5b6
Author: Lorenz Quack <[hidden email]>
Authored: Thu Jun 15 15:10:33 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Fri Jun 16 15:21:31 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/SendingLinkEndpoint.java      |   2 +
 .../v1_0/StandardReceivingLinkEndpoint.java     |  26 ++-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  50 ++++-
 .../v1_0/transaction/DischargeTest.java         | 206 +++++++++++++++++++
 4 files changed, 265 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2f918544/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 55431e4..78b6290 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -58,6 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
@@ -641,6 +642,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
             final Modified defaultOutcome = new Modified();
             defaultOutcome.setDeliveryFailed(true);
             source.setDefaultOutcome(defaultOutcome);
+            source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Released.RELEASED_SYMBOL, Rejected.REJECTED_SYMBOL);
             source.setAddress(attachSource.getAddress());
             source.setDynamic(attachSource.getDynamic());
             if (Boolean.TRUE.equals(attachSource.getDynamic()) && attachSource.getDynamicNodeProperties() != null)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2f918544/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index e33bbb6..96dbdc1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -264,7 +265,17 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
                     DeliveryState resultantState;
 
-                    if(source.getOutcomes() == null || Arrays.asList(source.getOutcomes()).contains(outcome.getSymbol()))
+                    final List<Symbol> sourceSupportedOutcomes = new ArrayList<>();
+                    if (source.getOutcomes() != null)
+                    {
+                        sourceSupportedOutcomes.addAll(Arrays.asList(source.getOutcomes()));
+                    }
+                    else if (source.getDefaultOutcome() == null)
+                    {
+                        sourceSupportedOutcomes.add(Accepted.ACCEPTED_SYMBOL);
+                    }
+
+                    if (sourceSupportedOutcomes.contains(outcome.getSymbol()))
                     {
                         if (transactionId == null)
                         {
@@ -278,21 +289,16 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                             resultantState = transactionalState;
                         }
                     }
-                    else if(transactionId != null)
+                    else
                     {
-                        // cause the txn to fail
-                        if(transaction instanceof LocalTransaction)
+                        if(transactionId != null && transaction instanceof LocalTransaction
+                           && source.getDefaultOutcome() != null
+                           && outcome.getSymbol() != source.getDefaultOutcome().getSymbol())
                         {
                             ((LocalTransaction) transaction).setRollbackOnly();
                         }
                         resultantState = null;
                     }
-                    else
-                    {
-                        // we should just use the default outcome
-                        resultantState = null;
-                    }
-
 
                     boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2f918544/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index d6a7be6..25dd8bd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -20,6 +20,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,16 +33,17 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -137,9 +139,28 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                     {
                         Discharge discharge = (Discharge) command;
 
-                        final Error error = discharge(session.binaryToInteger(discharge.getTxnId()),
-                                                      Boolean.TRUE.equals(discharge.getFail()));
-                        updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
+                        Error error = discharge(discharge.getTxnId(), Boolean.TRUE.equals(discharge.getFail()));
+                        final DeliveryState outcome;
+                        if (error == null)
+                        {
+                            outcome = new Accepted();
+                        }
+                        else if (Arrays.asList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL))
+                        {
+                            final Rejected rejected = new Rejected();
+                            rejected.setError(error);
+                            outcome = rejected;
+                            error = null;
+                        }
+                        else
+                        {
+                            outcome = null;
+                        }
+
+                        if (error == null)
+                        {
+                            updateDisposition(deliveryTag, outcome, true);
+                        }
                         return error;
                     }
                     else
@@ -168,10 +189,21 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
         return null;
     }
 
-    private Error discharge(Integer transactionId, boolean fail)
+    private Error discharge(Binary transactionIdAsBinary, boolean fail)
     {
         Error error = null;
-        ServerTransaction txn = _createdTransactions.get(transactionId);
+        Integer transactionId = null;
+        ServerTransaction txn = null;
+        try
+        {
+            transactionId = getSession().binaryToInteger(transactionIdAsBinary);
+            txn = _createdTransactions.get(transactionId);
+        }
+        catch (IllegalArgumentException e)
+        {
+           // pass
+        }
+
         if(txn != null)
         {
             if(fail)
@@ -189,7 +221,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                 txn.rollback();
                 getSession().incrementRolledBackTransactions();
                 error = new Error();
-                error.setCondition(LinkError.DETACH_FORCED);
+                error.setCondition(TransactionErrors.TRANSACTION_ROLLBACK);
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
@@ -198,8 +230,8 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
         else
         {
             error = new Error();
-            error.setCondition(AmqpError.NOT_FOUND);
-            error.setDescription("Unknown transactionId" + transactionId);
+            error.setCondition(TransactionErrors.UNKNOWN_ID);
+            error.setDescription("Unknown transactionId " + transactionIdAsBinary.toString());
         }
         return error;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2f918544/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
new file mode 100644
index 0000000..46f8506
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transaction;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class DischargeTest extends ProtocolTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3",
+            description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
+                          + "as a transaction-error. If the source for the link to the coordinator supports the rejected outcome, then the "
+                          + "message MUST be rejected with this outcome carrying the transaction-error.")
+    public void dischargeUnknownTransactionIdWhenSourceSupportsRejectedOutcome() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            final Attach attach = new Attach();
+            attach.setName("testSendingLink");
+            attach.setHandle(linkHandle);
+            attach.setRole(Role.SENDER);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            Source source = new Source();
+            source.setOutcomes(Rejected.REJECTED_SYMBOL);
+            attach.setSource(source);
+
+            Coordinator target = new Coordinator();
+            attach.setTarget(target);
+            transport.doAttachSendingLink(attach);
+
+            final Binary txnId = declareTransaction(transport, linkHandle);
+            assertThat(txnId, is(notNullValue()));
+
+            PerformativeResponse flowResponse =  transport.getNextResponse();
+            assertThat(flowResponse, is(notNullValue()));
+            assertThat(flowResponse.getBody(), is(instanceOf(Flow.class)));
+
+            dischargeTransaction(transport, linkHandle, new Binary("nonExistingTransaction".getBytes(UTF_8)));
+
+            PerformativeResponse dischargeResponse =  transport.getNextResponse();
+
+            assertThat(dischargeResponse, is(notNullValue()));
+            assertThat(dischargeResponse.getBody(), is(instanceOf(Disposition.class)));
+            Disposition dischargeDisposition = (Disposition) dischargeResponse.getBody();
+            assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class)));
+            final Error error = ((Rejected) dischargeDisposition.getState()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(TransactionErrors.UNKNOWN_ID)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3",
+            description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
+                          + "as a transaction-error. [...] If the source does not support "
+                          + "the rejected outcome, the transactional resource MUST detach the link to the coordinator, with the detach "
+                          + "performative carrying the transaction-error.")
+    public void dischargeUnknownTransactionIdWhenSourceDoesNotSupportRejectedOutcome() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            final Attach attach = new Attach();
+            attach.setName("testSendingLink");
+            attach.setHandle(linkHandle);
+            attach.setRole(Role.SENDER);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            Source source = new Source();
+            source.setOutcomes(Accepted.ACCEPTED_SYMBOL);
+            attach.setSource(source);
+
+            Coordinator target = new Coordinator();
+            attach.setTarget(target);
+            transport.doAttachSendingLink(attach);
+
+            final Binary txnId = declareTransaction(transport, linkHandle);
+            assertThat(txnId, is(notNullValue()));
+
+            PerformativeResponse flowResponse =  transport.getNextResponse();
+            assertThat(flowResponse, is(notNullValue()));
+            assertThat(flowResponse.getBody(), is(instanceOf(Flow.class)));
+
+            dischargeTransaction(transport, linkHandle, new Binary("nonExistingTransaction".getBytes(UTF_8)));
+
+            PerformativeResponse detachResponse = transport.getNextResponse();
+
+            assertThat(detachResponse, is(notNullValue()));
+            assertThat(detachResponse.getBody(), is(instanceOf(Detach.class)));
+            Error error = ((Detach) detachResponse.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(TransactionErrors.UNKNOWN_ID)));
+        }
+    }
+
+    private void dischargeTransaction(final FrameTransport transport,
+                                      final UnsignedInteger linkHandle,
+                                      final Binary txnId) throws Exception
+    {
+        Transfer dischargeTransactionTransfer = new Transfer();
+        dischargeTransactionTransfer.setDeliveryId(UnsignedInteger.ONE);
+        dischargeTransactionTransfer.setDeliveryTag(new Binary("discharge".getBytes(UTF_8)));
+        dischargeTransactionTransfer.setHandle(linkHandle);
+        final Discharge discharge = new Discharge();
+        discharge.setTxnId(txnId);
+        setPayload(discharge, dischargeTransactionTransfer);
+        transport.sendPerformative(dischargeTransactionTransfer);
+    }
+
+    private Binary declareTransaction(final FrameTransport transport, final UnsignedInteger linkHandle) throws Exception
+    {
+        Transfer declareTransactionTransfer = new Transfer();
+        declareTransactionTransfer.setDeliveryId(UnsignedInteger.ZERO);
+        declareTransactionTransfer.setDeliveryTag(new Binary("declare".getBytes(UTF_8)));
+        declareTransactionTransfer.setHandle(linkHandle);
+        setPayload(new Declare(), declareTransactionTransfer);
+        transport.sendPerformative(declareTransactionTransfer);
+
+        PerformativeResponse declareResponse =  transport.getNextResponse();
+
+        assertThat(declareResponse, is(notNullValue()));
+        assertThat(declareResponse.getBody(), is(instanceOf(Disposition.class)));
+        Disposition disposition = (Disposition) declareResponse.getBody();
+        assertThat(disposition.getState(), is(instanceOf(Declared.class)));
+        assertThat(disposition.getSettled(), is(equalTo(true)));
+        return ((Declared) disposition.getState()).getTxnId();
+    }
+
+    private void setPayload(final Object payload, final Transfer transfer)
+    {
+        AmqpValue amqpValue = new AmqpValue(payload);
+        final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
+        final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
+        transfer.setPayload(encodedForm);
+        section.dispose();
+
+        for (QpidByteBuffer qbb: encodedForm)
+        {
+            qbb.dispose();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...