Qpid Proton C++ - send message only when available from another thread

Francesco Raviglione
Dear all,
I'm having trouble writing an AMQP client with Qpid Proton C++.
My client should only send messages to a particular queue on an ActiveMQ
broker; it is not supposed to receive any messages over that connection.
The client should not start sending as soon as it is started (with
"proton::container(AMQP_client).run();"). Instead, it should wait for data
provided by an external thread, which may only become available after
several minutes (I cannot tell in advance when the data will arrive, and
there may be a long gap between two consecutive chunks of data).

If I try to supply the AMQP client loop with new data through a pipe (on
which data is written by the external thread), I can write an "AMQP_client"
class like the following:

void AMQP_client::on_container_start(proton::container& c) {
     c.connect(broker_address);
}

void AMQP_client::on_connection_open(proton::connection& c) {
     c.open_sender(queue_name);
}

void AMQP_client::on_sendable(proton::sender &s) {
     uint8_t buffer[1024];
     ssize_t bufsize; // read() returns ssize_t, not int
     proton::message amqp_msg;

     // Wait for new data to be sent (blocks until data is written on the pipe)
     if((bufsize=read(pipe_read_end,buffer,sizeof(buffer)))==-1) {
         perror("read() error");
         return;
     }

     // Copy only the bytes actually read into the message body
     amqp_msg.body(proton::binary(buffer,buffer+bufsize));

     s.send(amqp_msg);
}

In this case, however, "on_sendable" blocks on the read() operation and, if
the data becomes available a few minutes later, the broker closes the
connection, because the client loop is completely blocked and cannot even
send heartbeat messages.

If, instead, I do not block on the read() operation (for instance, by
calling poll() with a timeout before read()), "on_sendable" is triggered
only once and I cannot find any other event that triggers the transmission
of a message when data becomes available.

I know that, in Python, I could solve this issue for instance by relying on
"EventInjector", but I'm unable to find a similar solution with the C++
version of the library (I would prefer to stick with C++, in this case, and
not to fall back to Qpid Proton C).

Do you know how I can solve this problem? Is there a way to "inject"
external aperiodic events/data to be sent via AMQP?

Thank you very much in advance,
Francesco Raviglione

Re: Qpid Proton C++ - send message only when available from another thread

Robbie Gemmell
Administrator
On Tue, 17 Nov 2020 at 19:59, Francesco Raviglione
<[hidden email]> wrote:

> In this case, however, "on_sendable" blocks on the read() operation and, if
> the data becomes available a few minutes later, the broker closes the
> connection, because the client loop is completely blocked and cannot even
> send heartbeat messages.
>

Yes, because the container thread is also responsible for performing the
IO. By blocking it, you stop it from doing anything at all for the
connection (and for any others in the container): it neither processes
incoming data (or notices its absence) nor sends anything, including
heartbeats when there is no actual messaging work. So when the thread is
eventually unblocked, it is likely to find one of two things. Either it
needs to disconnect the peer, because the peer did not send heartbeats (if
requested to) or live traffic in time to satisfy the client's timeout; or
the client has itself already been disconnected by the peer for not sending
heartbeats (if requested to) or live traffic in time to satisfy the peer's
timeout. The idle timeouts operate independently in each direction.
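
To make the effect concrete, here is a small standalone simulation in plain C++. It makes no Proton calls; the function name `peer_timed_out`, the tick granularity and the numbers are all invented for illustration. It models a loop that can only emit a heartbeat when its handler returns, and reports whether the peer would have seen a silence longer than its idle timeout.

```cpp
#include <cassert>

// Simulated single-threaded event loop using virtual time (ticks).
// Hypothetical model, not Proton behaviour in detail: each iteration costs
// one tick of loop work plus `handler_block` ticks spent blocked inside a
// handler, and a heartbeat can only be sent once the handler has returned.
// Returns true if the peer would have seen a silence longer than
// `idle_timeout` ticks and therefore dropped the connection.
bool peer_timed_out(int idle_timeout, int handler_block, int iterations) {
    assert(idle_timeout > 0 && handler_block >= 0);
    int now = 0;             // virtual clock
    int last_heartbeat = 0;  // last time the peer heard from us
    for (int i = 0; i < iterations; ++i) {
        now += 1 + handler_block;  // loop work plus time blocked in the handler
        if (now - last_heartbeat > idle_timeout) {
            return true;           // the peer gave up while we were blocked
        }
        last_heartbeat = now;      // the loop ran again and sent a heartbeat
    }
    return false;
}
```

With a timeout of 30 ticks, `peer_timed_out(30, 0, 100)` is false (the handler returns promptly), while `peer_timed_out(30, 120, 100)` is true (the handler blocks for 120 ticks, as a long read() would).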


> If, instead, I do not block on the read() operation (for instance I read()
> with a timeout, by using poll()), "on_sendable" is triggered only once and
> I cannot find any other event to trigger the transmission of a message when
> data becomes available.
>
> I know that, in Python, I could solve this issue for instance by relying on
> "EventInjector", but I'm unable to find a similar solution with the C++
> version of the library (I would prefer to stick with C++, in this case, and
> not to fall back to Qpid Proton C).
>
> Do you know how I can solve this problem? Is there a way to "inject"
> external aperiodic events/data to be sent via AMQP?
>

Those with more (or any) clue about the C++ bits can hopefully
provide a better answer, but...

I believe that is what
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/api/classproton_1_1work__queue.html
is aimed at. An example with multiple threads using it is at
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html,
and http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/scheduled_send.cpp.html
also makes use of it, though only from the single container thread
with some scheduling.


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Re: Qpid Proton C++ - send message only when available from another thread

Francesco Raviglione
Dear Robbie,
First of all, sorry for the very late reply.

Thank you very much for your reply and for the references to the C++ work
queues.
I have been quite busy with other projects over the past few days, but I
will definitely look in more detail into work queues and their usage with
Qpid Proton C++.

Looking at the examples, however, they do not seem to cover the case in
which work is added to the work queue from an external, non-Qpid Proton
thread.
I assume I will need to obtain the work queue from the sender
with "&s.work_queue()" when the sender is opened (in "on_sender_open()",
like in the examples), and then make the proton::work_queue object
available outside the Qpid Proton class so that other external threads can
"inject" work. Would making the "proton::work_queue" a public member work
and, at the same time, be thread safe?
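
One way the hand-off could be made safe, rather than exposing a bare public member, is to guard the pointer with a mutex and a condition variable. The sketch below is plain C++ with no Proton types: `fake_queue`, `queue_holder`, `publish`, `get` and `demo_handoff` are hypothetical stand-ins, and I am only assuming that a real `proton::work_queue*` could be stored the same way.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for proton::work_queue; not a Proton type.
struct fake_queue { int id; };

// Holds a pointer that the loop thread publishes once and that other
// threads may request at any time, before or after publication.
class queue_holder {
    std::mutex lock_;
    std::condition_variable ready_;
    fake_queue* queue_ = nullptr;  // protected by lock_
  public:
    // Called on the loop thread, e.g. from an on_sender_open()-style callback.
    void publish(fake_queue* q) {
        assert(q != nullptr);
        std::lock_guard<std::mutex> l(lock_);
        queue_ = q;
        ready_.notify_all();
    }
    // Called from any external thread; blocks until publish() has run.
    fake_queue* get() {
        std::unique_lock<std::mutex> l(lock_);
        ready_.wait(l, [this] { return queue_ != nullptr; });
        return queue_;
    }
};

// Demo: an external thread asks for the queue, possibly before it exists.
int demo_handoff() {
    fake_queue q{42};
    queue_holder holder;
    int seen = 0;
    std::thread external([&] { seen = holder.get()->id; });
    holder.publish(&q);  // the "loop thread" publishes the pointer
    external.join();     // join() makes the write to `seen` visible here
    return seen;
}
```

The point of the wrapper is that an external thread can never observe a half-initialised or null pointer: it either waits in `get()` or receives the published value.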

Thank you very much,
Francesco Raviglione



On Wed, 18 Nov 2020 at 11:25, Robbie Gemmell
<[hidden email]> wrote:


Re: Qpid Proton C++ - send message only when available from another thread

Robbie Gemmell
Administrator
I don't think you have looked at the multithreaded example closely enough?
That's exactly what it says it's doing, and what it looks to me to be doing
(well, it also has a third, non-container thread for processing received
messages).

On Thu, 26 Nov 2020 at 14:30, Francesco Raviglione
<[hidden email]> wrote:


Re: Qpid Proton C++ - send message only when available from another thread

Francesco Raviglione
Dear Robbie,
You are right.
Today I had time to take a closer look at the examples and I
noticed that "multithreaded_client" (
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html)
could be exactly what I'm looking for. It shows how work (i.e.
sending messages, besides having a separate thread for processing
received messages) is fed to the Qpid Proton thread from other external
threads: if I understood correctly, the "sender" thread calls
"cl.send(msg)", which adds new work to the Qpid Proton thread
with "work_queue()->add([=]() { sender_.send(msg); });".
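
For reference, here is the pattern as I understand it, reduced to a standalone model in plain C++. `toy_work_queue`, `add`, `stop`, `run` and `demo_inject` are hypothetical names that only mimic the idea; this is not the Proton implementation, and the real container thread also performs IO and heartbeats, which the model leaves out.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Toy model of a work queue: any thread may add() work, but every work
// item is executed on the single thread that calls run().
class toy_work_queue {
    std::mutex lock_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> work_;  // protected by lock_
    bool stopped_ = false;                    // protected by lock_
  public:
    // Thread safe. Returns false if the queue no longer accepts work.
    bool add(std::function<void()> f) {
        std::lock_guard<std::mutex> l(lock_);
        if (stopped_) return false;
        work_.push(std::move(f));
        wake_.notify_one();
        return true;
    }
    // Thread safe. Lets run() return once the pending work is drained.
    void stop() {
        std::lock_guard<std::mutex> l(lock_);
        stopped_ = true;
        wake_.notify_one();
    }
    // The "loop thread": waits only while there is no work to do.
    void run() {
        std::unique_lock<std::mutex> l(lock_);
        for (;;) {
            wake_.wait(l, [this] { return stopped_ || !work_.empty(); });
            if (work_.empty()) return;  // stopped and fully drained
            std::function<void()> f = std::move(work_.front());
            work_.pop();
            l.unlock();
            f();                        // run the item without holding the lock
            l.lock();
        }
    }
};

// Demo: a producer thread injects two "send" work items.
std::vector<std::string> demo_inject() {
    toy_work_queue wq;
    std::vector<std::string> sent;      // touched only by the loop thread
    std::thread loop([&] { wq.run(); });
    std::thread producer([&] {
        for (std::string msg : {"first", "second"}) {
            // Capture msg by value: the lambda runs later, on another thread.
            wq.add([&sent, msg] { sent.push_back(msg); });
        }
        wq.stop();
    });
    producer.join();
    loop.join();
    return sent;
}
```

Capturing the message by value matters for the same reason the example uses "[=]": the work item outlives the scope that created it and runs on a different thread.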

Thank you again for your assistance,
Francesco Raviglione



On Thu, 26 Nov 2020 at 16:00, Robbie Gemmell
<[hidden email]> wrote:
