[Qpid JMS client] Asynchronous send JMS 2.0

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[Qpid JMS client] Asynchronous send JMS 2.0

Vavricka
Hi,

  qpid jms client version - 0.23.0
  qpid c++ broker version - 0.34

  I am unable to send messages asynchronously using the JMS 2.0 API.

  Connection string is same for JMS 1.1 and JMS 2.0 - "amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"

  When I use JMS 1.1 API, it works ok.

Code for JMS 1.1 below.

try
{
    Topic queue = (Topic) initialContext.lookup("node");
    Connection connection = ((ConnectionFactory) initialContext.lookup("connection")).createConnection();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(queue);
    connection.start();
    for (int i = 0; i < 500; i++)
    {
        producer.send(session.createTextMessage("test"));
    }
}
catch
...

 However when I use JMS 2.0 API, sending is extremely slow (1 message per second).

Code for JMS 2.0 API below.

try (JMSContext context = connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
{
    JMSProducer producer = context.createProducer();
    producer.setAsync(new SendCompletitionListener());
    for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
    {
        producer.send(queue, context.createTextMessage("test"));
    }
}
catch
...

Part of trace output of JMS 1.1 during one message sent:

{Output before producer.send(session.createTextMessage("test"));}
...
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending message: JmsOutboundMessageDispatch {dispatchId = ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID = ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0, deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of: 2283 bytes
...
{Output after producer.send(session.createTextMessage("test"));}

Part of trace output of JMS 2.0 during one message sent:

{Output before producer.send(queue, context.createTextMessage("test"));}
...
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of: 324 bytes
[epollEventLoopGroup-2-1] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 212 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast', handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false, echo=false, properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_INIT
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_LOCAL_OPEN
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_REMOTE_OPEN
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open phase of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending message: JmsOutboundMessageDispatch {dispatchId = ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID = ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0, deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of: 2284 bytes
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
[epollEventLoopGroup-2-1] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3, incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false, echo=false, properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
[epollEventLoopGroup-2-1] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Disposition{role=RECEIVER, first=1, last=1, settled=true, state=Accepted{}, batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: DELIVERY
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery was accepted: DeliveryImpl [_tag=[48], _link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132, _deliveryState=null, _settled=false, _remoteSettled=true, _remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null, _transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f, _dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send phase of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer { ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on remote.
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_LOCAL_CLOSE
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0, closed=true, error=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of: 16 bytes
[epollEventLoopGroup-2-1] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0, closed=true, error=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_REMOTE_CLOSE
[AmqpProvider:(1):[amqp://host:20405]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer { ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close phase of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FINAL
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started send chain for anonymous producer: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] DEBUG org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating AmqpFixedProducer for: broadcast
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of: 324 bytes
...
{Output after producer.send(queue, context.createTextMessage("test"));}

It seems to me that in JMS 2.0 api producer closes link after each send of message and then opens a new one when producer wants to send another message.

Am I doing something wrong with JMS 2.0 API?

Tomas
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: [Qpid JMS client] Asynchronous send JMS 2.0

Timothy Bish
On 07/14/2017 09:34 AM, Vavricka wrote:

> Hi,
>
>    qpid jms client version - 0.23.0
>    qpid c++ broker version - 0.34
>
>    I am unable to send messages asynchronously using the JMS 2.0 API.
>
>    Connection string is same for JMS 1.1 and JMS 2.0 -
> "amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"
>
>    When I use JMS 1.1 API, it works ok.
>
> *Code for JMS 1.1 below.*
>
> try
> {
>      Topic queue = (Topic) initialContext.lookup("node");
>      Connection connection = ((ConnectionFactory)
> initialContext.lookup("connection")).createConnection();
>      Session session = connection.createSession(false,
> Session.CLIENT_ACKNOWLEDGE);
>      MessageProducer producer = session.createProducer(queue);
>      connection.start();
>      for (int i = 0; i < 500; i++)
>      {
>          producer.send(session.createTextMessage("test"));
>      }
> }
> catch
> ...
>
>   However when I use JMS 2.0 API, sending is extremely slow (1 message per
> second).
>
> *Code for JMS 2.0 API below.*
>
> try (JMSContext context =
> connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
> {
>      JMSProducer producer = context.createProducer();
>      producer.setAsync(new SendCompletitionListener());
>      for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
>      {
>          producer.send(queue, context.createTextMessage("test"));
>      }
> }
> catch
> ...
>
> *Part of trace output of JMS 1.1 during one message sent:*
>
> {Output before producer.send(session.createTextMessage("test"));}
> ...
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
> message: JmsOutboundMessageDispatch {dispatchId =
> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID =
> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
> deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false,
> rcvSettleMode=null, state=null, resume=false, aborted=false,
> batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 2283 bytes
> ...
> {Output after producer.send(session.createTextMessage("test"));}
>
> *Part of trace output of JMS 2.0 during one message sent:*
>
> {Output before producer.send(queue, context.createTextMessage("test"));}
> ...
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1',
> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, distributionMode=null, filter=null,
> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
> amqp:released:list, amqp:modified:list], capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null,
> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 324 bytes
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 212
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
> handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST,
> source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
> filter=null, defaultOutcome=null, outcomes=null, capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
> properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2,
> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
> handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false,
> echo=false, properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_INIT
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_LOCAL_OPEN
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_REMOTE_OPEN
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open phase
> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
> message: JmsOutboundMessageDispatch {dispatchId =
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID =
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
> deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false,
> rcvSettleMode=null, state=null, resume=false, aborted=false,
> batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 2284 bytes
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3,
> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
> handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false,
> echo=false, properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Disposition{role=RECEIVER,
> first=1, last=1, settled=true, state=Accepted{}, batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: DELIVERY
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery
> was accepted: DeliveryImpl [_tag=[48],
> _link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132,
> _deliveryState=null, _settled=false, _remoteSettled=true,
> _remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null,
> _transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f,
> _dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send phase
> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on remote.
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_LOCAL_CLOSE
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0,
> closed=true, error=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 16 bytes
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0,
> closed=true, error=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_REMOTE_CLOSE
> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close
> phase of anonymous send complete:
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_FINAL
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started
> send chain for anonymous producer:
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
> org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating
> AmqpFixedProducer for: broadcast
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast',
> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2',
> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, distributionMode=null, filter=null,
> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
> amqp:released:list, amqp:modified:list], capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null,
> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 324 bytes
> ...
> {Output after producer.send(queue, context.createTextMessage("test"));}
>
> It seems to me that in JMS 2.0 api producer closes link after each send of
> message and then opens a new one when producer wants to send another
> message.
>
> Am I doing something wrong with JMS 2.0 API?
>
> Tomas

You are using it correctly although not necessarily comparing the two in
a fair manner.  In your JMS 1.1 sample you are creating a producer with
a target destination so that producer will send to that destination on
each call send.  However in the JMS 2.0 sample you must create an
anonymous producer as that's how the 2.0 JMS Context API is designed so
if you really want to compare the two correctly you should use a JMS 1.1
anonymous producer by calling createProducer(null) and then sending to
the target destination via producer.send(destination, message).

The reason that the client is creating a new link on each send is that
the message broker in question doesn't support anonymous relay links (at
least it hasn't advertised to the client that it does) so the client
must resort to creating a new link each time you perform a send using
the address of the destination you've chosen to send the message to.  
I'd guess your performance numbers will be much more comparable if you
employ a JMS 1.1 anonymous producer in your testing.

>
>
> --
> View this message in context: http://qpid.2158936.n2.nabble.com/Qpid-JMS-client-Asynchronous-send-JMS-2-0-tp7664785.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>
>

--
Tim Bish
twitter: @tabish121
blog: http://timbish.blogspot.com/


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: [Qpid JMS client] Asynchronous send JMS 2.0

Gordon Sim
On 14/07/17 14:49, Timothy Bish wrote:
> The reason that the client is creating a new link on each send is that
> the message broker in question doesn't support anonymous relay links (at
> least it hasn't advertised to the client that it does) so the client
> must resort to creating a new link each time you perform a send using
> the address of the destination you've chosen to send the message to.

Fixed for 1.35 (https://issues.apache.org/jira/browse/QPID-6754)

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: [Qpid JMS client] Asynchronous send JMS 2.0

Robbie Gemmell
Administrator
In reply to this post by Timothy Bish
On 14 July 2017 at 14:49, Timothy Bish <[hidden email]> wrote:

> On 07/14/2017 09:34 AM, Vavricka wrote:
>>
>> Hi,
>>
>>    qpid jms client version - 0.23.0
>>    qpid c++ broker version - 0.34
>>
>>    I am unable to send messages asynchronously using the JMS 2.0 API.
>>
>>    Connection string is same for JMS 1.1 and JMS 2.0 -
>>
>> "amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"
>>
>>    When I use JMS 1.1 API, it works ok.
>>
>> *Code for JMS 1.1 below.*
>>
>> try
>> {
>>      Topic queue = (Topic) initialContext.lookup("node");
>>      Connection connection = ((ConnectionFactory)
>> initialContext.lookup("connection")).createConnection();
>>      Session session = connection.createSession(false,
>> Session.CLIENT_ACKNOWLEDGE);
>>      MessageProducer producer = session.createProducer(queue);
>>      connection.start();
>>      for (int i = 0; i < 500; i++)
>>      {
>>          producer.send(session.createTextMessage("test"));
>>      }
>> }
>> catch
>> ...
>>
>>   However when I use JMS 2.0 API, sending is extremely slow (1 message per
>> second).
>>
>> *Code for JMS 2.0 API below.*
>>
>> try (JMSContext context =
>> connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
>> {
>>      JMSProducer producer = context.createProducer();
>>      producer.setAsync(new SendCompletitionListener());
>>      for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
>>      {
>>          producer.send(queue, context.createTextMessage("test"));
>>      }
>> }
>> catch
>> ...
>>
>> *Part of trace output of JMS 1.1 during one message sent:*
>>
>> {Output before producer.send(session.createTextMessage("test"));}
>> ...
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
>> message: JmsOutboundMessageDispatch {dispatchId =
>> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID =
>> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
>> deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false,
>> rcvSettleMode=null, state=null, resume=false, aborted=false,
>> batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 2283 bytes
>> ...
>> {Output after producer.send(session.createTextMessage("test"));}
>>
>> *Part of trace output of JMS 2.0 during one message sent:*
>>
>> {Output before producer.send(queue, context.createTextMessage("test"));}
>> ...
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
>> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
>> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1',
>> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
>> dynamicNodeProperties=null, distributionMode=null, filter=null,
>> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
>> amqp:released:list, amqp:modified:list], capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null,
>> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 324 bytes
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read:
>> 212
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
>> handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST,
>> source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> distributionMode=null,
>> filter=null, defaultOutcome=null, outcomes=null, capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
>> properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2,
>> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
>> handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false,
>> echo=false, properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_INIT
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_LOCAL_OPEN
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_REMOTE_OPEN
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open
>> phase
>> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
>> message: JmsOutboundMessageDispatch {dispatchId =
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID =
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
>> deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false,
>> rcvSettleMode=null, state=null, resume=false, aborted=false,
>> batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 2284 bytes
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FLOW
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3,
>> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
>> handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false,
>> echo=false, properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FLOW
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
>> Disposition{role=RECEIVER,
>> first=1, last=1, settled=true, state=Accepted{}, batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> DELIVERY
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery
>> was accepted: DeliveryImpl [_tag=[48],
>> _link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132,
>> _deliveryState=null, _settled=false, _remoteSettled=true,
>> _remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null,
>>
>> _transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f,
>> _dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send
>> phase
>> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer
>> {
>> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on
>> remote.
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_LOCAL_CLOSE
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0,
>> closed=true, error=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 16 bytes
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0,
>> closed=true, error=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_REMOTE_CLOSE
>> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
>> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer
>> {
>> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close
>> phase of anonymous send complete:
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FINAL
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started
>> send chain for anonymous producer:
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
>> org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating
>> AmqpFixedProducer for: broadcast
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast',
>> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
>> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2',
>> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
>> dynamicNodeProperties=null, distributionMode=null, filter=null,
>> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
>> amqp:released:list, amqp:modified:list], capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null,
>> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 324 bytes
>> ...
>> {Output after producer.send(queue, context.createTextMessage("test"));}
>>
>> It seems to me that in JMS 2.0 api producer closes link after each send of
>> message and then opens a new one when producer wants to send another
>> message.
>>
>> Am I doing something wrong with JMS 2.0 API?
>>
>> Tomas
>
>
> You are using it correctly although not necessarily comparing the two in a
> fair manner.  In your JMS 1.1 sample you are creating a producer with a
> target destination so that producer will send to that destination on each
> call send.  However in the JMS 2.0 sample you must create an anonymous
> producer as that's how the 2.0 JMS Context API is designed so if you really
> want to compare the two correctly you should use a JMS 1.1 anonymous
> producer by calling createProducer(null) and then sending to the target
> destination via producer.send(destination, message).
>
> The reason that the client is creating a new link on each send is that the
> message broker in question doesn't support anonymous relay links (at least
> it hasn't advertised to the client that it does) so the client must resort
> to creating a new link each time you perform a send using the address of the
> destination you've chosen to send the message to.  I'd guess your
> performance numbers will be much more comparable if you employ a JMS 1.1
> anonymous producer in your testing.
>

That would be the direct comparison but I'd guess this would make them
both go equally slow :)

As slow as the fallback behaviour of opening and closing a link
per-message might be, necessary to support the anonymous producers
against the older broker, I expect it is still the C++ brokers journal
sync bound acceptance of the persistent message that makes things go
quite as slow as is being seen with the JMSProducer. Only reducing the
brokers journal sync time can make that bit faster.

An alternative suggestion against the older broker would be to
continue to create a 'classic API' JMS MessageProducer (rather than
'simplified API' JMSProducer) to the fixed destination, and use its
new JMS 2.0 send methods to do reliable async sends. That would avoid
the need to open new links for the sends. The message acceptance will
still be bound by the journal sync, but you could then do multiple
sends at a time reliably.

>>
>>
>> --
>> View this message in context:
>> http://qpid.2158936.n2.nabble.com/Qpid-JMS-client-Asynchronous-send-JMS-2-0-tp7664785.html
>> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [hidden email]
>> For additional commands, e-mail: [hidden email]
>>
>>
>
> --
> Tim Bish
> twitter: @tabish121
> blog: http://timbish.blogspot.com/
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [hidden email]
> For additional commands, e-mail: [hidden email]
>

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...