|
Hi,
I am using Qpid tools to create the queue's on C++ broker 0.14, and a java client to publish messages to the queue. Both messages and queues are durable/persistent. I have observed that when I have used, the java client, only the messages which enter to the in-memory, are persisted to the persistence store and when I stop and restart my qpid server, the messages are not lost. However, if any of the queue limits(size or count) is hit, messages are not flowing to disk and all the new messages where lost But, if I use the spout which is a c++ client, and any of the queue limit have been hit, messages are persisting to the disk, and I am not loosing messages, even when the queue is full. My requirement is to use C++ broker and a java client, the all the messages should persist to disk when queues are full. Below is the command which I use to create the queue: ======================================================================= Script to create a queue qpid-config add queue que1 --durable --file-size=24 --file-count=8 --max-queue-count=20 --limit-policy=flow-to-disk ======================================================================= I am using a java client, which is trying to publish messages to the queue. Below are the jms connection url, connection details and sample code, to show how I publish: =========================================================================== jms settings: connectionfactory.customer.qpidConnectionfactory = amqp\://guest\:guest@localhost/test?brokerlist\='tcp\://localhost\:5672?retries\='3'&connecttimeout\='10000'&connectdelay\='3000'' destination.customer.queue=BURL:direct://amq.direct/que1/que1?routingkey='que1'&exclusive='false' ==================== Sample Code to create connection ConnectionFactory connectionFactory = (AMQConnectionFactory) context.lookup("customer.qpidConnectionfactory"); AMQConnection connection = (AMQConnection) connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); ===================== Sample code to create Producer to queue Destination destination = (Destination) context.lookup("customer.queue"); MessageProducer producer = session.createProducer(destination); producer.send(message, DeliveryMode.PERSISTENT, 0, 0L); ======================================================================== Please help me with this problem. Thanks in advance. Regards, Parkirat Singh Bagga |
|
Did nobody faced this problem...???
|
|
In reply to this post by ParkiratBagga
The jms client code looks fine to me and the your observation that messages
are available after a restart confirms that the jms client is marking the messages as persistent and the broker doing it's part. (However IIRC messages does not need to be marked persistent for flow-2-disk to work - but I stand to be corrected). On a separate note I don't recommend using flow-2-disk as it's very slow and can drastically affect the broker performance in general. I don't think the situation has changed in the recent past, therefore my advice is to not use this feature. Regards, Rajith On Thu, Mar 22, 2012 at 8:41 AM, ParkiratBagga <[hidden email]>wrote: > Hi, > > I am using Qpid tools to create the queue's on C++ broker 0.14, and a java > client to publish messages to the queue. Both messages and queues are > durable/persistent. > > I have observed that when I have used, the java client, only the messages > which enter to the in-memory, are persisted to the persistence store and > when I stop and restart my qpid server, the messages are not lost. However, > if any of the queue limits(size or count) is hit, messages are not flowing > to disk and all the new messages where lost > > But, if I use the spout which is a c++ client, and any of the queue limit > have been hit, messages are persisting to the disk, and I am not loosing > messages, even when the queue is full. > > My requirement is to use C++ broker and a java client, the all the messages > should persist to disk when queues are full. > > Below is the command which I use to create the queue: > > ======================================================================= > *Script to create a queue* > qpid-config add queue que1 --durable --file-size=24 --file-count=8 > --max-queue-count=20 --limit-policy=flow-to-disk > ======================================================================= > > I am using a java client, which is trying to publish messages to the queue. > Below are the jms connection url, connection details and sample code, to > show how I publish: > > =========================================================================== > *jms settings:* > > connectionfactory.customer.qpidConnectionfactory = > amqp\://guest\:guest@localhost > /test?brokerlist\='tcp\://localhost\:5672?retries\='3'&connecttimeout\='10000'&connectdelay\='3000'' > > > destination.customer.queue=BURL:direct://amq.direct/que1/que1?routingkey='que1'&exclusive='false' > ==================== > *Sample Code to create connection* > > ConnectionFactory connectionFactory = (AMQConnectionFactory) > context.lookup("customer.qpidConnectionfactory"); > AMQConnection connection = (AMQConnection) > connectionFactory.createConnection(); > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > connection.start(); > ===================== > *Sample code to create Producer to queue* > > Destination destination = (Destination) context.lookup("customer.queue"); > MessageProducer producer = session.createProducer(destination); > producer.send(message, DeliveryMode.PERSISTENT, 0, 0L); > ======================================================================== > > Please help me with this problem. > > Thanks in advance. > > Regards, > Parkirat Singh Bagga > > > -- > View this message in context: > http://qpid.2158936.n2.nabble.com/C-broker-0-14-java-client-flow-to-disk-not-happening-tp7395366p7395366.html > Sent from the Apache Qpid users mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [hidden email] > For additional commands, e-mail: [hidden email] > > |
|
This post was updated on .
Thanks Rajith... for looking into the problem. Even I have seen performance drop when I use persistence especially with big messages.
I have a requirement of using qpid persistence with c++ broker and java client. When I use durable queues and durable messages and if any queue limit is hit. Message Enque rate starts to drop though now it is persisting to disk, but I get a exception while it is still persisting to disk. Below is the exception log ============================================================== Exceptionjavax.jms.JMSException: Exception when sending message javax.jms.JMSException: Exception when sending message at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240) at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501) at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456) at org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283) at com.apigee.qpid.producer.PublishMessageTask.run(PublishMessageTask.java:130) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:680) Caused by: org.apache.qpid.transport.SessionException: timed out waiting for completion at org.apache.qpid.transport.Session.invoke(Session.java:688) at org.apache.qpid.transport.Session.invoke(Session.java:559) at org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96) at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226) ... 12 more =============================================================== Kindly tell, if this is the expected behavior. Regards, Parkirat Singh Bagga. |
|
Even Java Spout Example is giving the following exception while running with c++ broker.
Queue Config: que --durable --file-size=100 --file-count=8 --max-queue-size=2242880 --max-queue-count=10 --limit-policy=flow-to-disk Spout Config: -b=guest:guest@<host>:5672 -c=100000 --content=PersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistence 'que' ==================================================== Exception Log: Exception in thread "main" javax.jms.JMSException: Exception when sending message at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240) at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501) at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456) at org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283) at org.apache.qpid.example.Spout.<init>(Spout.java:98) at org.apache.qpid.example.Spout.main(Spout.java:146) Caused by: org.apache.qpid.transport.SessionException: timed out waiting for completion at org.apache.qpid.transport.Session.invoke(Session.java:688) at org.apache.qpid.transport.Session.invoke(Session.java:559) at org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96) at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226) ... 5 more ==================================================== Regards, Parkirat Bagga. |
|
On 04/09/2012 09:18 PM, ParkiratBagga wrote:
> Even Java Spout Example is giving the following exception while running with > c++ broker. > > *Queue Config:* > que --durable --file-size=100 --file-count=8 --max-queue-size=2242880 > --max-queue-count=10 --limit-policy=flow-to-disk > > *Spout Config:* > -b=guest:guest@<host>:5672 -c=100000 > --content=PersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistence > 'que' > > ==================================================== > *Exception Log:* > Exception in thread "main" javax.jms.JMSException: Exception when sending > message > at > org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240) > at > org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501) > at > org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456) > at > org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283) > at org.apache.qpid.example.Spout.<init>(Spout.java:98) > at org.apache.qpid.example.Spout.main(Spout.java:146) > Caused by: org.apache.qpid.transport.SessionException: timed out waiting for > completion > at org.apache.qpid.transport.Session.invoke(Session.java:688) > at org.apache.qpid.transport.Session.invoke(Session.java:559) > at > org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96) > at > org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226) > ... 5 more > ==================================================== This looks like it is due to the brokers attempt to flow control the client. By default, the broker will stop acknowledging messages when it reaches 80% of queue capacity (except for ring queues). You can disable that by either setting the --default-flow-stop-threshold and --default-flow-resume-threshold options to qpidd to 0 (i.e. globally disabling the default) or setting the --flow-stop-count= and --flow-resume-count options to qpid-config to 0 (i.e. disable per-queue) --------------------------------------------------------------------- To unsubscribe, e-mail: [hidden email] For additional commands, e-mail: [hidden email] |
|
In reply to this post by ParkiratBagga
Sorry for dropping the ball on this.
It seems Gordon has provided the answer you are looking for. Sorry again for the delay in responding. Rajith On Mon, Apr 9, 2012 at 4:18 PM, ParkiratBagga <[hidden email]>wrote: > Even Java Spout Example is giving the following exception while running > with > c++ broker. > > *Queue Config:* > que --durable --file-size=100 --file-count=8 --max-queue-size=2242880 > --max-queue-count=10 --limit-policy=flow-to-disk > > *Spout Config:* > -b=guest:guest@<host>:5672 -c=100000 > > --content=PersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistencePersistence > 'que' > > ==================================================== > *Exception Log:* > Exception in thread "main" javax.jms.JMSException: Exception when sending > message > at > > org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240) > at > > org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501) > at > > org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456) > at > > org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283) > at org.apache.qpid.example.Spout.<init>(Spout.java:98) > at org.apache.qpid.example.Spout.main(Spout.java:146) > Caused by: org.apache.qpid.transport.SessionException: timed out waiting > for > completion > at org.apache.qpid.transport.Session.invoke(Session.java:688) > at org.apache.qpid.transport.Session.invoke(Session.java:559) > at > > org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96) > at > > org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226) > ... 5 more > ==================================================== > > Regards, > Parkirat Bagga. > > > -- > View this message in context: > http://qpid.2158936.n2.nabble.com/C-broker-0-14-java-client-flow-to-disk-not-happening-tp7395366p7450903.html > Sent from the Apache Qpid users mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [hidden email] > For additional commands, e-mail: [hidden email] > > |
|
This post was updated on .
Thanks Gordon and Rajith and Sorry for replying so late.
The flow threshold parameters worked well and now flow-to-disk is happening properly. Regards, Parkirat Bagga |
| Powered by Nabble | Edit this page |
