[1/2] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve AMQP 0-10 to 1.0 content conversion and add unit tests

View: Classic | List | Threaded
2 messages | Options
Reply | Threaded (open this post in threaded view) | Report Content as Inappropriate

[1/2] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve AMQP 0-10 to 1.0 content conversion and add unit tests

lquack
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 67aa48fd5 -> 13c9e7e66


QPID-7434: [Java Broker] Improve AMQP 0-10 to 1.0 content conversion and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/13c9e7e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/13c9e7e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/13c9e7e6

Branch: refs/heads/master
Commit: 13c9e7e668f9c09c796981135433099a6946f2e4
Parents: 20c9c58
Author: Lorenz Quack <[hidden email]>
Authored: Wed Aug 9 14:22:35 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Wed Aug 9 14:30:59 2017 +0100

----------------------------------------------------------------------
 .../MessageConverter_0_10_to_1_0.java           |  19 +-
 .../MessageConverter_0_10_to_1_0Test.java       | 496 +++++++++++++++++++
 2 files changed, 506 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/13c9e7e6/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
index c28d531..5bdfbf5 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
@@ -110,7 +111,7 @@ public class MessageConverter_0_10_to_1_0  extends MessageConverter_to_1_0<Messa
 
 
         ApplicationProperties applicationProperties = null;
-
+        String originalContentMimeType = null;
         if(msgProps != null)
         {
             if(msgProps.hasContentEncoding()
@@ -150,15 +151,13 @@ public class MessageConverter_0_10_to_1_0  extends MessageConverter_to_1_0<Messa
                 }
                 props.setReplyTo(to);
             }
+
             if(msgProps.hasContentType())
             {
-                props.setContentType(Symbol.valueOf(msgProps.getContentType()));
-
-                // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
-                if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
-                {
-                    props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
-                }
+                originalContentMimeType = msgProps.getContentType();
+                final Symbol contentType =
+                        MessageConverter_to_1_0.getContentType(originalContentMimeType, bodySection);
+                props.setContentType(contentType);
             }
 
             if(msgProps.hasUserId())
@@ -208,9 +207,11 @@ public class MessageConverter_0_10_to_1_0  extends MessageConverter_to_1_0<Messa
                 }
             }
         }
+        final MessageAnnotations messageAnnotation =
+                MessageConverter_to_1_0.createMessageAnnotation(bodySection, originalContentMimeType);
         return new MessageMetaData_1_0(header.createEncodingRetainingSection(),
                                        null,
-                                       null,
+                                       messageAnnotation == null ? null : messageAnnotation.createEncodingRetainingSection(),
                                        props.createEncodingRetainingSection(),
                                        applicationProperties == null ? null : applicationProperties.createEncodingRetainingSection(),
                                        null,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/13c9e7e6/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
new file mode 100644
index 0000000..2e8357a
--- /dev/null
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getContentType;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
+{
+    private final MessageConverter_0_10_to_1_0 _converter = new MessageConverter_0_10_to_1_0();
+    private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
+                                                                                     .registerTransportLayer()
+                                                                                     .registerMessagingLayer()
+                                                                                     .registerTransactionLayer()
+                                                                                     .registerSecurityLayer();
+
+    private final StoredMessage<MessageMetaData_0_10> _handle = mock(StoredMessage.class);
+
+    private final MessageMetaData_0_10 _metaData = mock(MessageMetaData_0_10.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+    private final Header _header = mock(Header.class);
+    private MessageProperties _messageProperties;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _messageProperties = new MessageProperties();
+
+        when(_handle.getMetaData()).thenReturn(_metaData);
+        when(_header.getMessageProperties()).thenReturn(_messageProperties);
+        when(_metaData.getHeader()).thenReturn(_header);
+        when(_metaData.getMessageHeader()).thenReturn(_amqpHeader);
+        when(_metaData.getMessageProperties()).thenReturn(_messageProperties);
+    }
+
+    public void testConvertStringMessageBody() throws Exception
+    {
+        doTestTextMessage("helloworld", "text/plain");
+    }
+
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
+
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
+
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
+
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo","text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]","application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]","application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/ecmascript");
+    }
+
+    public void testConvertBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage("helloworld".getBytes(), "application/octet-stream");
+    }
+
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent,
+               null,
+               DataSection.class,
+               messageContent,
+               null,
+               null);
+    }
+
+    public void testConvertBytesMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent,
+               "my/bytes",
+               DataSection.class,
+               messageContent,
+               Symbol.valueOf("my/bytes"),
+               null);
+    }
+
+
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0], "application/octet-stream");
+    }
+
+    public void testConvertJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = getJmsStreamMessageBytes(expected);
+
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
+
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        final String mimeType = "amqp/list";
+        doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
+
+    public void testConvertAmqpListMessageBodyWithNonJmsContent() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D, Lists.newArrayList("nonJMSList"));
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        final String mimeType = "amqp/list";
+        doTestStreamMessage(messageBytes, mimeType, expected, null);
+    }
+
+    public void testConvertJmsMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = getJmsMapMessageBytes(expected);
+
+        doTestMapMessage(messageBytes, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
+
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
+
+    public void testConvertAmqpMapMessageBodyWithNonJmsContent() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", Collections.singletonList("nonJmsList"));
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected, null);
+    }
+
+    public void testConvertObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        final byte[] expectedBytes = messageBytes;
+
+        doTestObjectMessage(messageBytes, "application/java-object-stream", expectedBytes);
+    }
+
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        final byte[] expectedBytes = messageBytes;
+
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", expectedBytes);
+    }
+
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = null;
+        final byte[] expectedBytes = getObjectStreamMessageBytes(messageBytes);
+        final String mimeType = "application/java-object-stream";
+
+        doTestObjectMessage(messageBytes, mimeType, expectedBytes);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, AmqpValueSection.class, null, null, JmsMessageTypeAnnotation.MESSAGE.getType());
+    }
+
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", DataSection.class, new byte[0], Symbol.valueOf("foo/bar"), null);
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, DataSection.class, expectedContent, null, null);
+    }
+
+
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+    private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        for (Object o : objects)
+        {
+            writer.writeObject(o);
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        writer.writeIntImpl(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet())
+        {
+            writer.writeNullTerminatedStringImpl(entry.getKey());
+            writer.writeObject(entry.getValue());
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getBytes(final TypedBytesContentWriter writer)
+    {
+        ByteBuffer buf = writer.getData();
+        final byte[] expected = new byte[buf.remaining()];
+        buf.get(expected);
+        return expected;
+    }
+
+    private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final Collection<QpidByteBuffer> content,
+                                                                           final int expectedNumberOfSections)
+            throws Exception
+    {
+        SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
+        final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content));
+        assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size());
+        return sections;
+    }
+
+    protected MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
+    {
+        configureMessageContent(expected);
+        configureMessageHeader(mimeType);
+
+        final MessageTransferMessage messageTransferMessage = new MessageTransferMessage(_handle, new Object());
+        return messageTransferMessage;
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+        _messageProperties.setContentType(mimeType);
+    }
+
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage)
+    {
+        MessageAnnotationsSection messageAnnotationsSection = convertedMessage.getMessageAnnotationsSection();
+        if (messageAnnotationsSection != null)
+        {
+            Map<Symbol, Object> messageAnnotations = messageAnnotationsSection.getValue();
+            if (messageAnnotations != null)
+            {
+                Object annotation = messageAnnotations.get(Symbol.valueOf("x-opt-jms-msg-type"));
+                if (annotation instanceof Byte)
+                {
+                    return ((Byte) annotation);
+                }
+            }
+        }
+        return null;
+    }
+
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+        final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
+        String expectedContent = originalContent == null ? null : originalContent;
+        doTest(contentBytes,
+               mimeType,
+               AmqpValueSection.class,
+               expectedContent,
+               Symbol.valueOf(mimeType),
+               JmsMessageTypeAnnotation.TEXT_MESSAGE.getType());
+    }
+
+
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected,
+                                  final Byte expectedJmsTypeAnnotation) throws Exception
+    {
+        doTest(messageBytes, mimeType, AmqpValueSection.class, expected, null, expectedJmsTypeAnnotation);
+    }
+
+    private void doTestBytesMessage(final byte[] messageContent, final String mimeType) throws Exception
+    {
+        doTest(messageContent,
+               mimeType,
+               DataSection.class,
+               messageContent,
+               Symbol.valueOf(mimeType),
+               JmsMessageTypeAnnotation.BYTES_MESSAGE.getType());
+    }
+
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected,
+                                     final Byte expectedJmsTypAnnotation) throws Exception
+    {
+        doTest(messageBytes, mimeType, AmqpSequenceSection.class, expected, null, expectedJmsTypAnnotation);
+    }
+
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes,
+               mimeType,
+               DataSection.class,
+               expectedBytes,
+               Symbol.valueOf("application/x-java-serialized-object"),
+               JmsMessageTypeAnnotation.OBJECT_MESSAGE.getType());
+    }
+
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Class<? extends EncodingRetainingSection<?>> expectedBodySection,
+                        final Object expectedContent,
+                        final Symbol expectedContentType,
+                        final Byte expectedJmsTypeAnnotation) throws Exception
+    {
+        final MessageTransferMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
+        EncodingRetainingSection<?> encodingRetainingSection = sections.get(0);
+        assertEquals("Unexpected section type", expectedBodySection, encodingRetainingSection.getClass());
+
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((Binary) encodingRetainingSection.getValue()).getArray());
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, encodingRetainingSection.getValue());
+        }
+
+        Symbol contentType = getContentType(convertedMessage);
+        if (expectedContentType == null)
+        {
+            assertNull("Content type should be null", contentType);
+        }
+        else
+        {
+            assertEquals("Unexpected content type", expectedContentType, contentType);
+        }
+
+        Byte jmsMessageTypeAnnotation = getJmsMessageTypeAnnotation(convertedMessage);
+        if (expectedJmsTypeAnnotation == null)
+        {
+            assertNull("Unexpected annotation 'x-opt-jms-msg-type'", jmsMessageTypeAnnotation);
+        }
+        else
+        {
+            assertEquals("Unexpected annotation 'x-opt-jms-msg-type'",
+                         expectedJmsTypeAnnotation,
+                         jmsMessageTypeAnnotation);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded (open this post in threaded view) | Report Content as Inappropriate

[2/2] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve AMQP 0-8 to 1.0 content conversion and unit tests

lquack
QPID-7434: [Java Broker] Improve AMQP 0-8 to 1.0 content conversion and unit tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/20c9c58c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/20c9c58c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/20c9c58c

Branch: refs/heads/master
Commit: 20c9c58c9469abd6398f16644c5b0e5c751a06dc
Parents: 67aa48f
Author: Alex Rudyy <[hidden email]>
Authored: Tue Aug 8 13:10:23 2017 +0100
Committer: Lorenz Quack <[hidden email]>
Committed: Wed Aug 9 14:30:59 2017 +0100

----------------------------------------------------------------------
 .../mimecontentconverter/IdentityConverter.java |  62 ++++
 .../MimeContentConverterRegistry.java           |   3 +-
 .../v1_0/MessageConverter_from_1_0.java         |  14 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java  | 226 ++++++++++++-
 .../MessageConverter_1_0_to_v0_10Test.java      | 159 ++++++++-
 .../v0_8_v1_0/MessageConverter_0_8_to_1_0.java  |  21 +-
 .../MessageConverter_0_8_to_1_0Test.java        | 330 ++++++++++++++++---
 .../MessageConverter_1_0_to_v0_8Test.java       | 156 ++++++++-
 8 files changed, 880 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java
new file mode 100644
index 0000000..3b82582
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.mimecontentconverter;
+
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class IdentityConverter implements ObjectToMimeContentConverter<Object>
+{
+    @Override
+    public String getType()
+    {
+        return getMimeType();
+    }
+
+    @Override
+    public String getMimeType()
+    {
+        return null;
+    }
+
+    @Override
+    public Class<Object> getObjectClass()
+    {
+        return Object.class;
+    }
+
+    @Override
+    public int getRank()
+    {
+        return Integer.MIN_VALUE;
+    }
+
+    @Override
+    public boolean isAcceptable(final Object object)
+    {
+        return object == null;
+    }
+
+    @Override
+    public byte[] toMimeContent(final Object object)
+    {
+        return new byte[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
index b580557..79cbba7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
@@ -72,7 +72,7 @@ public class MimeContentConverterRegistry
             }
             classToMineConverters.put(objectClass, converter);
         }
-        classToMineConverters.put(Void.class, new StringToTextPlain());
+        classToMineConverters.put(Void.class, new IdentityConverter());
         return ImmutableMultimap.copyOf(classToMineConverters);
     }
 
@@ -137,6 +137,7 @@ public class MimeContentConverterRegistry
                 }
             }
         }
+
         return converter;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index 71add9e..62230d8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -76,11 +76,12 @@ public class MessageConverter_from_1_0
                                                                                         byte[].class,
                                                                                         UUID.class));
 
-    private static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
-    private static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
-    private static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
-    private static final Pattern
+    public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
+    public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
+    public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
+    public static final Pattern
             OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$");
+    public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$");
 
     static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
@@ -320,6 +321,11 @@ public class MessageConverter_from_1_0
                 // the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back
                 supportedContentType = "application/java-object-stream";
             }
+            else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            {
+                contentTypeClassHint = byte[].class;
+                supportedContentType = "application/octet-stream";
+            }
 
             if (classHint == null || classHint == contentTypeClassHint)
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 53c6666..09b9964 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -20,9 +20,22 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.OBJECT_MESSAGE;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.STREAM_MESSAGE;
+import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.TEXT_MESSAGE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.ListIterator;
@@ -30,8 +43,8 @@ import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
+import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
@@ -39,20 +52,163 @@ import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.NonEncodingRetainingSection;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.GZIPUtils;
 
 public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0>
 {
+    private static final byte[] SERIALIZED_NULL = getObjectBytes(null);
     private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
-                                                                                         .registerTransportLayer()
-                                                                                         .registerMessagingLayer()
-                                                                                         .registerTransactionLayer()
-                                                                                         .registerSecurityLayer();
+                                                                                     .registerTransportLayer()
+                                                                                     .registerMessagingLayer()
+                                                                                     .registerTransactionLayer()
+                                                                                     .registerSecurityLayer();
+
+    public static Symbol getContentType(final String contentMimeType, final EncodingRetainingSection<?> bodySection)
+    {
+        Symbol contentType = null;
+        if (contentMimeType != null)
+        {
+            if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                contentType = Symbol.valueOf(contentMimeType);
+            }
+            else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                contentType = Symbol.valueOf("application/octet-stream");
+            }
+            else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                contentType = null;
+            }
+            else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                contentType = null;
+            }
+            else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                contentType = Symbol.valueOf("application/x-java-serialized-object");
+            }
+            else
+            {
+                contentType = Symbol.valueOf(contentMimeType);
+            }
+        }
+        return contentType;
+    }
+
+    public static MessageAnnotations createMessageAnnotation(final EncodingRetainingSection<?> bodySection,
+                                                             final String contentMimeType)
+    {
+        MessageAnnotations messageAnnotations = null;
+        final Symbol key = Symbol.valueOf("x-opt-jms-msg-type");
+        if (contentMimeType != null)
+        {
+            if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, TEXT_MESSAGE.getType()));
+            }
+            else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, BYTES_MESSAGE.getType()));
+            }
+            else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                if (isSectionValidForJmsMap(bodySection))
+                {
+                    messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MAP_MESSAGE.getType()));
+                }
+            }
+            else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                if (isSectionValidForJmsList(bodySection))
+                {
+                    messageAnnotations =
+                            new MessageAnnotations(Collections.singletonMap(key, STREAM_MESSAGE.getType()));
+                }
+            }
+            else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            {
+                messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, OBJECT_MESSAGE.getType()));
+            }
+        }
+        else if (bodySection instanceof AmqpValueSection && bodySection.getValue() == null)
+        {
+            messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MESSAGE.getType()));
+        }
+        return messageAnnotations;
+    }
+
+    private static boolean isSectionValidForJmsList(final EncodingRetainingSection<?> section)
+    {
+        if (section instanceof AmqpSequenceSection)
+        {
+            final List<?> list = ((AmqpSequenceSection) section).getValue();
+            for (Object entry: list)
+            {
+                if (!(entry == null
+                      || entry instanceof Boolean
+                      || entry instanceof Byte
+                      || entry instanceof Short
+                      || entry instanceof Integer
+                      || entry instanceof Long
+                      || entry instanceof Float
+                      || entry instanceof Double
+                      || entry instanceof Character
+                      || entry instanceof String
+                      || entry instanceof Binary))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private static boolean isSectionValidForJmsMap(final EncodingRetainingSection<?> section)
+    {
+        if (section instanceof AmqpValueSection)
+        {
+            final Object valueObject = ((AmqpValueSection) section).getValue();
+            if (valueObject instanceof Map)
+            {
+                final Map<?, ?> map = (Map) valueObject;
+                for (Map.Entry<?,?> entry: map.entrySet())
+                {
+                    if (!(entry.getKey() instanceof String))
+                    {
+                        return false;
+                    }
+                    Object value = entry.getValue();
+                    if (!(value == null
+                          || value instanceof Boolean
+                          || value instanceof Byte
+                          || value instanceof Short
+                          || value instanceof Integer
+                          || value instanceof Long
+                          || value instanceof Float
+                          || value instanceof Double
+                          || value instanceof Character
+                          || value instanceof String
+                          || value instanceof Binary))
+                    {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
 
     @Override
     public final Class<Message_1_0> getOutputClass()
@@ -92,25 +248,45 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
 
     private static NonEncodingRetainingSection<?> convertMessageBody(String mimeType, byte[] data)
     {
-
-        MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
-        if (converter != null)
+        if (data != null && data.length != 0)
         {
-            Object bodyObject = converter.toObject(data);
 
-            if (bodyObject instanceof String)
+            MimeContentToObjectConverter converter =
+                    MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
+            if (converter != null)
             {
-                return new AmqpValue(bodyObject);
-            }
-            else if (bodyObject instanceof Map)
-            {
-                return new AmqpValue(fixMapValues((Map<String, Object>) bodyObject));
+                Object bodyObject = converter.toObject(data);
+
+                if (bodyObject instanceof String)
+                {
+                    return new AmqpValue(bodyObject);
+                }
+                else if (bodyObject instanceof Map)
+                {
+                    return new AmqpValue(fixMapValues((Map<String, Object>) bodyObject));
+                }
+                else if (bodyObject instanceof List)
+                {
+                    return new AmqpSequence(fixListValues((List<Object>) bodyObject));
+                }
             }
-            else if (bodyObject instanceof List)
+            else if (mimeType != null && MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
             {
-                return new AmqpValue(fixListValues((List<Object>) bodyObject));
+                return new AmqpValue(new String(data, UTF_8));
             }
         }
+        else if (mimeType == null)
+        {
+            return new AmqpValue(null);
+        }
+        else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new Data(new Binary(SERIALIZED_NULL));
+        }
+        else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new AmqpValue(null);
+        }
         return new Data(new Binary(data));
     }
 
@@ -188,6 +364,22 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         return convertMessageBody(mimeType, data).createEncodingRetainingSection();
     }
 
+    private static byte[] getObjectBytes(final Object object)
+    {
+        final byte[] expected;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos))
+        {
+            oos.writeObject(object);
+            expected = baos.toByteArray();
+        }
+        catch (IOException e)
+        {
+            throw new IllegalStateException(e);
+        }
+        return expected;
+    }
+
     private static class ConvertedMessage<M extends ServerMessage> implements StoredMessage<MessageMetaData_1_0>
     {
         private final MessageMetaData_1_0 _metaData;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java
index 9e59fe6..3e83f73 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java
@@ -93,11 +93,12 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
         _converter = new MessageConverter_1_0_to_v0_10();
     }
 
-    public void testAmqpValueWithNull() throws Exception
+    public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception
     {
         final Object expected = null;
         final AmqpValue amqpValue = new AmqpValue(expected);
-        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+        Message_1_0 sourceMessage =
+                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
@@ -105,16 +106,156 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 
-    public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception
+    public void testAmqpValueWithNullWithMessageAnnotation() throws Exception
     {
         final Object expected = null;
         final AmqpValue amqpValue = new AmqpValue(expected);
         Message_1_0 sourceMessage =
-                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+                createTestMessage(MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithObjectMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size", getObjectBytes(null), getBytes(content));
+    }
+
+    public void testAmqpValueWithNullWithMapMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(MAP_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+    public void testAmqpValueWithNullWithBytesMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(BYTE_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/octet-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithStreamMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(STREAM_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithUnknownMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage =
+                createTestMessage(new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"),
+                                                                                  (byte) 11)),
+                                  amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithContentTypeApplicationOctetStream() throws Exception
+    {
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/octet-stream"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithObjectMessageContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+
+        assertEquals("Unexpected content size",
+                     getObjectBytes(null).length,
+                     convertedMessage.getSize());
+    }
+
+    public void testAmqpValueWithNullWithJmsMapContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("jms/map-message"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+
+
+
+    public void testAmqpValueWithNull() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 
@@ -630,7 +771,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 
@@ -702,7 +843,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 
@@ -712,7 +853,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 
@@ -724,7 +865,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase
 
         final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getSize());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
index 01597f2..ba3526c 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
@@ -25,12 +25,12 @@ import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
-import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
-import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
 import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
 import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
@@ -43,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.url.AMQBindingURL;
 import org.apache.qpid.server.util.GZIPUtils;
@@ -81,13 +82,8 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess
             props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
         }
 
-        props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
-
-        // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
-        if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
-        {
-            props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
-        }
+        Symbol contentType = getContentType(contentHeader.getContentTypeAsString(), bodySection);
+        props.setContentType(contentType);
 
         final AMQShortString correlationId = contentHeader.getCorrelationId();
         if(correlationId != null)
@@ -207,9 +203,12 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess
             throw new MessageConversionException("Could not convert message from 0-8 to 1.0 because headers conversion failed.", e);
         }
 
+        MessageAnnotations messageAnnotations = createMessageAnnotation(bodySection,
+                                                                        contentHeader.getContentTypeAsString());
+
         return new MessageMetaData_1_0(header.createEncodingRetainingSection(),
                                        null,
-                                       null,
+                                       messageAnnotations == null ? null : messageAnnotations.createEncodingRetainingSection(),
                                        props.createEncodingRetainingSection(),
                                        applicationProperties.createEncodingRetainingSection(),
                                        null,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
index 3ef2e7d..62dc8f4d 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getContentType;
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -43,17 +45,25 @@ import org.mockito.stubbing.Answer;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
 import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -87,78 +97,195 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
 
     public void testConvertStringMessageBody() throws Exception
     {
-        final String expected = "helloworld";
+        doTestTextMessage("helloworld", "text/plain");
+    }
 
-        final AMQMessage sourceMessage = getAmqMessage(expected.getBytes(), "text/plain");
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
 
-        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
 
-        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
-        assertEquals(expected, sections.get(0).getValue());
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo","text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]","application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]","application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/ecmascript");
     }
 
     public void testConvertBytesMessageBody() throws Exception
     {
-        final byte[] expected = "helloworld".getBytes();
+        doTestBytesMessage("helloworld".getBytes(), "application/octet-stream");
+    }
 
-        final AMQMessage sourceMessage = getAmqMessage(expected, "application/octet-stream");
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent,
+               null,
+               DataSection.class,
+               messageContent,
+               null,
+               null);
+    }
 
-        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+    public void testConvertBytesMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent,
+               "my/bytes",
+               DataSection.class,
+               messageContent,
+               Symbol.valueOf("my/bytes"),
+               null);
+    }
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
-        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
-        final Binary value = (Binary) sections.get(0).getValue();
-        assertArrayEquals(expected, value.getArray());
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0], "application/octet-stream");
     }
 
-    public void testConvertListMessageBody() throws Exception
+    public void testConvertJmsStreamMessageBody() throws Exception
     {
-        final List<Object> expected = Lists.<Object>newArrayList("apple", 43, 31.42D);
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
         final byte[] messageBytes = getJmsStreamMessageBytes(expected);
 
-        final AMQMessage sourceMessage = getAmqMessage(messageBytes, "jms/stream-message");
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
 
-        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final String mimeType = "amqp/list";
+        doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
 
-        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
-        assertEquals(expected, sections.get(0).getValue());
+    public void testConvertAmqpListMessageBodyWithNonJmsContent() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D, Lists.newArrayList("nonJMSList"));
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        final String mimeType = "amqp/list";
+        doTestStreamMessage(messageBytes, mimeType, expected, null);
     }
 
-    public void testConvertMapMessageBody() throws Exception
+    public void testConvertJmsMapMessageBody() throws Exception
     {
-        final Map<String, Object> expected = Collections.<String, Object>singletonMap("key", "value");
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
         final byte[] messageBytes = getJmsMapMessageBytes(expected);
 
-        final AMQMessage sourceMessage = getAmqMessage(messageBytes, "jms/map-message");
+        doTestMapMessage(messageBytes, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
 
-        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        doTestMapMessage(messageBytes, "amqp/map", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
 
-        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
-        assertEquals(expected, sections.get(0).getValue());
+    public void testConvertAmqpMapMessageBodyWithNonJmsContent() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", Collections.singletonList("nonJmsList"));
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected, null);
     }
 
     public void testConvertObjectStreamMessageBody() throws Exception
     {
         final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        final byte[] expectedBytes = messageBytes;
 
-        final AMQMessage sourceMessage = getAmqMessage(messageBytes, "application/java-object-stream");
+        doTestObjectMessage(messageBytes, "application/java-object-stream", expectedBytes);
+    }
 
-        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        final byte[] expectedBytes = messageBytes;
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", expectedBytes);
+    }
 
-        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
-        final Binary value = (Binary) sections.get(0).getValue();
-        assertArrayEquals(messageBytes, value.getArray());
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = null;
+        final byte[] expectedBytes = getObjectStreamMessageBytes(messageBytes);
+        final String mimeType = "application/java-object-stream";
+
+        doTestObjectMessage(messageBytes, mimeType, expectedBytes);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, AmqpValueSection.class, null, null, JmsMessageTypeAnnotation.MESSAGE.getType());
     }
 
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", DataSection.class, new byte[0], Symbol.valueOf("foo/bar"), null);
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, DataSection.class, expectedContent, null, null);
+    }
+
+
     private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
     {
         try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -172,18 +299,18 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
     private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
     {
         TypedBytesContentWriter writer = new TypedBytesContentWriter();
-        for(Object o : objects)
+        for (Object o : objects)
         {
             writer.writeObject(o);
         }
         return getBytes(writer);
     }
 
-    private byte[] getJmsMapMessageBytes(Map<String,Object> map) throws Exception
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
     {
         TypedBytesContentWriter writer = new TypedBytesContentWriter();
         writer.writeIntImpl(map.size());
-        for(Map.Entry<String, Object> entry : map.entrySet())
+        for (Map.Entry<String, Object> entry : map.entrySet())
         {
             writer.writeNullTerminatedStringImpl(entry.getKey());
             writer.writeObject(entry.getValue());
@@ -207,7 +334,6 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content));
         assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size());
         return sections;
-
     }
 
     protected AMQMessage getAmqMessage(final byte[] expected, final String mimeType)
@@ -221,16 +347,22 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
     private void configureMessageHeader(final String mimeType)
     {
         when(_header.getMimeType()).thenReturn(mimeType);
+        when(_basicContentHeaderProperties.getContentTypeAsString()).thenReturn(mimeType);
     }
 
-    private void configureMessageContent(final byte[] section)
+    private void configureMessageContent(byte[] section)
     {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
         final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
-        when(_handle.getContentSize()).thenReturn((int) section.length);
+        when(_handle.getContentSize()).thenReturn(section.length);
         final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
         final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
 
-        when(_handle.getContent(offsetCaptor.capture(), sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
         {
             @Override
             public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
@@ -241,4 +373,122 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         });
     }
 
+    private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage)
+    {
+        MessageAnnotationsSection messageAnnotationsSection = convertedMessage.getMessageAnnotationsSection();
+        if (messageAnnotationsSection != null)
+        {
+            Map<Symbol, Object> messageAnnotations = messageAnnotationsSection.getValue();
+            if (messageAnnotations != null)
+            {
+                Object annotation = messageAnnotations.get(Symbol.valueOf("x-opt-jms-msg-type"));
+                if (annotation instanceof Byte)
+                {
+                    return ((Byte) annotation);
+                }
+            }
+        }
+        return null;
+    }
+
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+        final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
+        String expectedContent = originalContent == null ? null : originalContent;
+        doTest(contentBytes,
+               mimeType,
+               AmqpValueSection.class,
+               expectedContent,
+               Symbol.valueOf(mimeType),
+               JmsMessageTypeAnnotation.TEXT_MESSAGE.getType());
+    }
+
+
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected,
+                                  final Byte expectedJmsTypeAnnotation) throws Exception
+    {
+        doTest(messageBytes, mimeType, AmqpValueSection.class, expected, null, expectedJmsTypeAnnotation);
+    }
+
+    private void doTestBytesMessage(final byte[] messageContent, final String mimeType) throws Exception
+    {
+        doTest(messageContent,
+               mimeType,
+               DataSection.class,
+               messageContent,
+               Symbol.valueOf(mimeType),
+               JmsMessageTypeAnnotation.BYTES_MESSAGE.getType());
+    }
+
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected,
+                                     final Byte expectedJmsTypAnnotation) throws Exception
+    {
+        doTest(messageBytes, mimeType, AmqpSequenceSection.class, expected, null, expectedJmsTypAnnotation);
+    }
+
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes,
+               mimeType,
+               DataSection.class,
+               expectedBytes,
+               Symbol.valueOf("application/x-java-serialized-object"),
+               JmsMessageTypeAnnotation.OBJECT_MESSAGE.getType());
+    }
+
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Class<? extends EncodingRetainingSection<?>> expectedBodySection,
+                        final Object expectedContent,
+                        final Symbol expectedContentType,
+                        final Byte expectedJmsTypeAnnotation) throws Exception
+    {
+        final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
+        EncodingRetainingSection<?> encodingRetainingSection = sections.get(0);
+        assertEquals("Unexpected section type", expectedBodySection, encodingRetainingSection.getClass());
+
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((Binary) encodingRetainingSection.getValue()).getArray());
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, encodingRetainingSection.getValue());
+        }
+
+        Symbol contentType = getContentType(convertedMessage);
+        if (expectedContentType == null)
+        {
+            assertNull("Content type should be null", contentType);
+        }
+        else
+        {
+            assertEquals("Unexpected content type", expectedContentType, contentType);
+        }
+
+        Byte jmsMessageTypeAnnotation = getJmsMessageTypeAnnotation(convertedMessage);
+        if (expectedJmsTypeAnnotation == null)
+        {
+            assertNull("Unexpected annotation 'x-opt-jms-msg-type'", jmsMessageTypeAnnotation);
+        }
+        else
+        {
+            assertEquals("Unexpected annotation 'x-opt-jms-msg-type'",
+                         expectedJmsTypeAnnotation,
+                         jmsMessageTypeAnnotation);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
index d272c87..c9d17cc 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
@@ -93,11 +93,12 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         _converter = new MessageConverter_1_0_to_v0_8();
     }
 
-    public void testAmqpValueWithNull() throws Exception
+    public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception
     {
         final Object expected = null;
         final AmqpValue amqpValue = new AmqpValue(expected);
-        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+        Message_1_0 sourceMessage =
+                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
@@ -105,16 +106,153 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 
-    public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception
+    public void testAmqpValueWithNullWithMessageAnnotation() throws Exception
     {
         final Object expected = null;
         final AmqpValue amqpValue = new AmqpValue(expected);
         Message_1_0 sourceMessage =
-                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+                createTestMessage(MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithObjectMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size", getObjectBytes(null), getBytes(content));
+    }
+
+    public void testAmqpValueWithNullWithMapMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(MAP_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+    public void testAmqpValueWithNullWithBytesMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(BYTE_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/octet-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithStreamMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(STREAM_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithUnknownMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage =
+                createTestMessage(new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"),
+                                                                                  (byte) 11)),
+                                  amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithContentTypeApplicationOctetStream() throws Exception
+    {
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/octet-stream"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithObjectMessageContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+
+        assertEquals("Unexpected content size",
+                     getObjectBytes(null).length,
+                     convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithJmsMapContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("jms/map-message"));
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+    public void testAmqpValueWithNull() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 
@@ -630,7 +768,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 
@@ -702,7 +840,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 
@@ -712,7 +850,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 
@@ -724,7 +862,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType());
         assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...