qpid-broker-j git commit: QPID-7815: [Java Broker] Move overflow policy handlers creation from afterSet into changeAttributes

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
Report Content as Inappropriate

qpid-broker-j git commit: QPID-7815: [Java Broker] Move overflow policy handlers creation from afterSet into changeAttributes

orudyy
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master a946173df -> 42bebb9ff


QPID-7815: [Java Broker] Move overflow policy handlers creation from afterSet into changeAttributes


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/42bebb9f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/42bebb9f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/42bebb9f

Branch: refs/heads/master
Commit: 42bebb9ff6e92f81eaffbd8d8a5350f3e6c10b1d
Parents: a946173
Author: Alex Rudyy <[hidden email]>
Authored: Fri Aug 11 14:21:58 2017 +0100
Committer: Alex Rudyy <[hidden email]>
Committed: Fri Aug 11 14:21:58 2017 +0100

----------------------------------------------------------------------
 .../apache/qpid/server/queue/AbstractQueue.java | 38 +++++++++++++-------
 .../queue/FlowToDiskOverflowPolicyHandler.java  |  5 ++-
 ...roducerFlowControlOverflowPolicyHandler.java |  5 ++-
 3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c6accc2..0e72802 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -241,7 +241,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @ManagedAttributeField
     private volatile boolean _holdOnPublishEnabled;
 
-    @ManagedAttributeField(afterSet = "postSetOverflowPolicy")
+    @ManagedAttributeField()
     private OverflowPolicy _overflowPolicy;
     @ManagedAttributeField
     private long _maximumQueueDepthMessages;
@@ -573,17 +573,19 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             }
         }
 
-        if (_rejectPolicyHandler != null)
+        OverflowPolicy overflowPolicy = getOverflowPolicy();
+        _postEnqueueOverflowPolicyHandler = createPostEnqueueOverflowPolicyHandler(overflowPolicy);
+        if (overflowPolicy == OverflowPolicy.REJECT)
         {
+            _rejectPolicyHandler = new RejectPolicyHandler(this);
             _rejectPolicyHandler.onQueueOpen();
         }
 
         updateAlertChecks();
     }
 
-    private void createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+    private OverflowPolicyHandler createPostEnqueueOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
     {
-        RejectPolicyHandler rejectPolicyHandler = null;
         OverflowPolicyHandler overflowPolicyHandler;
         switch (overflowPolicy)
         {
@@ -601,15 +603,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 break;
             case REJECT:
                 overflowPolicyHandler = new NoneOverflowPolicyHandler();
-                rejectPolicyHandler = new RejectPolicyHandler(this);
                 break;
             default:
                 throw new IllegalStateException(String.format("Overflow policy '%s' is not implemented",
                                                               overflowPolicy.name()));
         }
 
-        _rejectPolicyHandler = rejectPolicyHandler;
-        _postEnqueueOverflowPolicyHandler = overflowPolicyHandler;
+        return overflowPolicyHandler;
     }
 
     protected LogMessage getCreatedLogMessage()
@@ -3091,15 +3091,29 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         return super.changeAttribute(name, desired);
-
     }
 
-    @SuppressWarnings("ignore")
-    private void postSetOverflowPolicy()
+    @Override
+    protected void changeAttributes(final Map<String, Object> attributes)
     {
-        createOverflowPolicyHandler(getOverflowPolicy());
-        if (getState() == State.ACTIVE)
+        OverflowPolicy existingPolicy = getOverflowPolicy();
+        super.changeAttributes(attributes);
+
+        // Overflow policies depend on queue depth attributes.
+        // Thus, we need to create and invoke  overflow policy handler
+        // after all required attributes are changed.
+        if (attributes.containsKey(OVERFLOW_POLICY) && existingPolicy != _overflowPolicy)
         {
+            if (existingPolicy == OverflowPolicy.REJECT)
+            {
+                _rejectPolicyHandler = null;
+            }
+            _postEnqueueOverflowPolicyHandler = createPostEnqueueOverflowPolicyHandler(_overflowPolicy);
+            if (_overflowPolicy == OverflowPolicy.REJECT)
+            {
+                _rejectPolicyHandler = new RejectPolicyHandler(this);
+            }
+
             _postEnqueueOverflowPolicyHandler.checkOverflow(null);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 63030cd..9a6b354 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -43,7 +43,7 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
 
     }
 
-    private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler
+    private static class Handler extends AbstractConfigurationChangeListener
     {
         private final Queue<?> _queue;
         private boolean _limitsChanged;
@@ -53,8 +53,7 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
             _queue = queue;
         }
 
-        @Override
-        public void checkOverflow(final QueueEntry newlyEnqueued)
+        private void checkOverflow(final QueueEntry newlyEnqueued)
         {
             long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
             long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
index c2f9800..b2559b5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
@@ -57,7 +57,7 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH
         _handler.checkOverflow(newlyEnqueued);
     }
 
-    private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler
+    private static class Handler extends AbstractConfigurationChangeListener
     {
         private final Queue<?> _queue;
         private final EventLogger _eventLogger;
@@ -78,8 +78,7 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH
             }
         }
 
-        @Override
-        public void checkOverflow(final QueueEntry newlyEnqueued)
+        private void checkOverflow(final QueueEntry newlyEnqueued)
         {
             long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
             long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...