qpid-broker-j git commit: QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer

View: Classic | List | Threaded
1 message | Options
Reply | Threaded | Open this post in threaded view

qpid-broker-j git commit: QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer

lquack
Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.0.x 5221fb1cc -> 807c0bc7b


QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer

* prevent deletion of a newly enqueued message
* shut down the recoverer executor service after recovery is complete


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/807c0bc7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/807c0bc7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/807c0bc7

Branch: refs/heads/6.0.x
Commit: 807c0bc7b96c4c74e1c850cba1758e8adf1abf46
Parents: 5221fb1
Author: Alex Rudyy <[hidden email]>
Authored: Fri Jun 9 11:10:46 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Tue Jun 13 10:53:50 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousMessageStoreRecoverer.java      |  12 +-
 .../AsynchronousMessageStoreRecovererTest.java  | 139 ++++++++++++++++++-
 2 files changed, 144 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/807c0bc7/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 7661e18..cd20bf3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -190,13 +190,16 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
                 @Override
                 public boolean handle(final StoredMessage<?> storedMessage)
                 {
-
                     long messageNumber = storedMessage.getMessageNumber();
-                    if (!_recoveredMessages.containsKey(messageNumber))
+                    if ( _continueRecovery.get() && messageNumber < _maxMessageId)
                     {
-                        messagesToDelete.add(storedMessage);
+                        if (!_recoveredMessages.containsKey(messageNumber))
+                        {
+                            messagesToDelete.add(storedMessage);
+                        }
+                        return true;
                     }
-                    return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
+                    return false;
                 }
             });
             for(StoredMessage<?> storedMessage : messagesToDelete)
@@ -213,6 +216,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
             messagesToDelete.clear();
             _recoveredMessages.clear();
             _storeReader.close();
+            _queueRecoveryExecutor.shutdown();
         }
 
         private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/807c0bc7/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index 1c5a1d6..5ffb24e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -21,22 +21,38 @@
 package org.apache.qpid.server.virtualhost;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.concurrent.CountDownLatch;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentMatcher;
 
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -91,4 +107,121 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         ListenableFuture<Void> result = recoverer.recover(_virtualHost);
         assertNull(result.get());
     }
+
+    public void testRecoveryWhenLastRecoveryMessageIsConsumedBeforeRecoveryCompleted() throws Exception
+    {
+        AMQQueue<?> queue = mock(AMQQueue.class);
+        when(queue.getId()).thenReturn(UUID.randomUUID());
+        when(_virtualHost.getQueues()).thenReturn(Collections.singleton(queue));
+        when(_store.getNextMessageId()).thenReturn(3L);
+        when(_store.newTransaction()).thenReturn(mock(Transaction.class));
+
+        final List<StoredMessage<?>> testMessages = new ArrayList<>();
+        final StoredMessage<?> storedMessage = createTestMessage(1L);
+        testMessages.add(storedMessage);
+        StoredMessage<?> orphanedMessage = createTestMessage(2L);
+        testMessages.add(orphanedMessage);
+
+        StoredMessage newMessage = createTestMessage(4L);
+        testMessages.add(newMessage);
+
+        final MessageEnqueueRecord messageEnqueueRecord = mock(MessageEnqueueRecord.class);
+        UUID id = queue.getId();
+        when(messageEnqueueRecord.getQueueId()).thenReturn(id);
+        when(messageEnqueueRecord.getMessageNumber()).thenReturn(1L);
+
+        MockStoreReader storeReader = new MockStoreReader(Collections.singletonList(messageEnqueueRecord), testMessages);
+        when(_store.newMessageStoreReader()).thenReturn(storeReader);
+
+        AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
+        ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+        assertNull(result.get());
+
+        verify(orphanedMessage, times(1)).remove();
+        verify(newMessage, times(0)).remove();
+        verify(queue).recover(argThat(new ArgumentMatcher<ServerMessage>()
+        {
+            @Override
+            public boolean matches(final Object argument)
+            {
+                if (argument instanceof ServerMessage)
+                {
+                    ServerMessage serverMessage = (ServerMessage)argument;
+                    return serverMessage.getMessageNumber() == storedMessage.getMessageNumber();
+                }
+                return false;
+            }
+        }), same(messageEnqueueRecord));
+    }
+
+    private StoredMessage<?> createTestMessage(final long messageNumber)
+    {
+        final StorableMessageMetaData metaData = new TestMessageMetaData(messageNumber, 0);
+        final StoredMessage storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
+        when(storedMessage.getMetaData()).thenReturn(metaData);
+        return storedMessage;
+    }
+
+    private static class MockStoreReader implements MessageStore.MessageStoreReader
+    {
+        private final List<MessageEnqueueRecord> _messageEnqueueRecords;
+        private final List<StoredMessage<?>> _messages;
+
+        private MockStoreReader(final List<MessageEnqueueRecord> messageEnqueueRecords, List<StoredMessage<?>> messages)
+        {
+            _messageEnqueueRecords = messageEnqueueRecords;
+            _messages = messages;
+        }
+
+        @Override
+        public void visitMessages(final MessageHandler handler) throws StoreException
+        {
+            for (StoredMessage message: _messages)
+            {
+                handler.handle(message);
+            }
+        }
+
+        @Override
+        public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+        {
+            for(MessageEnqueueRecord record: _messageEnqueueRecords)
+            {
+                handler.handle(record);
+            }
+        }
+
+        @Override
+        public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler)
+                    throws StoreException
+        {
+            visitMessageInstances(handler);
+        }
+
+        @Override
+        public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+        {
+
+        }
+
+        @Override
+        public StoredMessage<?> getMessage(final long messageId)
+        {
+            for(StoredMessage<?> message: _messages)
+            {
+                if (message.getMessageNumber() == messageId)
+                {
+                    return message;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]